Source code for ska_pst.send.flow_through_scan

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module class for managing scans recorded by the PST Flow Through processing mode."""

from __future__ import annotations

import datetime as dt
from typing import Any

import astropy.units as u
from overrides import override
from ska_pst.lmc.dsp import generate_dsp_scan_request

from ska_pst.common import DadaFileReader

from .constants import DATA_FILES, INPUT_STATS_FILES, OUTPUT_STATS_FILES, WEIGHTS_FILES
from .dada_file_scan import DadaFileScan
from .file_util import move_files
from .scan import ScanMetadata

# Names exported by ``from <this module> import *``.
__all__ = [
    "FlowThroughScan",
]

# Human-readable description for each category of file produced in Flow Through
# mode, keyed by the file-category constants imported from ``.constants``.
# Passed as ``files_definitions`` to ``_get_scan_files_metadata`` by
# ``FlowThroughScan.get_scan_metadata``.
FT_FILES_DEFINITIONS: dict = {
    DATA_FILES: "Channelised voltage data raw files",
    WEIGHTS_FILES: "Channelised voltage weights raw files",
    INPUT_STATS_FILES: "Monitoring statistics of the input data stream",
    OUTPUT_STATS_FILES: "Monitoring statistics of the output data stream",
}

# Maps the ``"polarisations"`` value of the generated DSP scan request to the
# list of polarisation labels reported as ``polarisations_out`` in the scan
# metadata (see ``FlowThroughScan.get_scan_metadata``).
FT_METADATA_POLARISATION_MAPPING: dict[str, list[str]] = {
    "Both": ["POLX", "POLY"],
    "A": ["POLX"],
    "B": ["POLY"],
}


class FlowThroughScan(DadaFileScan):
    """Class representing PST Flow Through Data Products for a Scan."""

    def __init__(
        self: FlowThroughScan,
        **kwargs: Any,
    ) -> None:
        """Initialise a Flow Through Scan object.

        :param kwargs: keyword arguments forwarded unchanged to the parent
            ``DadaFileScan`` initialiser.
        """
        super().__init__(**kwargs)
        self.update_files()

    @override
    def _finalise_scan(self: FlowThroughScan) -> None:
        """
        Finalise the scan.

        For Flow Through mode this is ensuring the scales and offset (scloffs) file
        is moved to the staging directory.
        """
        for scloffs_file in self.local_scan_path.glob("????-??-??-??:??:??_scloffs.dada"):
            move_files(scloffs_file, self.staging_scan_path)

    @override
    def get_scan_metadata(self: FlowThroughScan) -> ScanMetadata:
        """Get the metadata for the scan.

        The polarisations and number of output channels are derived from the DSP
        scan request generated from ``self.pst_scan_config``. The start time,
        picoseconds, centre frequency, bandwidth, sampling interval and resolution
        are read from the first (in sorted path order) ``*.dada`` file in the
        ``data`` sub-directory of the staging scan path. The scan length is
        computed from the summed data size of all of those files.

        :return: the metadata describing this Flow Through scan.
        :raises FileNotFoundError: if the staging ``data`` directory contains no
            ``*.dada`` files, as the metadata cannot be derived without one.
        """
        dsp_ft_config = generate_dsp_scan_request(**self.pst_scan_config)

        ft_polarisations: str = dsp_ft_config["polarisations"]
        polarisations_out = FT_METADATA_POLARISATION_MAPPING[ft_polarisations]

        # The +1 treats the two channel entries as an inclusive range.
        ft_channels: list[int] = dsp_ft_config["channels"]
        nchan_out = ft_channels[1] - ft_channels[0] + 1

        centre_freq_out_mhz: float
        bandwidth_out_mhz: float
        utc_start: dt.datetime
        picoseconds: int
        resolution_per_sample: int
        tsamp: float

        # Path.glob yields entries in arbitrary order; sort so that the file
        # supplying the header-derived values below is chosen deterministically.
        data_dir = self.staging_scan_path / "data"
        data_files = sorted(data_dir.glob("*.dada"))

        # Without at least one data file none of the header-derived values can be
        # populated; fail with a clear message rather than an UnboundLocalError.
        if not data_files:
            raise FileNotFoundError(f"No *.dada data files found in {data_dir}")

        total_data_size = 0
        for idx, f in enumerate(data_files):
            dada_file = DadaFileReader(file=f)
            if idx == 0:
                # Header values are taken from the first file only.
                utc_start = dt.datetime.fromisoformat(dada_file.utc_start).replace(tzinfo=dt.timezone.utc)
                centre_freq_out_mhz = float(dada_file.freq)
                bandwidth_out_mhz = float(dada_file.bw)
                picoseconds = dada_file.picoseconds
                resolution_per_sample = dada_file.resolution_per_sample
                tsamp = dada_file.tsamp

            total_data_size += dada_file.data_size

        # Whole number of resolution units across all files times the sampling
        # interval; the product is interpreted as microseconds below.
        scan_length_microsecs = (
            total_data_size // resolution_per_sample
        ) * tsamp  # pylint: disable=possibly-used-before-assignment
        scan_length_secs = float(u.Quantity(scan_length_microsecs, unit="us").si.value)

        scan_files = self._get_scan_files_metadata(files_definitions=FT_FILES_DEFINITIONS)

        return ScanMetadata(
            utc_start=utc_start,
            picoseconds=picoseconds,
            scan_length_secs=scan_length_secs,
            tsamp_out=tsamp,
            centre_freq_out_mhz=centre_freq_out_mhz,
            bandwidth_out_mhz=bandwidth_out_mhz,
            nchan_out=nchan_out,
            polarisations_out=polarisations_out,
            scan_files=scan_files,
            output_data_type="voltages",
        )