ska_pst.send

Module init code.

class ska_pst.send.DadaFileManager(folder: Path, logger: Optional[Logger] = None)[source]

Class that captures PST data files.

Parses attributes from a set of PST voltage recorder data and weights files and computes some of the derived quantities from the scales and weights, such as the inferred number of dropped/invalid packets.

property data_files: List[DadaFileReader]

Get list of DadaFileReader objects.

property weights_files: List[WeightsFileReader]

Get list of WeightsFileReader objects.

class ska_pst.send.DadaFileReader(file: Path, logger: Optional[Logger] = None)[source]

Class that can be used to read a PSRDADA file.

property bw: str

Get the BW value from header.

property data_size: int

Get the size of the data.

property eb_id: str

Get the EB_ID value from the header.

property equinox: str

Get the EQUINOX value from header.

property file_number: int

Get the FILE_NUMBER value from header.

property file_path: str

Get the path of the file as a string.

property file_size: int

Get size of file in bytes.

property freq: str

Get the FREQ value from header.

property header: Dict[str, str]

Get header for file.

property intent: str

Build the value using the SOURCE value from the header.

property nbit: int

Get the number of bits the data is encoded in.

property nchan: int

Get the NCHAN value from header.

property ndim: int

Get the number of dimensions (2=complex, 1=real).

property notes: str

Get the NOTES value from header.

property npol: int

Get the NPOL value from header.

property obs_offset: int

Get the OBS_OFFSET value.

property observer: str

Get the OBSERVER value from header.

property poln_ft: str

Get the POLN_FT value from header.

property resolution: int

Get the RESOLUTION value from header.

property resolution_per_sample: int

The number of bytes needed for one time sample of all channels, polarisations and bits.

Note that this may differ from the resolution property, as that value may include a factor of UDP_NSAMP.

property scan_id: str

Get the SCAN_ID value from header.

property sky_coord_equinox: str

Get the EQUINOX value from header in the format of J<value>.

property source: str

Get the SOURCE value from header.

property stt_crd1: str

Get the STT_CRD1 value from header.

property stt_crd2: str

Get the STT_CRD2 value from header.

property telescope: str

Get the TELESCOPE value from header.

property tsamp: float

Get the TSAMP value from header.

property udp_nsamp: int

Get the UDP_NSAMP value from header.

property utc_start: str

Get the UTC_START value from header.

class ska_pst.send.DlmTransfer(ctx: ProcessingContext, file_event_timeout: float = 10.0, scan_manager: Optional[ScanManager] = None, logger: Optional[Logger] = None, verbose: bool = False)[source]

Class to manage the main execution loop of the PST to DLM transfer.

interrupt_processing() None[source]

Interrupt the processing and transferring of the scan.

process() None[source]

Primary processing method for the PST to DLM transfer.

class ska_pst.send.DspOutputFile(file_name: Path, data_product_path: Path)[source]

Provides a representation for PST DSP data, weights and control files.

exists() bool[source]

Return true if the file exists.

Returns

flag indicating the file_name exists on the file system.

Return type

bool

property file_number: int

The file number of the voltage recorder file, or 0 if not applicable.

The file number is used for sequencing the order of files to process.

Returns

file number or rank

Return type

int

property file_size: int

The size of the voltage recorder file in bytes.

Returns

size of the file_name in bytes.

Return type

int

property relative_path: Path

The relative path to the data_product_path.

Returns

relative path to the data_product_path

Return type

pathlib.Path

class ska_pst.send.FlowThroughScan(*args: Any, **kwargs: Any)[source]

Class representing PST Flow Through Data Products for a Scan.

class ska_pst.send.MetaDataBuilder(output_dir: Path = PosixPath('/tmp'), dada_file_manager: Optional[DadaFileManager] = None, pst_processing_mode: ska_control_model.PstProcessingMode = ska_control_model.PstProcessingMode.VOLTAGE_RECORDER, logger: Optional[Logger] = None)[source]

Class used for building metadata files.

convert_utc_to_mjd(utc_datetime: str | datetime.datetime, datetime_format: str = '%Y-%m-%d-%H:%M:%S') float[source]

Convert datetime UTC format to MJD.

generate_metadata() None[source]

Build and write the metadata product.

get_total_filesize(_path: str) int[source]

Return the total size in bytes of all files under the given path.

property output_dir: Path

Get the output directory that files are written to.

property pst_metadata: PstMetaData

Get the PST metadata.

write_metadata(file_name: str = 'ska-data-product.yaml') None[source]

Write YAML object to a YAML file.

class ska_pst.send.ProcessingContext(subsystem_id: str, local_path: Path, staging_path: Path, dlm_path: Path, stop_processing_evt: Event, scan_timeout: float = 300.0)[source]

The class used to manage passing around the processing context of the SEND DLM process.

This class captures common properties that are shared between the .dlm_transfer.DlmTransfer, .scan_manager.ScanManager and .scan.Scan classes. This class also has a threading.Event used to signal when processing should stop. Before processing a scan or a scan file, the context should be checked to see if processing can proceed by doing the following:

