# Source code for ska_pst.testutils.verification.metadata_verifier
# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for verification metadata in output products."""
from __future__ import annotations
import json
import logging
import pathlib
from typing import Any, Callable, List, Protocol, Tuple, cast
import pandas as pd
import yaml
from ska_control_model import PstProcessingMode
from ska_pst.common.constants import SKA_PST_LOW_SUBSYSTEM_ID, SKA_PST_MID_SUBSYSTEM_ID
from ska_pst.send.scan import DATA_PRODUCT_FILE_NAME
from ska_pydada import DadaFile
from ska_pst.common import convert_csp_config_to_pst_config, get_telescope_config
from .assertions import DADA_VALUE_ASSERTIONS, assert_header_value
from .mapping import DADA_HEADER_CONVERTER_MAPPING, ValueMapping, _safe_get_key_val
# Default mount point under which scan files are looked up; used as the default
# value of the ``file_mount`` argument of the classes in this module.
BASE_DLM_PATH: pathlib.Path = pathlib.Path("/mnt/pst/dlm")
class Metadata:
    """
    A class to provide a Pandas data frame to view all the different metadata.

    Instances of this are created by passing a scan configuration, or the
    location of a JSON file of the configuration, the scan id and the optional
    location of where to find files. Access to the dataframe is via the
    property :py:attr:`dataframe`.

    .. code-block:: python

        viewer = Metadata(
            scan_configuration="/mnt/pst/dlm/product/eb-j354-20240212-11115_pst-low_998/scan_configuration.json",
            scan_id=998,
            file_mount="/mnt/pst/dlm"
        )
        df = viewer.dataframe
    """

    def __init__(
        self: Metadata,
        scan_configuration: dict | str | pathlib.Path,
        scan_id: int,
        file_mount: pathlib.Path | str = BASE_DLM_PATH,
        scan_path: pathlib.Path | str | None = None,
        logger: logging.Logger | None = None,
    ) -> None:
        """Initialise the metadata viewer.

        When creating an instance of the viewer you can pass a dictionary or JSON
        string of the scan configuration, or a path to a JSON file from a scan.

        :param scan_configuration: the scan configuration as a dict or JSON string,
            or the location of where the viewer could find a JSON file.
        :type scan_configuration: dict | str | pathlib.Path
        :param scan_id: the scan ID of what scan to use.
        :type scan_id: int
        :param file_mount: the location of files, defaults to pathlib.Path("/mnt/pst/dlm")
        :type file_mount: pathlib.Path | str, optional
        :param scan_path: the location where the scan files are, defaults to None.
            This is used in testing to override the calculated scan path.
        :type scan_path: pathlib.Path | str, optional
        :param logger: the logger to use for logging, default None.
        :type logger: logging.Logger | None, optional
        :raises AssertionError: if the resolved scan configuration is not a
            dictionary, or if the scan path does not exist.
        """
        self._logger = logger or logging.getLogger(__name__)

        if isinstance(scan_configuration, str):
            # A string is either a path to a JSON file or the JSON document itself.
            # Probing the file system with a JSON document can fail with OSError
            # (e.g. "File name too long") or ValueError (embedded NUL) rather than
            # returning False, so treat those as "not a path".
            try:
                is_existing_path = pathlib.Path(scan_configuration).exists()
            except (OSError, ValueError):
                is_existing_path = False

            if is_existing_path:
                scan_configuration = pathlib.Path(scan_configuration)
            else:
                scan_configuration = json.loads(scan_configuration)

        if isinstance(scan_configuration, pathlib.Path):
            with open(scan_configuration, "r") as f:
                scan_configuration = json.load(f)

        assert isinstance(
            scan_configuration, dict
        ), "Scan configuration should be a dictionary or a JSON string"

        # need to get the frequency band config for telescope
        frequency_band: str = cast(dict, scan_configuration["common"]).get("frequency_band", "low")
        if frequency_band == "low":
            self.telescope_config = get_telescope_config("SKALow")
        else:
            self.telescope_config = get_telescope_config("SKAMid")

        self._scan_configuration = convert_csp_config_to_pst_config(
            telescope_config=self.telescope_config, csp_configure_scan_request=scan_configuration
        )
        self._scan_configuration["scan_id"] = scan_id

        self.frequency_band_config = self.telescope_config.frequency_bands[frequency_band]
        self.pst_processing_mode = self._scan_configuration["pst_processing_mode"]
        self.scan_id = scan_id
        self.eb_id = str(self._scan_configuration["eb_id"])
        self.file_mount = pathlib.Path(file_mount)

        # The docstring allows a str for scan_path, so coerce it before calling
        # Path methods on it. A falsy value still selects the calculated path.
        if scan_path:
            scan_path = pathlib.Path(scan_path)
        else:
            scan_path = self.file_mount / "product" / f"{self.eb_id}_{self.subsystem_id}_{scan_id}"
        assert scan_path.exists(), f"expected files to be found at {scan_path.absolute()}"
        self.scan_path = scan_path

        # load value mappings from YAML file
        with open(pathlib.Path(__file__).parent / "mapping.yaml", "r") as f:
            mapping_yaml: dict = yaml.safe_load(f)

        # hack to ensure we only include FT values when we need to
        include_flow_through = self.pst_processing_mode == PstProcessingMode.FLOW_THROUGH
        self._value_mappings = {
            k: ValueMapping(**v)
            for k, v in mapping_yaml.items()
            if include_flow_through or not cast(str, k).startswith("ft/")
        }

        # lazily populated caches, see the corresponding properties
        self._dataframe: pd.DataFrame | None = None
        self._metadata: dict | None = None
        self._data_files: List[DadaFile] | None = None
        self._weights_files: List[DadaFile] | None = None

    @property
    def subsystem_id(self: Metadata) -> str:
        """Get the subsystem_id that the scan configuration is for."""
        if self._scan_configuration["frequency_band"] == "low":
            return SKA_PST_LOW_SUBSYSTEM_ID
        else:
            return SKA_PST_MID_SUBSYSTEM_ID

    @property
    def metadata_file_path(self: Metadata) -> pathlib.Path:
        """Get the file path of the DSP metadata for the file."""
        return self.scan_path / DATA_PRODUCT_FILE_NAME

    @property
    def data_files(self: Metadata) -> List[DadaFile]:
        """Get all the data files for the scan.

        The ``data/*.dada`` files under the scan path are loaded, in sorted
        order, on first access and cached for subsequent calls.
        """
        if self._data_files is None:
            self._data_files = [
                DadaFile.load_from_file(f, header_only=False)
                for f in sorted(self.scan_path.glob("data/*.dada"))
            ]
        return self._data_files

    @property
    def weights_files(self: Metadata) -> List[DadaFile]:
        """Get all the weights for the scan.

        The ``weights/*.dada`` files under the scan path are loaded, in sorted
        order, on first access and cached for subsequent calls.
        """
        if self._weights_files is None:
            self._weights_files = [
                DadaFile.load_from_file(f, header_only=False)
                for f in sorted(self.scan_path.glob("weights/*.dada"))
            ]
        return self._weights_files

    @property
    def metadata(self: Metadata) -> dict:
        """Get the metadata file as a dict.

        An empty dict is returned when the metadata file is missing or empty.
        """
        if self._metadata is None:
            if self.metadata_file_path.exists():
                # safe_load returns None for an empty document; fall back to an
                # empty dict so the return type holds and the result is cached.
                self._metadata = yaml.safe_load(self.metadata_file_path.read_text()) or {}
            else:
                self._metadata = {}
        return self._metadata

    def _create_dataframe(self: Metadata) -> None:
        """Build and cache the dataframe of metadata values.

        There is one row per value mapping and one column per data source: the
        scan configuration, the DLM metadata file and each data/weights file.
        """

        def _datasource_mappings() -> List[Tuple[str, dict, Callable[[ValueMapping, dict], Any]]]:
            data_file_mapping = [
                (f"Data File {f.file_number}", f.header, ValueMapping.file_value) for f in self.data_files
            ]
            weights_file_mapping = [
                (f"Weights File {f.file_number}", f.header, ValueMapping.file_value)
                for f in self.weights_files
            ]

            # Interleave data and weights columns (data 0, weights 0, data 1, ...).
            # Any unpaired files are appended afterwards so that a mismatch in the
            # number of data and weights files does not stop the dataframe from
            # being created.
            paired = min(len(data_file_mapping), len(weights_file_mapping))
            files_mapping = [
                entry for pair in zip(data_file_mapping, weights_file_mapping) for entry in pair
            ]
            files_mapping += data_file_mapping[paired:] + weights_file_mapping[paired:]

            return [
                ("Scan Configuration", self._scan_configuration, ValueMapping.config_value),
                ("DLM Metadata", self.metadata, ValueMapping.metadata_value),
                *files_mapping,
            ]

        data = {
            "Keys": list(self._value_mappings),
            **{
                col_name: [attr_fn(vm, config) for vm in self._value_mappings.values()]
                for col_name, config, attr_fn in _datasource_mappings()
            },
        }

        self._dataframe = df = pd.DataFrame(data=data)
        df.fillna(value="", inplace=True)

    @property
    def dataframe(self: Metadata) -> pd.DataFrame:
        """Get all the scan's metadata as a Pandas dataframe."""
        if self._dataframe is None:
            self._create_dataframe()

        # this is now not None
        return self._dataframe  # type: ignore

    def config_key_for_header_key(self: Metadata, header_key: str) -> Tuple[str | None, str | None]:
        """Get the config key given a header key.

        :param header_key: the header key to find a value mapping for.
        :type header_key: str
        :return: a tuple of the config key and alternate config key of the first
            value mapping whose file key matches, or ``(None, None)`` if there is
            no such mapping.
        :rtype: Tuple[str | None, str | None]
        """
        for mapping in self._value_mappings.values():
            if mapping.file_key == header_key:
                return (mapping.config_key, mapping.alternate_config_key)

        return (None, None)

    def config_value(self: Metadata, key: str, alternate_key: str | None = None) -> Any | None:
        """Get a config value for a given key.

        :param key: a ``/`` separated path into the scan configuration.
        :type key: str
        :param alternate_key: a key to fall back to if no value is found for
            ``key``, defaults to None.
        :type alternate_key: str | None, optional
        :return: the value found, or None if there is no value.
        :rtype: Any | None
        :raises KeyError: if the lookup raises a ``KeyError`` and there is no
            alternate key to fall back to.
        """
        try:
            head, *nested_keys = key.split("/")

            value = None
            if head in self._scan_configuration:
                value = self._scan_configuration[head]

            # walk down into nested structures for the remaining path parts
            for nested_key in nested_keys:
                value = _safe_get_key_val(value, nested_key)

            if value is None and alternate_key is not None:
                value = self.config_value(alternate_key)

            return value
        except KeyError:
            if alternate_key is not None:
                return self.config_value(alternate_key)

            raise
