# Source code for ska_pst.send.metadata

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module defining the data structures used for writing YAML metadata files."""

from __future__ import annotations

import datetime as dt
import logging
import pathlib
import tempfile
from dataclasses import asdict, dataclass, field
from decimal import Decimal

import astropy.time as apt
import yaml
from astropy import units as u
from astropy.coordinates import SkyCoord
from ska_pst.common.constants import VERSION
from ska_pst.lmc.receive.receive_util import generate_recv_configure_scan_request

from ska_pst.common import TelescopeConfig

from .constants import DATA_PRODUCT_FILE_NAME

__all__ = [
    "PstContext",
    "PstConfig",
    "PstFiles",
    "PstObsCore",
    "PstMetadata",
]

INTERFACE: str = "http://schema.skao.int/ska-data-product-meta/0.1"
CONFIG_IMAGE: str = "artefact.skao.int/ska-pst/ska-pst"

DEFAULT_OUTPUT_DIR: pathlib.Path = pathlib.Path(tempfile.gettempdir())

_logger = logging.getLogger(__name__)


[docs]@dataclass(kw_only=True, frozen=True) class ScanMetadata: """ Metadata relating to a scan. This is a PST internal representation of that metadata that is used in the generation of the SKA data product metadata file. Different scan types will need to be able to extract this information from the scan configuration and/or output files. """ utc_start: dt.datetime """ The UTC_START of the scan. This value must have the UTC timezone set rather than being a naive datetime instance. In PST this value has second resolution and no fractional seconds, that is captured by the ``picoseconds`` value. """ picoseconds: int """ The fractional seconds when the scan starts in picoseconds. In PST the fractional seconds of a scan's start time is stored as picoseconds in a separate value from the ``utc_start`` value. """ scan_length_secs: float """ The length of a scan in seconds. This value can capture fractional seconds. """ tsamp_out: float """The temporal resolution in microseconds.""" centre_freq_out_mhz: float """The centre frequency of the output data, in MHz.""" bandwidth_out_mhz: float """The bandwidth of the output data, in MHz.""" nchan_out: int """The number of output channels.""" polarisations_out: list[str] """The polarisations in the output data.""" scan_files: list[PstFiles] """ The output files for the scan. The property ``total_file_size`` uses this list to get the overall size of the scan products. """ output_data_type: str """ The output data type as defined in the solution intent for a given scan type. This should be "voltages" for voltage recorder and flow through modes and for detected filter bank it should be "spectra". """ @staticmethod def _as_mjd(datetime: dt.datetime) -> float: mjd = apt.Time(datetime, scale="utc").mjd # Round to the desired accuracy rounded_mjd = round(mjd, 11) return float(Decimal(rounded_mjd)) @property def start_time(self: ScanMetadata) -> dt.datetime: """ Get the scan's start time. 
This returns the UTC_START + PICOSECONDS as a Python's datetime object. """ microseconds = u.Quantity(self.picoseconds, unit="ps").to("us").value return self.utc_start + dt.timedelta(microseconds=microseconds) @property def start_time_mjd(self: ScanMetadata) -> float: """Get the scan's start time as a Modified Julian date (MJD) value.""" return ScanMetadata._as_mjd(self.start_time) @property def end_time(self: ScanMetadata) -> dt.datetime: """ Get the scan's end time. This is defined as the ``start_time + scan_length_secs``. """ return self.start_time + dt.timedelta(seconds=self.scan_length_secs) @property def end_time_mjd(self: ScanMetadata) -> float: """Get the scan's end time as a Modified Julian date (MJD) value.""" return ScanMetadata._as_mjd(self.end_time) @property def total_dataproducts_size(self: ScanMetadata) -> int: """Get the total size of the data products for the scan.""" total_dataproducts_size = 0 for f in self.scan_files: total_dataproducts_size += f.size return total_dataproducts_size @property def bandwidth_out_hz(self: ScanMetadata) -> float: """Get the output bandwidth in Hertz.""" return u.Quantity(self.bandwidth_out_mhz, unit="MHz").si.value
@dataclass
class PstContext:
    """
    Data class for the ``context`` field of a PST metadata file.

    The context is meant to be data passed verbatim through from OET/TMC as part
    of AssignResources (DLM) or Configure (other sub-systems). It is to be made
    part of the ska_schemas schemas.
    """

    observer: str = ""
    """Name or role of the person conducting the observation."""

    intent: str = "Tied-array beam observation"
    """The intent passed from OET/TMC."""

    notes: str = ""
    """The notes passed from OET/TMC."""
@dataclass
class PstConfig:
    """
    Data class for the ``config`` field of a PST metadata file.

    It describes the configuration of the software that generated the data product.
    """

    image: str = CONFIG_IMAGE
    """The PST image name."""

    version: str = VERSION
    """The version of PST."""
@dataclass
class PstFiles:
    """
    Data class for an entry of the ``files`` field of a PST metadata file.

    Each instance documents one file that is coupled to the PST metadata file.
    """

    description: str
    """The description of the file."""

    path: str
    """The relative path of the file."""

    size: int
    """The size of the file in bytes."""

    status: str
    """The status of the file."""
@dataclass
class PstObsCore:
    """Data class holding the fields of the standard IVOA ObsCore table/view."""

    dataproduct_type: str = "timeseries"
    """
    Logical data product type.

    Values can be image, cube, spectrum, sed, timeseries, visibility, event
    or measurements.
    """

    dataproduct_subtype: str = "voltages"
    """
    The subtype of the data product.

    Values can be voltages, spectra, oversampled, channelised or quantised voltages.
    """

    calib_level: int = 0
    """
    The calibration level.

    Valid values are 0, 1, 2, 3, or 4.

    0 = Raw instrumental data
    1 = Instrumental data in a standard format (FITS, VOTable, SDFITS, ASDM, etc.)
    2 = Calibrated, science ready data with the instrument signature removed
    3 = Enhanced data products like mosaics, resampled or drizzled images, or
        heavily processed survey fields
    4 = Analysis data products generated after some scientific data manipulation
        or interpretation.
    """

    obs_id: str = ""
    """The scan id."""

    access_estsize: int = 0
    """
    An estimate of the overall data product size in bytes.

    This value is derived from the recorded files of the scan.
    """

    target_name: str = ""
    """
    The name of the target.

    For PST this is the SOURCE field.
    """

    s_ra: float = 0.0
    """
    The centre of observation right ascension, ICRS.

    This value is in degrees, not hour angle.
    """

    s_dec: float = 0.0
    """Centre of observation declination, ICRS."""

    t_min: float = 0.0
    """Start time in Modified Julian Date (MJD)."""

    t_max: float = 0.0
    """End time in Modified Julian Date (MJD)."""

    t_resolution: float = 0.0
    """
    Temporal resolution FWHM (full width at half maximum) in seconds.

    For PST this is the TSAMP converted to seconds. This is the output TSAMP,
    which for Voltage Recorder and Flow Through is the same as the input TSAMP.
    For Detected filterbank this can be different when time averaging or an
    inverse filterbank is applied.
    """

    t_exptime: float = 0.0
    """
    Total exposure time.

    This is the length of the PST scan.
    """

    facility_name: str = "SKA-Observatory"
    """The observatory or facility used to collect the data."""

    instrument_name: str = ""
    """The name of the instrument used for the acquisition of the observation."""

    pol_xel: int = 0
    """
    Number of polarisation samples.

    This is the number of output polarisations (i.e. NPOL_OUT), not the input
    signal NPOL.
    """

    pol_states: str = ""
    """List of polarisation states."""

    em_xel: int = 0
    """
    Number of elements along the spectral axis.

    For PST this is the number of output channels (e.g. NCHAN_OUT).
    """

    em_unit: str = "Hz"
    """
    Spectral coordinates unit type.

    The unit used for the values of ``em_min`` and ``em_max``. This defaults to Hz.
    """

    em_min: float = 0.0
    """
    Start in spectral coordinates (vacuum wavelength).

    For PST this is the centre frequency of the first PST channel.
    """

    em_max: float = 0.0
    """
    Stop in spectral coordinates (vacuum wavelength).

    For PST this is the centre frequency of the last PST channel.
    """

    em_res_power: str = "null"
    """
    Spectral resolving power.

    For PST this is not used and defaults to null.
    """

    em_resolution: float = 0.0
    """
    The spectral resolution.

    For PST this is the width of a PST channel in Hz.
    """

    o_ucd: str = "null"
    """
    Unified Content Descriptor of the observable.

    Examples of this are phot.count or phot.flux.density (see sections 4.18 and
    B.6.4.1 in the ObsCore standard, the UCD1+ controlled vocabulary and especially
    the list of observables). This is not used by PST and defaults to "null".
    """
@dataclass
class PstMetadata:
    """
    Class representing the PST metadata.

    This class encapsulates the metadata information for a PST data product.
    It includes details about the interface, execution block, context,
    configuration, files, and observation core information.
    """

    interface: str = INTERFACE
    """The interface of the metadata."""

    execution_block: str = ""
    """The execution block identifier."""

    context: PstContext = field(default_factory=PstContext)
    """The context information for the PST data."""

    config: PstConfig = field(default_factory=PstConfig)
    """The configuration information for the PST data."""

    files: list[PstFiles] = field(default_factory=list)
    """List of files associated with the PST data."""

    obscore: PstObsCore = field(default_factory=PstObsCore)
    """The observation core information for the PST data."""
def generate_metadata(
    scan_id: str,
    scan_metadata: ScanMetadata,
    pst_scan_config: dict,
    *,
    file_name: str = DATA_PRODUCT_FILE_NAME,
    output_dir: pathlib.Path = DEFAULT_OUTPUT_DIR,
) -> None:
    """
    Build and write the metadata product.

    The SKA data product metadata is assembled from the scan configuration and the
    scan's metadata, and is written as YAML to ``output_dir / file_name``.

    This is a best-effort operation: any exception raised while building or writing
    the metadata is logged and then suppressed, so this function returns normally
    even when no file has been written.

    :param scan_id: identifier of the scan, recorded as the ObsCore ``obs_id``.
    :param scan_metadata: the PST internal metadata of the scan.
    :param pst_scan_config: the scan configuration. It must contain the keys
        ``telescope_config`` and ``eb_id`` and is also passed, as keyword arguments,
        to ``generate_recv_configure_scan_request``.
    :param file_name: name of the YAML file to write.
    :param output_dir: directory in which the file is written. It is not created
        by this function.
    """
    _logger.debug("generating metadata")
    try:
        telescope_config: TelescopeConfig = pst_scan_config["telescope_config"]
        eb_id = pst_scan_config["eb_id"]

        receive_config = generate_recv_configure_scan_request(**pst_scan_config)
        observer = receive_config["observer"]
        source: str = receive_config["source"]
        stt_crd1: str | float = receive_config["stt_crd1"]
        stt_crd2: str | float = receive_config["stt_crd2"]
        equinox: str = receive_config["equinox"]

        intent = f"Tied-array beam observation of {source}"
        context = PstContext(
            observer=observer,
            intent=intent,
            notes="Unknown",
        )

        # A value that parses as a float is taken to be in degrees, anything else
        # (e.g. a sexagesimal string) is treated as an hour angle.
        try:
            stt_crd1 = float(stt_crd1)
            crd1_unit = u.deg
        except ValueError:
            crd1_unit = u.hourangle

        # NOTE: In the future when we support different co-ordinate systems
        # based on the SKA sky direction (see ADR-63) we may have to change
        # this and including the reference_frame from the schema all the way
        # through.
        sky_coord = SkyCoord(
            stt_crd1,
            stt_crd2,
            equinox=f"J{equinox}",
            frame="icrs",
            unit=(crd1_unit, u.deg),
        )

        # NOTE: even though RA is normally measured in hours
        # SKAO uses degrees when it is a float (see ADR-63)
        s_ra = float(sky_coord.ra.deg)
        s_dec = float(sky_coord.dec.deg)

        t_min = scan_metadata.start_time_mjd
        t_max = scan_metadata.end_time_mjd
        t_exptime = scan_metadata.scan_length_secs
        # tsamp_out is in microseconds, ObsCore wants seconds.
        t_resolution = float(u.Quantity(scan_metadata.tsamp_out, unit="us").si.value)

        # NOTE(review): presumably this turns a name such as "SKALow" into
        # "SKA-Low" - confirm against the TelescopeConfig definitions.
        instrument_name = telescope_config.name.replace("SKA", "SKA-")

        pol_states = scan_metadata.polarisations_out
        pol_xel = len(pol_states)

        em_xel = scan_metadata.nchan_out
        em_unit = "Hz"
        bw_mhz = scan_metadata.bandwidth_out_mhz
        freq_mhz = scan_metadata.centre_freq_out_mhz
        # NOTE(review): these are the lower and upper edges of the output band,
        # whereas the PstObsCore field documentation describes the centre frequency
        # of the first/last channel - confirm which one is intended.
        em_min_mhz = freq_mhz - bw_mhz / 2
        em_max_mhz = freq_mhz + bw_mhz / 2
        em_min_hz = float(u.Quantity(em_min_mhz, unit="MHz").to("Hz").value)
        em_max_hz = float(u.Quantity(em_max_mhz, unit="MHz").to("Hz").value)
        em_res_power = "null"

        # Value of resolution along the spectral axis.
        # Not sure about oversampling here...
        em_resolution = float(u.Quantity(bw_mhz / em_xel, unit="MHz").to("Hz").value)

        # Unified Content Descriptor of observable,
        # not really anything suitable for PST. Put phys.polarisation for now.
        o_ucd = "phys.polarisation"

        # TODO: The following are to be populated after confirming their source.
        #   dataproduct_type=dataproduct_type,
        #   dataproduct_subtype=dataproduct_subtype,
        #   calib_level=calib_level,
        obscore = PstObsCore(
            obs_id=scan_id,
            access_estsize=scan_metadata.total_dataproducts_size,
            target_name=source,
            s_ra=s_ra,
            s_dec=s_dec,
            t_min=t_min,
            t_max=t_max,
            t_resolution=t_resolution,
            t_exptime=t_exptime,
            instrument_name=instrument_name,
            pol_xel=pol_xel,
            pol_states=",".join(pol_states),
            em_xel=em_xel,
            em_unit=em_unit,
            em_min=em_min_hz,
            em_max=em_max_hz,
            em_res_power=em_res_power,
            em_resolution=em_resolution,
            o_ucd=o_ucd,
            dataproduct_subtype=scan_metadata.output_data_type,
        )

        metadata = PstMetadata(
            execution_block=eb_id,
            context=context,
            files=scan_metadata.scan_files,
            obscore=obscore,
        )
        # Lazy %-style arguments avoid building the (large) repr of the metadata
        # when debug logging is disabled.
        _logger.debug("metadata: %s", metadata)

        absolute_path = output_dir / file_name
        with open(absolute_path, "w", encoding="utf-8") as yaml_file:
            yaml.dump(asdict(metadata), yaml_file)

        _logger.info("PST metadata written at: %s", absolute_path)
    except Exception as e:
        # Deliberately best-effort: log the error (with traceback) and carry on
        # rather than propagating it to the caller.
        _logger.exception("An error occurred while building metadata: %s", e)

    _logger.debug("generating metadata done")


def get_path_total_filesize(_path: pathlib.Path) -> int:
    """
    Return the total size in bytes of all files under the given path.

    The path is resolved and searched recursively. Only regular files are counted;
    symbolic links are skipped.

    :param _path: the directory to search.
    :return: the sum of the sizes of the files found, or ``0`` if the resolved
        path does not exist.
    """
    absolute_path = _path.resolve()
    if not absolute_path.exists():
        return 0

    # Walk all files and directories recursively, counting regular files only.
    return sum(
        item.stat().st_size
        for item in absolute_path.rglob("*")
        if item.is_file() and not item.is_symlink()
    )