if ctx.processing_stopped:
    return
dlm_path: Path

The path to where SEND should put completed scan directories for the DLM agent to process.

local_path: Path

The path of where PST processing will write files to that need to be processed by SEND.

property processing_stopped: bool

Check if processing has been requested to stop.

Returns

whether the stop_processing_evt has been set or not.

Return type

bool

scan_timeout: float = 300.0

The timeout, in seconds, after which a scan is marked as being inactive.

staging_path: Path

The path that SEND uses to stage files ready to be moved to the DLM path.

stop_processing() None[source]

Set the stop_processing_evt event.

Calling this method will mean any clients waiting for the event to be set will be notified. After being set the processing_stopped property will return True.

stop_processing_evt: Event

The threading event used to signal that processing should stop.

subsystem_id: str

The ID of the SKA subsystem used in the path of the generated scan output files.

For PST the only valid values are pst-low and pst-mid.

wait_for(timeout: Optional[float] = None) bool[source]

Wait for the stop_processing_evt to be set or until a timeout has been reached.

This is a utility method that makes code that waits for the threading event to be set cleaner. Compare:

while not ctx.wait_for(timeout=1000):
    ...  # perform something

versus

while not ctx.stop_processing_evt.wait(timeout=1000):
    ...  # perform something
Parameters

timeout (float | None, optional) – how long to wait in seconds before returning false, defaults to None

Returns

whether the threading.Event has been set or not.

Return type

bool

class ska_pst.send.Scan(*args: Any, **kwargs: Any)[source]

Base class for representing PST Scan Data Products, stored on the local file system.

property age: float

Return the age of the scan in seconds.

Returns

difference between the current time and the modified time in seconds

Return type

float

can_finalise() bool

Return true if the Scan can be finalised.

A scan is ready to be finalised if a scan completed file exists and all files have been processed.

Returns

flag indicating if the scan can be finalised.

Return type

bool

compare_modified(first: Scan, second: Scan) int

Compare two scan objects by modified time to allow for sorting.

This implementation compares 2 scans by modified time, creation time, scan id and finally eb id. The scan that was modified the least recently will be ordered before scans modified more recently. Comparisons by creation time, scan id and eb id are used to break ties.

As the scan modified time can be updated this comparator should not be used to sort dictionaries.

Parameters
  • first – in the A < B comparison, this parameter is A

  • second – in the A < B comparison, this parameter is B

create_scan(*, ctx: ProcessingContext, relative_scan_path: Path, logger: logging.Logger | None = None, **kwargs: Any) ska_pst.send.scan.Scan | None

Construct a Scan sub-class from the processing/observing mode scan_configuration.json file.

Parameters
  • ctx (ProcessingContext) – the processing context for the DLM transfer

  • relative_scan_path (pathlib.Path) – the path of the scan, relative to the ctx.local_path.

Returns

Subclass of Scan that matches the PST processing mode

Return type

Scan | None

property data_product_file: Path

Return the pathlib object of the metadata file.

Returns

pathlib object concerning the metadata file

Return type

pathlib.Path

data_product_file_exists() bool

Return true if the ska-data-product.yaml file exists.

Returns

flag indicating the data product file exists

Return type

bool

delete_scan() None

Delete all the local data files associated with a scan.

property dlm_output_path: Path

Get the DLM output path for the scan.

Returns

the DLM output path for the scan.

Return type

pathlib.Path

finalise() None

Finalise the scan if it is marked as completed and has no unprocessed files.

See can_finalise() for details about if a Scan can be finalised.

The method will call _finalise_scan, which subclasses must override (for example, to create the metadata file). If an exception is raised during this call, the scan will have the processing_failed property marked as True. This results in the scan being marked as invalid, so it won’t be processed any further.

After calling _finalise_scan, this will move all the files to the DLM path defined in the processing context and then clean up the scan directory. If an exception is raised during this call, the scan will have the transfer_failed property marked as True. This results in the scan being marked as invalid, so it won’t be processed any further.

property flattened_scan_path: str

Return the flattened relative scan path from the execution block, subsystem and scan IDs.

Returns

flattened relative scan path

Return type

str

force_completion() None

Create the scan_completed file for the scan, if it does not exist.

generate_data_product_file() None

Generate the ska-data-product.yaml file.

get_all_files() List[DspOutputFile][source]

Return a list of all data, weights and control files.

Returns

list of all pertinent files for a scan

Return type

List[DspOutputFile]

get_stat_filename(data_filename: Path) Path

Return the expected filename of the HDF statistics file for a data filename.

Parameters

data_filename (pathlib.Path) – filename of the data file from which to infer the stat filename.

Returns

stat filename corresponding to the data filename.

Return type

pathlib.Path

have_files_to_process() bool

Return true if there are Scan files to process.

Returns

True if the Scan has unprocessed files

Return type

bool

property invalid_output_path: Path

Get the output path where an invalid processing would move files to.

