ska_pst.send

Module init code.

class ska_pst.send.DetectedFilterbankScan(*args: Any, **kwargs: Any)

Class representing PST Detected Filterbank Products for a Scan.

get_all_local_files() -> list[ska_pst.send.dsp_output_file.DspOutputFile]

Return a list of all data and config files.

Returns

list of all pertinent files for a scan

Return type

list[DspOutputFile]

get_scan_metadata() -> ScanMetadata

Get the metadata for the scan.

This method uses the generated PSRFITS files to extract metadata that is needed to create an instance of ScanMetadata.

Returns

the metadata for the scan.

Return type

ScanMetadata

process_file(data_file: DspOutputFile) -> None

Process the next PSRFITS file.

This will move the file to the staging directory.

Parameters

data_file (DspOutputFile) – next file to process

process_next_file() -> bool

Process the next unprocessed file if one exists.

Returns

True if a file was processed else False

Return type

bool

update_files() -> None

Update the data files.

class ska_pst.send.DlmTransfer(ctx: ProcessingContext, file_event_timeout: float = 10.0, scan_manager: Optional[ScanManager] = None, logger: Optional[Logger] = None, verbose: bool = False)

Class to manage the main execution loop of the PST to DLM transfer.

interrupt_processing() -> None

Interrupt the processing and transferring of the scan.

process() -> None

Primary processing method for the PST to DLM transfer.

class ska_pst.send.DspOutputFile(file_name: Path, data_product_path: Path)

Provides a representation of PST DSP data, weights and control files.

exists() -> bool

Return true if the file exists.

Returns

flag indicating the file_name exists on the file system.

Return type

bool

property file_number: int

The file number of the voltage recorder file, or 0 if not applicable.

The file number is used for sequencing the order of files to process.

Returns

file number or rank

Return type

int

property file_size: int

The size of the voltage recorder file in bytes.

Returns

size of the file_name in bytes.

Return type

int

property relative_path: Path

The relative path to the data_product_path.

Returns

relative path to the data_product_path

Return type

pathlib.Path
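
The properties above can be approximated with pathlib alone. The following is a minimal sketch, not the DspOutputFile implementation: the class OutputFileSketch, the helper demo_output_file and the file name example.dada are hypothetical, and the sketch assumes that file_name is located under data_product_path.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class OutputFileSketch:
    """Hypothetical stand-in for DspOutputFile (not the real class)."""

    file_name: Path
    data_product_path: Path

    def exists(self) -> bool:
        # True when file_name is present on the file system.
        return self.file_name.exists()

    @property
    def file_size(self) -> int:
        # Size of file_name in bytes.
        return self.file_name.stat().st_size

    @property
    def relative_path(self) -> Path:
        # Assumes file_name is located under data_product_path.
        return self.file_name.relative_to(self.data_product_path)


def demo_output_file() -> tuple[bool, int, Path]:
    """Create a 16-byte file in a temporary directory and inspect it."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        data_file = root / "data" / "example.dada"  # hypothetical file name
        data_file.parent.mkdir(parents=True)
        data_file.write_bytes(b"\x00" * 16)
        sketch = OutputFileSketch(file_name=data_file, data_product_path=root)
        return sketch.exists(), sketch.file_size, sketch.relative_path
```

If file_name were not under data_product_path, Path.relative_to would raise a ValueError; how the real class handles that case is not stated here.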

class ska_pst.send.FlowThroughScan(*args: Any, **kwargs: Any)

Class representing PST Flow Through Data Products for a Scan.

get_scan_metadata() -> ScanMetadata

Get the metadata for the scan.

class ska_pst.send.ProcessingContext(subsystem_id: str, telescope_config: TelescopeConfig, local_path: Path, staging_path: Path, dlm_path: Path, stop_processing_evt: Event, scan_timeout: float = 300.0)

The class used to manage passing around the processing context of the SEND DLM process.

This class captures common properties that are shared between the .dlm_transfer.DlmTransfer, .scan_manager.ScanManager and .scan.Scan classes. It also holds a threading.Event used to signal when processing should stop. Before processing a scan or a scan file, check the context to see whether processing can proceed:

if ctx.processing_stopped:
    return

dlm_path: Path

The path to where SEND should put completed scan directories for the DLM agent to process.

local_path: Path

The path where PST processing writes files that need to be processed by SEND.

property processing_stopped: bool

Check if processing has been requested to stop.

Returns

whether the stop_processing_evt has been set or not.

Return type

bool

scan_timeout: float = 300.0

The time out, in seconds, to mark a scan as being inactive.

staging_path: Path

The path that SEND uses to stage files ready to be moved to the DLM path.

stop_processing() -> None

Set the stop_processing_evt.

Calling this method will mean any clients waiting for the event to be set will be notified. After being set the processing_stopped property will return True.

stop_processing_evt: Event

The threading event used to signal that processing should stop.

subsystem_id: str

The ID of the SKA subsystem used in the path of the generated scan output files.

For PST the only valid values are pst-low and pst-mid.

telescope_config: TelescopeConfig

The telescope that SEND is deployed in.

This allows getting specific telescope configuration and for converting a CSP-LMC scan configuration into a PST internal representation.

wait_for(timeout: Optional[float] = None) -> bool

Wait for the stop_processing_evt to be set or until a timeout has been reached.

This utility method makes code that waits on the threading event cleaner:

while not ctx.wait_for(timeout=1000):
    # perform something

versus

while not ctx.stop_processing_evt.wait(timeout=1000):
    # perform something

Parameters

timeout (float | None, optional) – how long to wait in seconds before returning False, defaults to None

Returns

whether the threading.Event has been set or not.

Return type

bool
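
The stop-signalling behaviour described for processing_stopped, stop_processing() and wait_for() can be sketched with a plain threading.Event. This is an illustration under assumptions rather than the ProcessingContext code: StopContextSketch and run_until_stopped are hypothetical names, and the sketch leaves out every other field of the real class.

```python
import threading
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StopContextSketch:
    """Hypothetical stand-in for the stop-signalling part of ProcessingContext."""

    stop_processing_evt: threading.Event = field(default_factory=threading.Event)

    @property
    def processing_stopped(self) -> bool:
        # Mirrors the documented property: True once the event has been set.
        return self.stop_processing_evt.is_set()

    def stop_processing(self) -> None:
        # Setting the event wakes up every thread blocked in wait_for().
        self.stop_processing_evt.set()

    def wait_for(self, timeout: Optional[float] = None) -> bool:
        # Event.wait returns True if the event was set, False on timeout.
        return self.stop_processing_evt.wait(timeout=timeout)


def run_until_stopped(ctx: StopContextSketch, max_iterations: int = 3) -> int:
    """Poll until stopped; request a stop after max_iterations for the demo."""
    iterations = 0
    while not ctx.wait_for(timeout=0.01):
        iterations += 1  # stands in for "perform something"
        if iterations >= max_iterations:
            ctx.stop_processing()
    return iterations
```

Using wait_for as the loop condition gives both a polling interval and a prompt exit: the loop wakes as soon as the event is set instead of sleeping for the full timeout.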

class ska_pst.send.PstConfig(image: str = 'artefact.skao.int/ska-pst/ska-pst', version: str = '2.2.0')

A data class to represent the config field of a pst metadata file.

Configuration of generating software.

image: str = 'artefact.skao.int/ska-pst/ska-pst'

The PST image name.

version: str = '2.2.0'

The version of PST.

class ska_pst.send.PstContext(observer: str = '', intent: str = 'Tied-array beam observation', notes: str = '')

A data class to represent the context field of a pst metadata file.

The context is data passed through verbatim from OET/TMC as part of AssignResources (DLM) or Configure (other sub-systems). It is to be made part of the ska_schemas schemas.

intent: str = 'Tied-array beam observation'

The intent passed from OET/TMC

notes: str = ''

The notes passed from OET/TMC

observer: str = ''

Name or role of the person conducting the observation

class ska_pst.send.PstFiles(description: str, path: str, size: int, status: str)

A data class to represent the files field of a PST metadata file.

Documentation concerning files coupled to the PST metadata file.

description: str

The description of the file.

path: str

The relative path of the file.

size: int

The size of the file in bytes.

status: str

The status of the file.

class ska_pst.send.PstMetadata(interface: str = 'http://schema.skao.int/ska-data-product-meta/0.1', execution_block: str = '', context: ska_pst.send.metadata.PstContext = <factory>, config: ska_pst.send.metadata.PstConfig = <factory>, files: list[ska_pst.send.metadata.PstFiles] = <factory>, obscore: ska_pst.send.metadata.PstObsCore = <factory>)

Class representing the PST metadata.

This class encapsulates the metadata information for a PST (Pulsar Timing) data product. It includes details about the interface, execution block, context, configuration, files, and observation core information.

config: PstConfig

The configuration information for the PST data.

context: PstContext

The context information for the PST data.

execution_block: str = ''

The execution block identifier.

files: list[ska_pst.send.metadata.PstFiles]

List of files associated with the PST data.

interface: str = 'http://schema.skao.int/ska-data-product-meta/0.1'

The interface of the metadata.

obscore: PstObsCore

The observation core information for the PST data.
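
The <factory> defaults in the PstMetadata signature are how dataclass fields with a default factory are rendered, which gives every instance its own nested objects and its own files list. The sketch below illustrates that pattern with reduced, hypothetical classes (ConfigSketch, FileSketch, MetadataSketch); the execution block ID, file description and status value are invented for the example.

```python
from dataclasses import asdict, dataclass, field


@dataclass
class ConfigSketch:
    """Reduced stand-in for PstConfig, using the documented defaults."""

    image: str = "artefact.skao.int/ska-pst/ska-pst"
    version: str = "2.2.0"


@dataclass
class FileSketch:
    """Reduced stand-in for PstFiles."""

    description: str
    path: str
    size: int
    status: str


@dataclass
class MetadataSketch:
    """Reduced stand-in for PstMetadata (context and obscore omitted)."""

    execution_block: str = ""
    # default_factory creates a fresh object per instance, not a shared one.
    config: ConfigSketch = field(default_factory=ConfigSketch)
    files: list[FileSketch] = field(default_factory=list)


meta = MetadataSketch(execution_block="eb-example")  # hypothetical ID
meta.files.append(FileSketch("Example data file", "data/example.dada", 16, "done"))

# asdict converts nested dataclasses and lists into plain dicts and lists,
# a form that a YAML or JSON writer can serialise directly.
as_mapping = asdict(meta)
```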

class ska_pst.send.PstObsCore(dataproduct_type: str = 'timeseries', dataproduct_subtype: str = 'voltages', calib_level: int = 0, obs_id: str = '', access_estsize: int = 0, target_name: str = '', s_ra: float = 0.0, s_dec: float = 0.0, t_min: float = 0.0, t_max: float = 0.0, t_resolution: float = 0.0, t_exptime: float = 0.0, facility_name: str = 'SKA-Observatory', instrument_name: str = '', pol_xel: int = 0, pol_states: str = '', em_xel: int = 0, em_unit: str = 'Hz', em_min: float = 0.0, em_max: float = 0.0, em_res_power: str = 'null', em_resolution: float = 0.0, o_ucd: str = 'null')

A data class representing the definition of the standard IVOA ObsCore table/view.

access_estsize: int = 0

An estimate of the overall data product size in bytes.

This value is derived from the recorded files of the scan.

calib_level: int = 0

The calibration level.

Valid values are 0, 1, 2, 3, or 4.

  • 0 = Raw instrumental data

  • 1 = Instrumental data in a standard format (FITS, VOTable, SDFITS, ASDM, etc.)

  • 2 = Calibrated, science ready data with the instrument signature removed

  • 3 = Enhanced data products like mosaics, resampled or drizzled images, or heavily processed survey fields

  • 4 = Analysis data products generated after some scientific data manipulation or interpretation

dataproduct_subtype: str = 'voltages'

The subtype of the data product.

Values can be voltages, spectra, oversampled, channelised, quantised voltages

dataproduct_type: str = 'timeseries'

Logical data product type.

Values can be image, cube, spectrum, sed, timeseries, visibility, event or measurements.

em_max: float = 0.0

Stop in spectral coordinates (vacuum wavelength).

For PST this is the centre frequency of the last PST channel.

em_min: float = 0.0

Start in spectral coordinates (vacuum wavelength).

For PST this is the centre frequency of the first PST channel.

em_res_power: str = 'null'

Spectral resolving power.

For PST this is not used and defaults to null.

em_resolution: float = 0.0

The spectral resolution.

For PST this is the width of a PST channel in Hz.
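
The relationship between em_min, em_max and em_resolution can be illustrated from a centre frequency, a bandwidth and a channel count. This is a sketch under assumptions, not the PST derivation: the function spectral_axis_hz is hypothetical, and it assumes contiguous, equal-width channels that span a positive bandwidth centred on the centre frequency.

```python
def spectral_axis_hz(
    centre_freq_mhz: float, bandwidth_mhz: float, nchan: int
) -> tuple[float, float, float]:
    """Return (em_min, em_max, em_resolution) in Hz.

    Assumes nchan contiguous, equal-width channels spanning the bandwidth.
    """
    chan_width_mhz = bandwidth_mhz / nchan
    # Centre of the first channel: lower band edge plus half a channel.
    first_centre_mhz = centre_freq_mhz - bandwidth_mhz / 2 + chan_width_mhz / 2
    # Centre of the last channel: upper band edge minus half a channel.
    last_centre_mhz = centre_freq_mhz + bandwidth_mhz / 2 - chan_width_mhz / 2
    mhz_to_hz = 1e6
    return (
        first_centre_mhz * mhz_to_hz,
        last_centre_mhz * mhz_to_hz,
        chan_width_mhz * mhz_to_hz,
    )
```

For example, a 100 MHz centre frequency with 8 MHz of bandwidth in 4 channels gives channel centres at 97, 99, 101 and 103 MHz, so the sketch returns 97e6, 103e6 and 2e6 Hz.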

em_unit: str = 'Hz'

Spectral coordinates unit type.

The unit used for the values of em_min and em_max.

This defaults to Hz.

em_xel: int = 0

Number of elements along the spectral axis.

For PST this is the number of output channels (e.g. NCHAN_OUT).

facility_name: str = 'SKA-Observatory'

The observatory or facility used to collect the data.

instrument_name: str = ''

The name of the instrument used for the acquisition of the observation.

o_ucd: str = 'null'

Unified Content Descriptor of the observable.

Examples are phot.count or phot.flux.density (see sections 4.18 and B.6.4.1 of the ObsCore standard, and the UCD1+ controlled vocabulary, especially its list of observables).

This is not used by PST and defaults to “null”.

obs_id: str = ''

The scan id.

pol_states: str = ''

List of polarisation states.

pol_xel: int = 0

Number of polarisation samples.

This is the number of output polarisations (i.e. NPOL_OUT), not the input signal NPOL.

s_dec: float = 0.0

Centre of observation declination, ICRS.

s_ra: float = 0.0

The centre of observation right ascension, ICRS.

This value is in degrees, not hour angle.

t_exptime: float = 0.0

Total exposure time.

This is the length of the PST scan.

t_max: float = 0.0

End time in Modified Julian Date (MJD).

t_min: float = 0.0

Start time in Modified Julian Date (MJD).

t_resolution: float = 0.0

Temporal resolution FWHM (full width at half maximum) in seconds.

For PST this is the TSAMP converted to seconds. This is the output TSAMP, which for Voltage Recorder and Flow Through is the same as the input TSAMP. For Detected Filterbank this can be different when time averaging or an inverse filterbank is applied.

target_name: str = ''

The name of the target.

For PST this is the SOURCE field.

class ska_pst.send.Scan(*args: Any, **kwargs: Any)

Base class for representing PST Scan Data Products, stored on the local file system.

property age: float

Return the age of the scan in seconds.

Returns

difference between the current time and the modified time in seconds

Return type

float

can_finalise() -> bool

Return true if the Scan can be finalised.

A scan is ready to be finalised if a scan completed file exists and all files have been processed.

Returns

flag indicating if the scan can be finalised.

Return type

bool

compare_modified(first: Scan, second: Scan) -> int

Compare two scan objects by modified time to allow for sorting.

This implementation compares 2 scans by modified time, creation time, scan id and finally eb id. The scan that was modified the least recently will be ordered before scans modified more recently. Comparison by creation time, scan id and eb-id are to break ties.

As the scan modified time can be updated this comparator should not be used to sort dictionaries.

Parameters
  • first – in the A < B comparison, this parameter is A

  • second – in the A < B comparison, this parameter is B
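
A comparator with this ordering can be used for sorting through functools.cmp_to_key. The sketch below is illustrative only: ScanStub, its field names (in particular created_time_secs) and the integer scan ID are assumptions, and compare_modified_sketch is not the library's comparator.

```python
from dataclasses import dataclass
from functools import cmp_to_key


@dataclass
class ScanStub:
    """Hypothetical stand-in carrying only the fields used for ordering."""

    modified_time_secs: float
    created_time_secs: float
    scan_id: int
    eb_id: str


def compare_modified_sketch(first: ScanStub, second: ScanStub) -> int:
    """Return a negative, zero or positive value, like a classic comparator."""
    a = (first.modified_time_secs, first.created_time_secs, first.scan_id, first.eb_id)
    b = (second.modified_time_secs, second.created_time_secs, second.scan_id, second.eb_id)
    # Tuples compare element by element, so later fields only break ties.
    return (a > b) - (a < b)


scans = [
    ScanStub(30.0, 1.0, 3, "eb-a"),
    ScanStub(10.0, 2.0, 2, "eb-a"),
    ScanStub(10.0, 1.0, 1, "eb-a"),
]
# Least recently modified first; creation time breaks the tie at 10.0.
ordered = sorted(scans, key=cmp_to_key(compare_modified_sketch))
```

Because the modified time can change after sorting, the resulting order is a snapshot, which is the reason given above for not using the comparator to order long-lived keyed collections.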

property data_product_file: Path

Return the pathlib object of the metadata file.

Returns

pathlib object concerning the metadata file

Return type

pathlib.Path

data_product_file_exists() -> bool

Return true if the ska-data-product.yaml file exists.

Returns

flag indicating the data product file exists

Return type

bool

delete_scan() -> None

Delete all the local data files associated with a scan.

property dlm_output_path: Path

Get the DLM output path for the scan.

Returns

the DLM output path for the scan.

Return type

pathlib.Path

finalise() -> None

Finalise the scan if it is marked as completed and has no unprocessed files.

See can_finalise() for details about if a Scan can be finalised.

The method calls _finalise_scan, which subclasses must override to perform tasks such as creating the metadata file. If an exception is raised during this call, the scan’s processing_failed property is set to True; the scan is then marked as invalid and is not processed any further.

After calling _finalise_scan, this method moves all the files to the DLM path defined in the processing context and then cleans up the scan directory. If an exception is raised during this step, the scan’s transfer_failed property is set to True; the scan is then marked as invalid and is not processed any further.

property flattened_scan_path: str

Return the flattened relative scan path from the execution block, subsystem and scan IDs.

Returns

flattened relative scan path

Return type

str

force_completion() -> None

Create the scan_completed file for the scan, if it does not exist.

generate_data_product_file() -> None

Generate the ska-data-product.yaml file.

abstract get_scan_metadata() -> ScanMetadata

Get the metadata for the scan.

This is specific for the type of a Scan. This allows for the scan type to expect the values from the PST scan config, such as the number of output channels or polarisations.

get_stat_filename(data_filename: Path) -> Path

Return the expected filename of the HDF statistics file for a data filename.

Parameters

data_filename (pathlib.Path) – filename of the data file from which to infer the stat filename.

Returns

stat filename corresponding to the data filename.

Return type

pathlib.Path

abstract have_files_to_process() -> bool

Return true if there are Scan files to process.

Returns

True if the Scan has unprocessed files

Return type

bool

property invalid_output_path: Path

Get the output path where an invalid processing would move files to.

If there is an error while processing a scan, all the files are moved to $STAGING_PATH/invalid/<flatten_scan_path>. This way the files are not lost, but no attempt is made to reprocess the scan.

Returns

the output path where an invalid processing would move files to.

Return type

pathlib.Path

is_active(max_scan_age: float) -> bool

Return true if the scan is active.

A scan is considered active if the time since its last modification is less than the maximum scan age.

Parameters

max_scan_age (float) – maximum age of a scan before it is considered inactive

Returns

flag indicating if the scan is active

Return type

bool
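
The activity check combines the age property with a threshold. A minimal sketch follows, assuming that the modified time is a Unix timestamp in seconds; scan_age_secs and is_active_sketch are hypothetical helpers, and the now parameter exists only to make the example deterministic.

```python
import time
from typing import Optional


def scan_age_secs(modified_time_secs: float, now: Optional[float] = None) -> float:
    """Seconds elapsed since the scan was last modified."""
    current = time.time() if now is None else now
    return current - modified_time_secs


def is_active_sketch(
    modified_time_secs: float, max_scan_age: float, now: Optional[float] = None
) -> bool:
    """A scan is active while its age is strictly below max_scan_age."""
    return scan_age_secs(modified_time_secs, now) < max_scan_age
```

With the default scan_timeout of 300.0 seconds, a scan last modified at t=100 is active at t=350 (age 250) and inactive at t=400 (age 300, which is not less than the limit).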

is_complete() -> bool

Return true if the scan_completed file exists.

Returns

flag indicating the scan recording is complete

Return type

bool

is_recording() -> bool

Return true if the scan has not yet been marked as completed.

Returns

flag indicating if the scan is currently recording

Return type

bool

is_valid() -> bool

Get whether the scan is still valid or not.

A valid scan matches the following conditions:
  • file processing hasn’t failed

  • file transfer hasn’t failed

  • the file directory still exists

Returns

flag indicating if the scan is valid

Return type

bool

property modified_time_secs: float

Get last modified time in seconds.

move_input_stats_to_staging() -> None

Move the input-stats files from local scan path to the staging scan input-stats path.

path_exists() -> bool

Get whether the full path to scan exists or not.

process() -> None

Process the current scan.

This method is the public interface to process the files for a scan. This will loop over the available files and process them. After processing all the files it will check whether the scan is ready to be finalised and perform finalisation on the scan if it is ready.

If any error occurs during processing, it is logged as a warning and the scan is marked as invalid so that the process does not keep retrying this scan.

Parameters

ctx (ProcessingContext) – the processing context to check if processing should proceed or not.
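
The control flow described for process() (process files until none remain, finalise if possible, and mark the scan as failed on error) can be sketched as follows. ScanFlowSketch is a hypothetical stand-in, not the Scan class: it tracks file names in memory, uses a boolean in place of the scan completed file, and leaves out the processing context check and logging.

```python
class ScanFlowSketch:
    """Hypothetical stand-in that mimics the control flow described above."""

    def __init__(self, pending_files: list[str], completed: bool) -> None:
        self.pending_files = list(pending_files)
        self.completed = completed  # stands in for the scan completed file
        self.processed: list[str] = []
        self.finalised = False
        self.processing_failed = False

    def process_next_file(self) -> bool:
        # True if a file was processed, else False.
        if not self.pending_files:
            return False
        self.processed.append(self.pending_files.pop(0))
        return True

    def can_finalise(self) -> bool:
        # Ready when the scan is complete and nothing is left to process.
        return self.completed and not self.pending_files

    def process(self) -> None:
        try:
            while self.process_next_file():
                pass
            if self.can_finalise():
                self.finalised = True
        except Exception:
            # Mark the scan as failed so that it is not retried.
            self.processing_failed = True
```

A scan that is still recording has its available files processed on each pass but is only finalised on a later pass, once it has been marked as completed.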

abstract process_next_file() -> bool

Process the next unprocessed file if one exists.

Returns

True if a file was processed else False

Return type

bool

scan_config_file_exists() -> bool

Return true if the scan-config.json file exists.

Returns

flag indicating the scan config file exists

Return type

bool

property staging_scan_path: Path

Get the staging path for the scan.

Returns

the staging path for the scan

Return type

pathlib.Path

update_modified_time() -> None

Update the last time the scan was processed with the current timestamp.

class ska_pst.send.ScanManager(*, ctx: ProcessingContext, logger: Optional[Logger] = None)

Class that manages the processing of recorded scans.

property active_scans: List[Scan]

Get scans that have been updated within the last scan_timeout.

property inactive_scans: List[Scan]

Get scans that have not been updated within the last scan_timeout.

property relative_scan_paths: Set[Path]

Return a current set of the relative scan paths stored in the local_path.

Returns

the set of relative scan paths.

Return type

Set[pathlib.Path]

property scan_paths: Set[Path]

Return a set of the current full scan paths stored in the local_path.

The expected path of a scan is <eb_id>/<subsystem_id>/<scan_id> where eb_id is the execution block id.

Returns

the set of full scan paths.

Return type

Set[pathlib.Path]
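
The <eb_id>/<subsystem_id>/<scan_id> layout lends itself to discovery with a glob pattern. The sketch below shows one way to collect full and relative scan paths; it is not the ScanManager implementation, and the helper names and the example execution block and scan IDs are hypothetical.

```python
import tempfile
from pathlib import Path


def find_scan_paths(local_path: Path, subsystem_id: str) -> set[Path]:
    """Return full paths matching <eb_id>/<subsystem_id>/<scan_id>."""
    return {p for p in local_path.glob(f"*/{subsystem_id}/*") if p.is_dir()}


def find_relative_scan_paths(local_path: Path, subsystem_id: str) -> set[Path]:
    """Return the same paths, relative to local_path."""
    return {p.relative_to(local_path) for p in find_scan_paths(local_path, subsystem_id)}


def demo_scan_paths() -> set[Path]:
    """Build a small directory tree and list the pst-low scans in it."""
    with tempfile.TemporaryDirectory() as tmp:
        local_path = Path(tmp)
        # Hypothetical execution block and scan IDs.
        (local_path / "eb-001" / "pst-low" / "1").mkdir(parents=True)
        (local_path / "eb-001" / "pst-low" / "2").mkdir(parents=True)
        (local_path / "eb-002" / "pst-mid" / "7").mkdir(parents=True)
        return find_relative_scan_paths(local_path, "pst-low")
```

Returning sets rather than lists makes it cheap to compare the paths found on disk with the paths already being tracked.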

class ska_pst.send.ScanMetadata(*, utc_start: datetime, picoseconds: int, scan_length_secs: float, tsamp_out: float, centre_freq_out_mhz: float, bandwidth_out_mhz: float, nchan_out: int, polarisations_out: list[str], scan_files: list[ska_pst.send.metadata.PstFiles], output_data_type: str)

Metadata relating to a scan.

This is a PST internal representation of that metadata that is used in the generation of the SKA data product metadata file.

Different scan types will need to be able to extract this information from the scan configuration and/or output files.

property bandwidth_out_hz: float

Get the output bandwidth in Hertz.

bandwidth_out_mhz: float

The bandwidth of the output data, in MHz.

centre_freq_out_mhz: float

The centre frequency of the output data, in MHz.

property end_time: datetime

Get the scan’s end time.

This is defined as the start_time + scan_length_secs.

property end_time_mjd: float

Get the scan’s end time as a Modified Julian date (MJD) value.

nchan_out: int

The number of output channels.

output_data_type: str

The output data type as defined in the solution intent for a given scan type.

This should be “voltages” for voltage recorder and flow through modes and for detected filter bank it should be “spectra”.

picoseconds: int

The fractional seconds when the scan starts in picoseconds.

In PST the fractional seconds of a scan’s start time is stored as picoseconds in a separate value from the utc_start value.

polarisations_out: list[str]

The polarisations in the output data.

scan_files: list[ska_pst.send.metadata.PstFiles]

The output files for the scan.

The property total_dataproducts_size uses this list to get the overall size of the scan products.

scan_length_secs: float

The length of a scan in seconds.

This value can capture fractional seconds.

property start_time: datetime

Get the scan’s start time.

This returns the UTC_START + PICOSECONDS as a Python datetime object.

property start_time_mjd: float

Get the scan’s start time as a Modified Julian date (MJD) value.
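
The start time, end time and MJD properties can be related with the standard library alone. The following is a rough sketch, not the ScanMetadata code: start_time_sketch and to_mjd are hypothetical helpers, the conversion treats UTC days as exactly 86400 seconds (no leap-second handling), and datetime only resolves microseconds, so sub-microsecond detail from the picoseconds value is dropped.

```python
from datetime import datetime, timedelta, timezone

# MJD 0 corresponds to 1858-11-17 00:00:00 UTC.
MJD_EPOCH = datetime(1858, 11, 17, tzinfo=timezone.utc)
SECONDS_PER_DAY = 86_400.0


def start_time_sketch(utc_start: datetime, picoseconds: int) -> datetime:
    """Combine utc_start with the picosecond offset.

    datetime only resolves microseconds, so finer detail is truncated.
    """
    return utc_start + timedelta(microseconds=picoseconds // 1_000_000)


def to_mjd(value: datetime) -> float:
    """Convert a timezone-aware datetime to a Modified Julian Date."""
    return (value - MJD_EPOCH).total_seconds() / SECONDS_PER_DAY


utc_start = datetime(2000, 1, 1, tzinfo=timezone.utc)
start = start_time_sketch(utc_start, picoseconds=500_000_000_000)  # 0.5 s
end = start + timedelta(seconds=10.0)  # start_time + scan_length_secs
```

Midnight UTC on 1 January 2000 is MJD 51544.0, which is a convenient sanity check for any conversion of this kind.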

property total_dataproducts_size: int

Get the total size of the data products for the scan.

tsamp_out: float

The temporal resolution in microseconds.

utc_start: datetime

The UTC_START of the scan.

This value must have the UTC timezone set rather than being a naive datetime instance.

In PST this value has second resolution and no fractional seconds; the fractional part is captured by the picoseconds value.

class ska_pst.send.VoltageRecorderScan(*args: Any, **kwargs: Any)

Class representing PST Voltage Recorder Data Products for a Scan.

get_scan_metadata() -> ScanMetadata

Get the metadata for the scan.

ska_pst.send.create_scan(*, ctx: ProcessingContext, relative_scan_path: Path, logger: Optional[Logger] = None, **kwargs: Any) -> ska_pst.send.scan.Scan | None

Construct a Scan sub-class from the processing/observing mode scan_configuration.json file.

Parameters
  • ctx (ProcessingContext) – the processing context for the DLM transfer

  • relative_scan_path (pathlib.Path) – the path of the scan, relative to the ctx.local_path.

Returns

Subclass of Scan that matches the PST processing mode

Return type

Scan | None

ska_pst.send.dlm_transfer_main(arg_list: Optional[list[str]] = None) -> None

Parse command line arguments and execute the main processing loop.

Parameters

arg_list (list[str] | None, optional) – a list of arguments, defaults to None. Python’s argparse will use this list if set else it uses the command line arguments. Setting this list is used in unit testing.

ska_pst.send.move_files(input_path: Path, output_path: Path) -> None

Move file(s) from input to output location.

This utility method will move file(s) from the input path to the output path.

If the input path is a directory, then the output path must be a directory.

If the input is a file and the output path is directory then the file will be moved into the output directory.

If the input is a directory then this method will attempt to move the directory but if the output directory already exists it will move the individual files to the output path and then delete the input path directory.

If the input path doesn’t exist then this will log a warning that it doesn’t exist rather than raising an error.

Parameters
  • input_path (pathlib.Path) – a file or directory that needs to be moved to the output path

  • output_path (pathlib.Path) – a file or directory that the input file(s) will be moved to

Throws AssertionError

raised when input_path is a directory and output_path exists but is not a directory.
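
The rules above can be sketched with shutil and pathlib. This is an illustrative re-implementation under assumptions, not the move_files source: move_files_sketch and demo_move are hypothetical names, a print call stands in for the logged warning, and the sketch assumes that the parent of output_path already exists.

```python
import shutil
import tempfile
from pathlib import Path


def move_files_sketch(input_path: Path, output_path: Path) -> None:
    """Illustrative re-implementation of the rules listed above."""
    if not input_path.exists():
        print(f"warning: {input_path} does not exist")  # real code would log
        return

    if input_path.is_file():
        # shutil.move places the file inside output_path when it is a directory.
        shutil.move(str(input_path), str(output_path))
        return

    assert not output_path.exists() or output_path.is_dir(), (
        f"{output_path} exists but is not a directory"
    )
    if not output_path.exists():
        # No clash: move the whole directory in one step.
        shutil.move(str(input_path), str(output_path))
        return

    # Output directory already exists: move the entries across one by one.
    for entry in input_path.iterdir():
        shutil.move(str(entry), str(output_path / entry.name))
    input_path.rmdir()


def demo_move() -> list[str]:
    """Merge a source directory into an existing destination directory."""
    with tempfile.TemporaryDirectory() as tmp:
        src, dst = Path(tmp) / "src", Path(tmp) / "dst"
        src.mkdir()
        dst.mkdir()
        (src / "a.txt").write_text("a")
        (dst / "b.txt").write_text("b")
        move_files_sketch(src, dst)
        return sorted(p.name for p in dst.iterdir()) + [str(src.exists())]
```

In the demo, the destination ends up holding both files and the emptied source directory is removed, matching the merge behaviour described for an existing output directory.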