# Source code for ska_pst.send.metadata_builder

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module class for writing YAML meta data files."""
from __future__ import annotations

__all__ = [
    "MetaDataBuilder",
]

import logging
import pathlib
import tempfile
from dataclasses import asdict
from datetime import datetime, timedelta
from decimal import Decimal

import yaml
from astropy import units as u
from astropy.coordinates import SkyCoord
from ska_control_model import PstProcessingMode

from .constants import DATA_PRODUCT_FILE_NAME
from .dada_file import DadaFileManager
from .metadata import PstFiles, PstMetaData, PstObsCore

DEFAULT_OUTPUT_DIR: pathlib.Path = pathlib.Path(tempfile.gettempdir())
INTERFACE: str = "http://schema.skao.int/ska-data-product-meta/0.1"
CONFIG_IMAGE: str = "artefact.skao.int/ska-pst/ska-pst"
CONFIG_VERSION: str = "1.3.1"
DATAPRODUCT_DEFINITIONS: dict = {
    "data": "Channelised voltage data raw files",
    "weights": "Channelised voltage weights raw files",
    "input-stats": "Monitoring statistics of the input data stream",
    "output-stats": "Monitoring statistics of the output data stream",
}


class MetaDataBuilder:
    """Class used for building metadata files.

    The builder reads header values from the DADA files exposed by a
    ``DadaFileManager``, populates a ``PstMetaData`` object (context, config,
    files and obscore sections) and writes it out as a YAML file in the
    configured output directory.
    """

    def __init__(
        self: MetaDataBuilder,
        output_dir: pathlib.Path = DEFAULT_OUTPUT_DIR,
        dada_file_manager: DadaFileManager | None = None,
        pst_processing_mode: PstProcessingMode = PstProcessingMode.VOLTAGE_RECORDER,
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Create instance of PST metadata object.

        :param output_dir: the output directory to write metadata file to,
            defaults to DEFAULT_OUTPUT_DIR
        :type output_dir: pathlib.Path, optional
        :param dada_file_manager: an instance of a ``DadaFileManager`` that
            helps reading DADA files, defaults to None. When not supplied a
            new manager is created for ``output_dir``.
        :type dada_file_manager: DadaFileManager | None, optional
        :param pst_processing_mode: the PST processing mode used for the
            current scan, defaults to PstProcessingMode.VOLTAGE_RECORDER
        :type pst_processing_mode: PstProcessingMode, optional
        :param logger: the logger to use for logging, defaults to None. When
            not supplied the module level logger is used.
        :type logger: logging.Logger | None, optional
        """
        self.logger = logger or logging.getLogger(__name__)
        self._output_dir = output_dir
        self._dada_file_manager = dada_file_manager or DadaFileManager(folder=self._output_dir, logger=logger)
        self._pst_metadata = PstMetaData(interface=INTERFACE)
        self.processing_mode = pst_processing_mode

    def generate_metadata(self: MetaDataBuilder) -> None:
        """Build and write the metadata product.

        Errors raised while building are logged by ``_build_metadata`` and
        not propagated, so the write step is still attempted afterwards.
        """
        self.logger.debug("generating metadata")
        self._build_metadata()
        self.write_metadata()
        self.logger.debug("generating metadata done")

    def _build_metadata(self: MetaDataBuilder) -> None:
        """Build the PstMetaData object.

        Any exception raised while building (including the precondition
        checks below) is caught and logged as an error rather than re-raised.
        """
        try:
            # Explicit checks rather than ``assert`` so that the validation is
            # not stripped when Python runs with optimisation (-O) enabled.
            if self._dada_file_manager is None:
                raise ValueError("Expected _dada_file_manager to have been initialised.")
            if len(self._dada_file_manager.data_files) == 0:
                raise ValueError("Expected at least 1 data file")
            if len(self._dada_file_manager.weights_files) == 0:
                raise ValueError("Expected at least 1 weights file")

            self._pst_metadata.execution_block = self._dada_file_manager.data_files[0].eb_id
            self._build_context()
            self._build_config()
            self._build_files()
            self._build_obscore()
            self.logger.debug(f"pst_metadata: {self.pst_metadata}")
        except Exception as e:
            # Best effort: report the failure but do not propagate it.
            self.logger.error(f"An error occurred while building metadata: {str(e)}")

    def _build_context(self: MetaDataBuilder) -> None:
        """Populate Fields used for PstContext from the first data file."""
        first_file = self._dada_file_manager.data_files[0]
        self._pst_metadata.context.observer = first_file.observer
        self._pst_metadata.context.intent = first_file.intent
        self._pst_metadata.context.notes = first_file.notes

    def _build_config(self: MetaDataBuilder) -> None:
        """Build PstConfig. Placeholder for replacing defaults."""
        self._pst_metadata.config.image = CONFIG_IMAGE
        self._pst_metadata.config.version = CONFIG_VERSION

    def _build_files(self: MetaDataBuilder) -> None:
        """Build PstFiles used for file block in metadata file.

        One entry is appended for each data product type in
        ``DATAPRODUCT_DEFINITIONS`` whose directory under the output
        directory contains more than zero bytes. A failure for one type is
        logged and does not stop the remaining types from being processed.
        """
        for file_type, description in DATAPRODUCT_DEFINITIONS.items():
            try:
                total_size = self.get_total_filesize(file_type)
                if total_size > 0:
                    self._pst_metadata.files.append(
                        PstFiles(
                            description=description,
                            path=file_type,
                            size=total_size,
                            status="done",
                        )
                    )
            except Exception as e:
                self.logger.error(f"Error in obtaining {file_type} files: {e}")

    def convert_utc_to_mjd(
        self: MetaDataBuilder,
        utc_datetime: str | datetime,
        datetime_format: str = "%Y-%m-%d-%H:%M:%S",
    ) -> float:
        """Convert datetime UTC format to MJD.

        :param utc_datetime: the timestamp to convert, either as a string in
            ``datetime_format`` or as a naive ``datetime`` instance.
        :type utc_datetime: str | datetime
        :param datetime_format: the ``strptime`` format used when
            ``utc_datetime`` is a string, defaults to "%Y-%m-%d-%H:%M:%S"
        :type datetime_format: str, optional
        :return: the Modified Julian Date rounded to 10 decimal places.
        :rtype: float
        :raises ValueError: if a string value does not match ``datetime_format``.
        """
        if isinstance(utc_datetime, str):
            date_object = datetime.strptime(utc_datetime, datetime_format)
        else:
            date_object = utc_datetime

        # Calculate Julian Date (including fractional part)
        days_since_2000_01_01 = (date_object - datetime(2000, 1, 1)).total_seconds() / (24 * 3600)
        jd = days_since_2000_01_01 + 2451544.5

        # Calculate Modified Julian Date (MJD)
        mjd = jd - 2400000.5

        # Round to the desired accuracy
        rounded_mjd = round(mjd, 10)

        return float(Decimal(rounded_mjd))

    def _get_scan_length(self: MetaDataBuilder) -> float:
        """Return the scan length in seconds, summed over all data files."""
        # The DADA files don't explicitly contain a header to know how long
        # in seconds the file is for, but this can be determined by working
        # out how many samples are in the file from resolution per sample and
        # the size of the file and then multiply by TSAMP
        scan_len_usec = 0.0
        for f in self._dada_file_manager.data_files:
            scan_len_usec += (f.data_size // f.resolution_per_sample) * f.tsamp

        # Convert to a plain float so the value matches the annotation.
        return float(u.Quantity(scan_len_usec, unit=u.microsecond).to(u.s).value)

    def _get_pol_states(self: MetaDataBuilder, first_file, npol: int) -> str:  # type: ignore[no-untyped-def]
        """Return the obscore ``pol_states`` value for the current processing mode.

        :param first_file: the first DADA data file of the scan; its
            ``poln_ft`` attribute is only read in flow through mode.
        :param npol: the number of polarisations read from the file header.
        :return: "POLA", "POLB", "POLA,POLB", or "null" when the state cannot
            be determined.
        """
        if self.processing_mode == PstProcessingMode.VOLTAGE_RECORDER:
            return "POLA,POLB"

        if self.processing_mode == PstProcessingMode.FLOW_THROUGH:
            poln_ft = first_file.poln_ft
            if poln_ft == "A":
                return "POLA"
            elif poln_ft == "B":
                return "POLB"
            elif poln_ft == "Both":
                if npol != 2:
                    self.logger.warning(
                        f"Expected npol={npol} to be 2 when poln_ft {poln_ft}==Both, but it is {npol}"
                    )
                else:
                    return "POLA,POLB"

        return "null"

    def _build_obscore(self: MetaDataBuilder) -> None:
        """Build PstObsCore used for obscore block in metadata file."""
        # Grab fields from header file
        first_file = self._dada_file_manager.data_files[0]
        utc_start = first_file.utc_start
        scan_id = first_file.scan_id
        tsamp = float(first_file.tsamp)
        npol = int(first_file.npol)
        nchan = int(first_file.nchan)
        freq = float(first_file.freq)
        bw = float(first_file.bw)
        stt_crd1: str | float = first_file.stt_crd1
        stt_crd2: str | float = first_file.stt_crd2
        target_name = first_file.source

        # A value that parses as a float is treated as degrees, anything else
        # (e.g. a sexagesimal string) is treated as an hour angle.
        try:
            stt_crd1 = float(stt_crd1)
            crd1_unit = u.deg
        except ValueError:
            crd1_unit = u.hourangle

        # NOTE: In the future when we support different co-ordinate systems
        # based on the SKA sky direction (see ADR-63) we may have to change
        # this and including the reference_frame from the schema all the way
        # through.
        sky_coord = SkyCoord(
            stt_crd1,
            stt_crd2,
            equinox=first_file.sky_coord_equinox,
            frame="icrs",
            unit=(crd1_unit, u.deg),
        )

        # NOTE: even though RA is normally measured in hours
        # SKAO uses degrees when it is a float (see ADR-63)
        s_ra = float(sky_coord.ra.deg)
        s_dec = float(sky_coord.dec.deg)

        t_min = self.convert_utc_to_mjd(utc_start)
        datetime_t_min = datetime.strptime(utc_start, "%Y-%m-%d-%H:%M:%S")

        # Convert microseconds to seconds
        t_resolution = float(u.Quantity(tsamp, unit=u.microsecond).to(u.s).value)

        total_dataproducts_size = 0
        for pst_files in self._pst_metadata.files:
            total_dataproducts_size += pst_files.size

        scan_len = self._get_scan_length()
        access_estsize = total_dataproducts_size
        datetime_with_seconds_added = datetime_t_min + timedelta(seconds=scan_len)
        t_max = self.convert_utc_to_mjd(datetime_with_seconds_added)

        # This is effectively the observation length
        t_exptime = float(scan_len)
        instrument_name = first_file.telescope.upper().replace("SKA", "SKA-")

        pol_xel = npol
        pol_states = self._get_pol_states(first_file, npol)

        em_xel = nchan
        em_unit = "Hz"
        # NOTE(review): the 1e6 factor assumes freq and bw are in MHz - confirm.
        em_min = (freq - (bw / 2)) * 1e6
        em_max = (freq + (bw / 2)) * 1e6
        em_res_power = "null"

        # Value of Resolution along the spectral axis.
        # Not sure about oversampling here...
        em_resolution = (bw / nchan) * 1e6

        # Unified Content Descriptor of observable,
        # not really anything suitable for PST. Put phys.polarisation for now.
        o_ucd = "phys.polarisation"

        # TODO: The following are to be populated after confirming their source.
        #   dataproduct_type=dataproduct_type,
        #   dataproduct_subtype=dataproduct_subtype,
        #   calib_level=calib_level,
        obscore = PstObsCore(
            obs_id=scan_id,
            access_estsize=access_estsize,
            target_name=target_name,
            s_ra=s_ra,
            s_dec=s_dec,
            t_min=t_min,
            t_max=t_max,
            t_resolution=t_resolution,
            t_exptime=t_exptime,
            instrument_name=instrument_name,
            pol_xel=pol_xel,
            pol_states=pol_states,
            em_xel=em_xel,
            em_unit=em_unit,
            em_min=em_min,
            em_max=em_max,
            em_res_power=em_res_power,
            em_resolution=em_resolution,
            o_ucd=o_ucd,
        )

        self._pst_metadata.obscore = obscore

    def write_metadata(self: MetaDataBuilder, file_name: str = DATA_PRODUCT_FILE_NAME) -> None:
        """Write YAML object to a YAML file.

        :param file_name: name of the file to create inside the output
            directory, defaults to DATA_PRODUCT_FILE_NAME
        :type file_name: str, optional
        """
        absolute_path = self._output_dir / file_name
        with open(absolute_path, "w") as yaml_file:
            yaml.dump(asdict(self.pst_metadata), yaml_file)

        self.logger.info(f"PST metadata written at: {absolute_path}")

    def get_total_filesize(self: MetaDataBuilder, _path: str) -> int:
        """Return the total size in bytes of all files under the given path.

        :param _path: path relative to the output directory.
        :type _path: str
        :return: the summed size of all regular, non-symlink files found
            recursively under the path, or 0 if the path does not exist.
        :rtype: int
        """
        absolute_path = (self._output_dir / _path).resolve()
        total_size = 0

        if not absolute_path.exists():
            return total_size

        # Iterate through all files and directories recursively
        for item in absolute_path.rglob("*"):
            if item.is_file() and not item.is_symlink():
                total_size += item.stat().st_size

        return total_size

    @property
    def output_dir(self: MetaDataBuilder) -> pathlib.Path:
        """Get the output directory that files are written to."""
        return self._output_dir

    @property
    def pst_metadata(self: MetaDataBuilder) -> PstMetaData:
        """Get the PST metadata."""
        return self._pst_metadata