If there is an error while processing a scan all the files will be moved to $STAGING_PATH/invalid/<flatten_scan_path>. This way the files are not lost but the scan will not attempt to be reprocessed.

Returns

the output path where an invalid processing would move files to.

Return type

pathlib.Path

is_active(max_scan_age: float) bool

Return true if the scan is active.

A scan is considered active if the time since its last modification is less than the maximum scan age.

Parameters

max_scan_age (float) – maximum age of a scan before it is considered inactive

Returns

flag indicating if the scan is active

Return type

bool

is_complete() bool

Return true if the scan_completed file exists.

Returns

flag indicating the scan recording is complete

Return type

bool

is_recording() bool

Return true if the scan has not yet been marked as completed.

Returns

flag indicating if the scan is currently recording

Return type

bool

is_valid() bool

Get whether the scan is still valid or not.

A valid scan matches the following conditions:
  • file processing hasn’t failed

  • file transfer hasn’t failed

  • the file directory still exists

Returns

flag indicating if the scan is valid

Return type

bool

property modified_time_secs: float

Get last modified time in seconds.

move_input_stats_to_staging() None[source]

Move the input-stats files from local scan path to the staging scan input-stats path.

next_unprocessed_file() Optional[Tuple[DspOutputFile, DspOutputFile]][source]

Return the data and weights files that have not yet been staged.

In the future some processing of the data, weights and scloffs may be performed.

Returns

data and weights `*.dada` files

Return type

Optional[Tuple[DspOutputFile, DspOutputFile]]

path_exists() bool

Get whether the full path to scan exists or not.

process() None

Process the current scan.

This method is the public interface to process the files for a scan. This will loop over the available files and process them. After processing all the files it will check whether the scan is ready to be finalised and perform finalisation on the scan if it is ready.

If any error occurs during the processing it will be logged as a warning and the scan will be marked as invalid to avoid having the process keep retrying to process this scan.

Parameters

ctx (ProcessingContext) – the processing context to check if processing should proceed or not.

process_file(unprocessed_file: Tuple[DspOutputFile, DspOutputFile]) None

Process the pair of data and weights files to generate a stat file.

Parameters

unprocessed_file (Tuple[DspOutputFile, DspOutputFile]) – next file to process

scan_config_file_exists() bool

Return true if the scan-config.json file exists.

Returns

flag indicating the scan config file exists

Return type

bool

property staging_scan_path: Path

Get the staging path for the scan.

Returns

the staging path for the scan

Return type

pathlib.Path

update_files() None[source]

Check the file system for new data and weights files from VR or FT scans.

update_modified_time() None

Update the last time the scan was processed with the current timestamp.

class ska_pst.send.ScanManager(*, ctx: ProcessingContext, logger: Optional[Logger] = None)[source]

Class that manages the processing of recorded scans.

property active_scans: List[Scan]

Get scans that have been updated within the last scan_timeout.

property inactive_scans: List[Scan]

Get scans that have not been updated within the last scan_timeout.

property relative_scan_paths: Set[Path]

Return a current set of the relative scan paths stored in the local_path.

Returns

the set of relative scan paths.

Return type

Set[pathlib.Path].

property scan_paths: Set[Path]

Return a set of the current full scan paths stored in the local_path.

The expected path of scans are <eb_id>/<subsystem_id>/<scan_id> where eb_id is the execution block id.

Returns

the set of full scan paths.

Return type

Set[pathlib.Path].

class ska_pst.send.VoltageRecorderScan(*args: Any, **kwargs: Any)[source]

Class representing PST Voltage Recorder Data Products for a Scan.

class ska_pst.send.WeightsFileReader(file: Path, logger: Optional[Logger] = None)[source]

Class used to read weights PSRDADA files generated by ska_pst_dsp_disk.

property nsamp_per_weight: int

Get the number of samples the weights are used for.

property packet_offset: int

Get the packet offset for the current file.

This converts the obs_offset to a packet offset by dividing the value by the weights_packet_stride. This asserts that the obs_offset is a multiple of weights_packet_stride.

property packet_scales_size: int

Get the packet scales size.

property packet_weights_size: int

Get the packet weights size.

ska_pst.send.dlm_transfer_main(arg_list: Optional[List[str]] = None) None

Parse command line arguments and execute the main processing loop.

ska_pst.send.move_files(input_path: Path, output_path: Path) None[source]

Move file(s) from input to output location.

This utility method will move file(s) from the input path to the output path.

If the input path is a directory, then the output path must be a directory.

If the input is a file and the output path is directory then the file will be moved into the output directory.

If the input is a directory then this method will attempt to move the directory but if the output directory already exists it will move the individual files to the output path and then delete the input path directory.

If the input path doesn’t exist then this will log a warning that it doesn’t exist rather than raising an error.

Parameters
  • input_path (pathlib.Path) – a file or directory that needs to be moved to the output path

  • output_path (pathlib.Path) – a file or directory that the input file(s) will be moved to

Throws AssertionError

raised when input_path is a directory and output_path exists but is not a directory.