class MetadataVerifier:
    """Class that can be used to verify the metadata of a scan is correct.

    The following is an example of how to use this class.

    .. code-block:: python

        # create an instance of a verifier
        metadata_verifier = MetadataVerifier(
            scan_configuration="/mnt/pst/dlm/product/eb-j354-20240212-11115_pst-low_998/scan_configuration.json",
            scan_id=998,
        )

        # perform a verification
        try:
            metadata_verifier.verify()
        except AssertionError as e:
            # handle error
            print(e)

        # get a Pandas data frame of the metadata.
        # If in a notebook then the dataframe can be displayed as a HTML table
        df = metadata_verifier.dataframe
    """

    def __init__(
        self: MetadataVerifier,
        scan_configuration: dict | str | pathlib.Path,
        scan_id: int,
        file_mount: pathlib.Path | str = BASE_DLM_PATH,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialise an instance of the metadata verifier.

        :param scan_configuration: the scan configuration to use to verify the metadata against.
        :type scan_configuration: dict | str | pathlib.Path
        :param scan_id: the scan id to use to verify the metadata against.
        :type scan_id: int
        :param file_mount: the mount path to verify files against, defaults to pathlib.Path("/mnt/pst/dlm")
        :type file_mount: pathlib.Path | str, optional
        :param logger: the logger to use for logging, default None.
        :type logger: logging.Logger | None, optional
        :param kwargs: any additional keyword arguments, these are passed through
            to the constructor of :py:class:`Metadata`.
        """
        self._logger = logger or logging.getLogger(__name__)
        self._metadata = Metadata(
            scan_configuration=scan_configuration,
            scan_id=scan_id,
            file_mount=file_mount,
            logger=self._logger,
            **kwargs,
        )

    @property
    def dataframe(self: MetadataVerifier) -> pd.DataFrame:
        """Get all the scan's metadata as a Pandas dataframe."""
        return self._metadata.dataframe

    def verify(self: MetadataVerifier) -> None:
        """Verify the consistency of the metadata across data products.

        This method will find all the files for the scan, including the
        metadata file and then compare them. If there are any inconsistencies
        an :py:class:`AssertionError` will be raised.

        :raises AssertionError: if any inconsistencies were found, the message
            contains one line per error.
        :raises ValueError: if the processing mode of the scan is not supported.
        """
        self._logger.debug("Verifying metadata")
        errors: List[str] = []

        mode_verifier: ProcessingModeVerifier
        # delegate to a strategy for verifying files
        if self._metadata.pst_processing_mode == PstProcessingMode.VOLTAGE_RECORDER:
            mode_verifier = VoltageRecorderVerifier(logger=self._logger)
        else:
            raise ValueError("Unsupported processing mode")

        mode_verifier.verify(metadata=self._metadata, errors=errors)

        self._logger.debug(f"Verifier found {len(errors)} errors.")
        if errors:
            error_msg = "\n".join(errors)
            raise AssertionError(error_msg)

    def __getattr__(self: MetadataVerifier, attr: str) -> Any:
        """Get attribute from the wrapped metadata.

        This is only called when normal attribute lookup fails, in which case
        the lookup is delegated to the :py:class:`Metadata` instance.

        :raises AttributeError: if the attribute cannot be found.
        """
        # If ``_metadata`` itself is missing (e.g. the instance was created without
        # running __init__, as happens during copying or unpickling) delegating
        # would call this method again and recurse until a RecursionError.
        if attr == "_metadata":
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {attr!r}")

        return getattr(self._metadata, attr)
class ProcessingModeVerifier(Protocol):
    """
    Structural interface for verifying the files of a single processing mode.

    An implementation does not need to inherit from this class, it only has to
    provide a compatible :py:meth:`verify` method. That method is expected to
    check the files associated with the processing mode against the given
    configuration.
    """

    def verify(self: ProcessingModeVerifier, metadata: Metadata, errors: List[str], **kwargs: Any) -> None:
        """Run the verification for the processing mode.

        :param metadata: the metadata that was loaded for the scan
        :type metadata: Metadata
        :param errors: the list that any validation errors are to be added to.
        :type errors: List[str]
        """
class VoltageRecorderVerifier:
    """Verifies the data and weights files of a voltage recorder scan."""

    def __init__(self: VoltageRecorderVerifier, logger: logging.Logger | None = None) -> None:
        """Create the voltage recorder verifier.

        :param logger: the logger used for any log output, defaults to None in
            which case the logger of this module is used.
        :type logger: logging.Logger | None, optional
        """
        self._logger = logger or logging.getLogger(__name__)

    def _verify_header(
        self: VoltageRecorderVerifier,
        file: DadaFile,
        metadata: Metadata,
        errors: List[str],
        is_weights: bool,
        **kwargs: Any,
    ) -> None:
        """Check every header value of a file, collecting assertion failures.

        A header key with a dedicated assertion is checked with that assertion.
        Any other key is compared against the scan configuration, provided a
        config key is mapped to it and the configuration has a value for it.
        """
        self._logger.debug(f"Verifying file {file.file.name}")

        for header_key, header_value in file.header.items():
            try:
                # keys with a dedicated assertion take precedence over the mapping
                if header_key in DADA_VALUE_ASSERTIONS:
                    DADA_VALUE_ASSERTIONS[header_key](
                        file=file,
                        header=file.header,
                        header_key=header_key,
                        header_value=header_value,
                        scan_config=metadata._scan_configuration,
                        is_weights=is_weights,
                        cbf_pst_config=metadata.frequency_band_config.cbf_pst_config,
                        logger=self._logger,
                    )
                    continue

                config_key, alternate_config_key = metadata.config_key_for_header_key(header_key)
                if not config_key:
                    # nothing in the configuration maps to this header key
                    continue

                # header values are converted with str unless a converter is registered
                converter = DADA_HEADER_CONVERTER_MAPPING.get(header_key, str)
                value = converter(header_value)

                expected_value = metadata.config_value(config_key, alternate_config_key)
                if expected_value is None:
                    continue

                assert_header_value(
                    file=file,
                    header=file.header,
                    header_key=header_key,
                    header_value=value,
                    expected_value=expected_value,
                    scan_config=metadata._scan_configuration,
                    is_weights=is_weights,
                    logger=self._logger,
                    cbf_pst_config=metadata.frequency_band_config.cbf_pst_config,
                )
            except AssertionError as failure:
                # record the failure and carry on with the remaining header keys
                errors.append(str(failure))

    def verify(self: VoltageRecorderVerifier, metadata: Metadata, errors: List[str], **kwargs: Any) -> None:
        """Verify the headers of all files of a voltage recorder scan.

        The data files are verified first, followed by the weights files.

        :param metadata: the metadata that was loaded for the scan
        :type metadata: Metadata
        :param errors: the list that any validation errors are to be added to.
        :type errors: List[str]
        """

        def _verify_all(files: List[DadaFile], is_weights: bool) -> None:
            for dada_file in files:
                self._verify_header(
                    file=dada_file,
                    metadata=metadata,
                    errors=errors,
                    is_weights=is_weights,
                    frequency_band_config=metadata.frequency_band_config,
                    **kwargs,
                )

        _verify_all(metadata.data_files, False)
        _verify_all(metadata.weights_files, True)