# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module class for managing scans recorded by the PST Through Through processing mode."""
from __future__ import annotations
import datetime as dt
from typing import Any
import astropy.units as u
from overrides import override
from ska_pst.lmc.dsp import generate_dsp_scan_request
from ska_pst.common import DadaFileReader
from .constants import DATA_FILES, INPUT_STATS_FILES, OUTPUT_STATS_FILES, WEIGHTS_FILES
from .dada_file_scan import DadaFileScan
from .file_util import move_files
from .scan import ScanMetadata
# Names exported by ``from <this module> import *``.
__all__ = ["FlowThroughScan"]
# Description text for each category of Flow Through file, keyed by the file
# category constant. Passed as ``files_definitions`` when the scan's file
# metadata is generated.
FT_FILES_DEFINITIONS: dict = {
    DATA_FILES: "Channelised voltage data raw files",
    WEIGHTS_FILES: "Channelised voltage weights raw files",
    INPUT_STATS_FILES: "Monitoring statistics of the input data stream",
    OUTPUT_STATS_FILES: "Monitoring statistics of the output data stream",
}
# Translates the "polarisations" value of the DSP Flow Through scan request
# into the list of polarisation labels reported in the scan metadata.
FT_METADATA_POLARISATION_MAPPING: dict = {
    "Both": ["POLX", "POLY"],
    "A": ["POLX"],
    "B": ["POLY"],
}
class FlowThroughScan(DadaFileScan):
    """Class representing PST Flow Through Data Products for a Scan.

    Extends :py:class:`DadaFileScan` with the Flow Through specific
    finalisation step (moving the scales and offsets file) and with the
    generation of the scan metadata from the recorded DADA data files.
    """

    def __init__(
        self: FlowThroughScan,
        **kwargs: Any,
    ) -> None:
        """Initialise a Flow Through Scan object.

        :param kwargs: keyword arguments forwarded unchanged to the parent
            class initialiser.
        """
        super().__init__(**kwargs)
        self.update_files()

    @override
    def _finalise_scan(self: FlowThroughScan) -> None:
        """
        Finalise the scan.

        For Flow Through mode this is ensuring the scales and offset (scloffs) file is moved
        to the staging directory.
        """
        # The glob pattern matches a "YYYY-MM-DD-HH:MM:SS" style prefix followed by "_scloffs.dada".
        for scloffs_file in self.local_scan_path.glob("????-??-??-??:??:??_scloffs.dada"):
            move_files(scloffs_file, self.staging_scan_path)

    @override
    def get_scan_metadata(self: FlowThroughScan) -> ScanMetadata:
        """Get the metadata for the scan.

        The polarisations and number of output channels are derived from the DSP
        Flow Through scan request generated from ``pst_scan_config``. The start
        time, frequency, bandwidth and sampling values are read from the header of
        the first (in sorted path order) ``*.dada`` file in the ``data``
        sub-directory of the staging scan path, while the scan length is derived
        from the combined data size of all of those files.

        :return: the metadata describing the Flow Through scan.
        :raises FileNotFoundError: if the ``data`` sub-directory of the staging
            scan path contains no ``*.dada`` files, as the metadata cannot be
            derived without at least one data file.
        """
        dsp_ft_config = generate_dsp_scan_request(**self.pst_scan_config)

        ft_polarisations: str = dsp_ft_config["polarisations"]
        polarisations_out = FT_METADATA_POLARISATION_MAPPING[ft_polarisations]

        # "channels" is used as an inclusive [first, last] pair, hence the + 1.
        ft_channels: list[int] = dsp_ft_config["channels"]
        nchan_out = ft_channels[1] - ft_channels[0] + 1

        # Sort the paths so the file used for the header values does not depend
        # on the (unspecified) order in which the directory is listed.
        data_dir = self.staging_scan_path / "data"
        data_files = sorted(data_dir.glob("*.dada"))
        if not data_files:
            # Fail with a clear message rather than an UnboundLocalError further down.
            raise FileNotFoundError(f"No .dada data files found in {data_dir}, unable to get scan metadata.")

        # Header values are taken from the first data file only.
        first_file = DadaFileReader(file=data_files[0])
        utc_start = dt.datetime.fromisoformat(first_file.utc_start).replace(tzinfo=dt.timezone.utc)
        centre_freq_out_mhz = float(first_file.freq)
        bandwidth_out_mhz = float(first_file.bw)
        picoseconds = first_file.picoseconds
        resolution_per_sample = first_file.resolution_per_sample
        tsamp = first_file.tsamp

        # The scan length depends on the data recorded across every file.
        total_data_size = first_file.data_size
        for data_file in data_files[1:]:
            total_data_size += DadaFileReader(file=data_file).data_size

        # Whole number of samples multiplied by the sampling interval; the result
        # is interpreted as microseconds when converted to seconds below.
        scan_length_microsecs = (total_data_size // resolution_per_sample) * tsamp
        scan_length_secs = float(u.Quantity(scan_length_microsecs, unit="us").si.value)

        scan_files = self._get_scan_files_metadata(files_definitions=FT_FILES_DEFINITIONS)

        return ScanMetadata(
            utc_start=utc_start,
            picoseconds=picoseconds,
            scan_length_secs=scan_length_secs,
            tsamp_out=tsamp,
            centre_freq_out_mhz=centre_freq_out_mhz,
            bandwidth_out_mhz=bandwidth_out_mhz,
            nchan_out=nchan_out,
            polarisations_out=polarisations_out,
            scan_files=scan_files,
            output_data_type="voltages",
        )