# Source code for ska_pst.send.metadata_builder
# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module class for writing YAML meta data files."""
from __future__ import annotations
__all__ = [
"MetaDataBuilder",
]
import logging
import pathlib
import tempfile
from dataclasses import asdict
from datetime import datetime, timedelta
from decimal import Decimal
import yaml
from astropy import units as u
from astropy.coordinates import SkyCoord
from ska_control_model import PstProcessingMode
from .constants import DATA_PRODUCT_FILE_NAME
from .dada_file import DadaFileManager
from .metadata import PstFiles, PstMetaData, PstObsCore
# Directory that metadata is written to when the caller does not supply one:
# the platform temporary directory reported by ``tempfile.gettempdir()``.
DEFAULT_OUTPUT_DIR: pathlib.Path = pathlib.Path(tempfile.gettempdir())

# Value passed as ``interface`` when the ``PstMetaData`` object is created.
INTERFACE: str = "http://schema.skao.int/ska-data-product-meta/0.1"

# Values copied into the ``config`` block of the metadata (image and version).
CONFIG_IMAGE: str = "artefact.skao.int/ska-pst/ska-pst"
CONFIG_VERSION: str = "1.3.1"

# Maps each data product path (relative to the output directory) to the
# human readable description recorded for it in the ``files`` block.
DATAPRODUCT_DEFINITIONS: dict[str, str] = {
    "data": "Channelised voltage data raw files",
    "weights": "Channelised voltage weights raw files",
    "input-stats": "Monitoring statistics of the input data stream",
    "output-stats": "Monitoring statistics of the output data stream",
}
class MetaDataBuilder:
    """
    Class used for building metadata files.

    The builder reads header values from the DADA files exposed by a
    ``DadaFileManager``, populates a ``PstMetaData`` object (execution block,
    context, config, files and obscore) and writes it as YAML into the
    output directory.
    """

    def __init__(
        self: MetaDataBuilder,
        output_dir: pathlib.Path = DEFAULT_OUTPUT_DIR,
        dada_file_manager: DadaFileManager | None = None,
        pst_processing_mode: PstProcessingMode = PstProcessingMode.VOLTAGE_RECORDER,
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Create instance of PST metadata object.

        :param output_dir: the output directory to write metadata file to, defaults to DEFAULT_OUTPUT_DIR
        :type output_dir: pathlib.Path, optional
        :param dada_file_manager: an instance of a ``DadaFileManager`` that helps reading DADA files,
            defaults to None, in which case one is created for ``output_dir``
        :type dada_file_manager: DadaFileManager | None, optional
        :param pst_processing_mode: the PST processing mode used for the current scan,
            defaults to PstProcessingMode.VOLTAGE_RECORDER
        :type pst_processing_mode: PstProcessingMode, optional
        :param logger: the logger to use for logging, defaults to None
        :type logger: logging.Logger | None, optional
        """
        self.logger = logger or logging.getLogger(__name__)
        # Coerce so that a plain ``str`` path also works with the ``/`` joins below.
        self._output_dir = pathlib.Path(output_dir)
        self._dada_file_manager = dada_file_manager or DadaFileManager(folder=self._output_dir, logger=logger)
        self._pst_metadata = PstMetaData(interface=INTERFACE)
        self.processing_mode = pst_processing_mode

    def generate_metadata(self: MetaDataBuilder) -> None:
        """
        Build and write the metadata product.

        Errors raised while building are logged by ``_build_metadata`` and not
        re-raised, so the file is written with whatever was populated.
        """
        self.logger.debug("generating metadata")
        self._build_metadata()
        self.write_metadata()
        self.logger.debug("generating metadata done")

    def _build_metadata(self: MetaDataBuilder) -> None:
        """
        Build the PstMetaData object.

        Requires at least one data file and one weights file. Any exception
        raised while building is logged (with traceback) and swallowed.
        """
        try:
            # Explicit checks instead of ``assert`` so that the validation is
            # not stripped when Python runs with ``-O``.
            if self._dada_file_manager is None:
                raise ValueError("Expected _dada_file_manager to have been initialised.")
            if len(self._dada_file_manager.data_files) == 0:
                raise ValueError("Expected at least 1 data file")
            if len(self._dada_file_manager.weights_files) == 0:
                raise ValueError("Expected at least 1 weights file")

            self._pst_metadata.execution_block = self._dada_file_manager.data_files[0].eb_id
            self._build_context()
            self._build_config()
            # Files must be built before obscore: obscore sums their sizes.
            self._build_files()
            self._build_obscore()
            self.logger.debug(f"pst_metadata: {self.pst_metadata}")
        except Exception as e:
            # Best effort: report the failure but do not propagate it.
            self.logger.error(f"An error occurred while building metadata: {str(e)}", exc_info=True)

    def _build_context(self: MetaDataBuilder) -> None:
        """Populate fields used for PstContext from the first data file."""
        first_file = self._dada_file_manager.data_files[0]
        context = self._pst_metadata.context
        context.observer = first_file.observer
        context.intent = first_file.intent
        context.notes = first_file.notes

    def _build_config(self: MetaDataBuilder) -> None:
        """Build PstConfig. Placeholder for replacing defaults."""
        self._pst_metadata.config.image = CONFIG_IMAGE
        self._pst_metadata.config.version = CONFIG_VERSION

    def _build_files(self: MetaDataBuilder) -> None:
        """
        Build PstFiles used for file block in metadata file.

        One entry is added for every path in ``DATAPRODUCT_DEFINITIONS`` whose
        total size on disk is greater than zero. A failure for one path is
        logged and does not prevent the remaining paths being processed.
        """
        # Start from an empty list so that building more than once does not
        # duplicate entries (and thereby inflate the obscore size estimate).
        self._pst_metadata.files.clear()
        for file_type, description in DATAPRODUCT_DEFINITIONS.items():
            try:
                total_size = self.get_total_filesize(file_type)
                if total_size > 0:
                    self._pst_metadata.files.append(
                        PstFiles(
                            description=description,
                            path=file_type,
                            size=total_size,
                            status="done",
                        )
                    )
            except Exception as e:
                self.logger.error(f"Error in obtaining {file_type} files: {e}")

    def convert_utc_to_mjd(
        self: MetaDataBuilder,
        utc_datetime: str | datetime,
        datetime_format: str = "%Y-%m-%d-%H:%M:%S",
    ) -> float:
        """
        Convert datetime UTC format to MJD.

        :param utc_datetime: the UTC timestamp, either as a string in
            ``datetime_format`` or as a ``datetime``. A timezone aware value
            is converted to UTC before the calculation.
        :type utc_datetime: str | datetime
        :param datetime_format: the ``strptime`` format used when ``utc_datetime``
            is a string, defaults to "%Y-%m-%d-%H:%M:%S"
        :type datetime_format: str, optional
        :return: the Modified Julian Date rounded to 10 decimal places
        :rtype: float
        :raises ValueError: if a string value does not match ``datetime_format``
        """
        if isinstance(utc_datetime, str):
            date_object = datetime.strptime(utc_datetime, datetime_format)
        else:
            date_object = utc_datetime

        # The reference epoch below is naive, and naive/aware datetimes cannot
        # be subtracted, so shift an aware value to UTC and drop its tzinfo.
        utc_offset = date_object.utcoffset()
        if utc_offset is not None:
            date_object = (date_object - utc_offset).replace(tzinfo=None)

        # Calculate Julian Date (including fractional part);
        # 2451544.5 is the Julian Date of 2000-01-01T00:00:00.
        days_since_2000_01_01 = (date_object - datetime(2000, 1, 1)).total_seconds() / (24 * 3600)
        jd = days_since_2000_01_01 + 2451544.5

        # Calculate Modified Julian Date (MJD)
        mjd = jd - 2400000.5

        # Round to the desired accuracy
        return round(mjd, 10)

    def _get_scan_length(self: MetaDataBuilder) -> float:
        """
        Get the total length of the scan in seconds.

        The DADA files don't explicitly contain a header to know how long
        in seconds the file is for, but this can be determined by working
        out how many samples are in the file from resolution per sample and
        the size of the file and then multiply by TSAMP.

        :return: the summed length of all data files, in seconds
        :rtype: float
        """
        scan_len_usec = 0.0
        for f in self._dada_file_manager.data_files:
            scan_len_usec += (f.data_size // f.resolution_per_sample) * f.tsamp

        # TSAMP is treated as microseconds; convert the total to seconds.
        return float(u.Quantity(scan_len_usec, unit=u.microsecond).to(u.s).value)

    def _get_pol_states(self: MetaDataBuilder, npol: int) -> str:
        """
        Determine the obscore ``pol_states`` value for the current processing mode.

        :param npol: the number of polarisations read from the first data file
        :type npol: int
        :return: "POLA", "POLB" or "POLA,POLB", or the string "null" when the
            polarisation states cannot be determined
        :rtype: str
        """
        if self.processing_mode == PstProcessingMode.VOLTAGE_RECORDER:
            return "POLA,POLB"

        if self.processing_mode == PstProcessingMode.FLOW_THROUGH:
            # Only read in flow through mode, as in the original logic.
            poln_ft = self._dada_file_manager.data_files[0].poln_ft
            if poln_ft == "A":
                return "POLA"
            if poln_ft == "B":
                return "POLB"
            if poln_ft == "Both":
                if npol == 2:
                    return "POLA,POLB"
                self.logger.warning(f"Expected npol to be 2 when poln_ft==Both, but it is {npol}")

        return "null"

    def _build_obscore(self: MetaDataBuilder) -> None:
        """
        Build PstObsCore used for obscore block in metadata file.

        All header values are taken from the first data file. The estimated
        size is the sum of the sizes already recorded in the files block, so
        ``_build_files`` has to be called before this method.
        """
        # Grab fields from header file
        first_file = self._dada_file_manager.data_files[0]
        utc_start = first_file.utc_start
        scan_id = first_file.scan_id
        tsamp = float(first_file.tsamp)
        npol = int(first_file.npol)
        nchan = int(first_file.nchan)
        freq = float(first_file.freq)
        bw = float(first_file.bw)
        stt_crd1: str | float = first_file.stt_crd1
        stt_crd2: str | float = first_file.stt_crd2
        target_name = first_file.source

        # A purely numeric first coordinate is treated as degrees, anything
        # else (e.g. a sexagesimal string) as an hour angle.
        try:
            stt_crd1 = float(stt_crd1)
            crd1_unit = u.deg
        except ValueError:
            crd1_unit = u.hourangle

        # NOTE: In the future when we support different co-ordinate systems
        # based on the SKA sky direction (see ADR-63) we may have to change
        # this and including the reference_frame from the schema all the way
        # through.
        sky_coord = SkyCoord(
            stt_crd1,
            stt_crd2,
            equinox=first_file.sky_coord_equinox,
            frame="icrs",
            unit=(crd1_unit, u.deg),
        )

        # NOTE: even though RA is normally measured in hours
        # SKAO uses degrees when it is a float (see ADR-63)
        s_ra = float(sky_coord.ra.deg)
        s_dec = float(sky_coord.dec.deg)

        # Parse the start time once and reuse it for both t_min and t_max.
        datetime_t_min = datetime.strptime(utc_start, "%Y-%m-%d-%H:%M:%S")
        t_min = self.convert_utc_to_mjd(datetime_t_min)

        # Convert microseconds to seconds
        t_resolution = float(u.Quantity(tsamp, unit=u.microsecond).to(u.s).value)

        access_estsize = sum(pst_files.size for pst_files in self._pst_metadata.files)

        scan_len = self._get_scan_length()
        t_max = self.convert_utc_to_mjd(datetime_t_min + timedelta(seconds=scan_len))

        # This is effectively the observation length
        t_exptime = float(scan_len)

        instrument_name = first_file.telescope.upper().replace("SKA", "SKA-")

        pol_xel = npol
        pol_states = self._get_pol_states(npol)

        # FREQ and BW are scaled by 1e6 and reported in Hz
        # (presumably the header values are in MHz - TODO confirm).
        em_xel = nchan
        em_unit = "Hz"
        em_min = (freq - (bw / 2)) * 1e6
        em_max = (freq + (bw / 2)) * 1e6
        em_res_power = "null"

        # Value of Resolution along the spectral axis.
        # Not sure about oversampling here...
        em_resolution = (bw / nchan) * 1e6

        # Unified Content Descriptor of observable,
        # not really anything suitable for PST. Put phys.polarisation for now.
        o_ucd = "phys.polarisation"

        # TODO: The following are to be populated after confirming their source.
        #   dataproduct_type=dataproduct_type,
        #   dataproduct_subtype=dataproduct_subtype,
        #   calib_level=calib_level,
        self._pst_metadata.obscore = PstObsCore(
            obs_id=scan_id,
            access_estsize=access_estsize,
            target_name=target_name,
            s_ra=s_ra,
            s_dec=s_dec,
            t_min=t_min,
            t_max=t_max,
            t_resolution=t_resolution,
            t_exptime=t_exptime,
            instrument_name=instrument_name,
            pol_xel=pol_xel,
            pol_states=pol_states,
            em_xel=em_xel,
            em_unit=em_unit,
            em_min=em_min,
            em_max=em_max,
            em_res_power=em_res_power,
            em_resolution=em_resolution,
            o_ucd=o_ucd,
        )

    def write_metadata(self: MetaDataBuilder, file_name: str = DATA_PRODUCT_FILE_NAME) -> None:
        """
        Write YAML object to a YAML file.

        :param file_name: name of the file to create inside the output directory,
            defaults to DATA_PRODUCT_FILE_NAME
        :type file_name: str, optional
        """
        absolute_path = self._output_dir / file_name
        # Explicit encoding so the result does not depend on the locale.
        with open(absolute_path, "w", encoding="utf-8") as yaml_file:
            yaml.dump(asdict(self.pst_metadata), yaml_file)
        self.logger.info(f"PST metadata written at: {absolute_path}")

    def get_total_filesize(self: MetaDataBuilder, _path: str) -> int:
        """
        Return the total size in bytes of all files under the given path.

        :param _path: path, relative to the output directory, to search recursively
        :type _path: str
        :return: the summed size of all regular, non-symlink files found,
            or 0 if the path does not exist
        :rtype: int
        """
        absolute_path = (self._output_dir / _path).resolve()
        if not absolute_path.exists():
            return 0

        # Iterate through all files and directories recursively,
        # counting only regular files that are not symbolic links.
        return sum(
            item.stat().st_size
            for item in absolute_path.rglob("*")
            if item.is_file() and not item.is_symlink()
        )

    @property
    def output_dir(self: MetaDataBuilder) -> pathlib.Path:
        """Get the output directory that files are written to."""
        return self._output_dir

    @property
    def pst_metadata(self: MetaDataBuilder) -> PstMetaData:
        """Get the PST metadata."""
        return self._pst_metadata