ska_pst.send

Module init code.

class ska_pst.send.DpdApiClient(endpoint: str, logger: Optional[Logger] = None)[source]

Class used for interacting with the DataProduct Dashboard API.

property endpoint: str

Getter method for the API endpoint.

metadata_exists(search_value: str) bool[source]

Check if metadata with a given search value exists.

This method sends a GET request to the API endpoint for searching data products and checks if metadata with the specified search value exists.

Returns

True if metadata exists, False otherwise.

Return type

bool

reindex_dataproducts() None[source]

Execute a cURL API call to reindex data products.

This method sends a POST request to the API endpoint for reindexing data products.

class ska_pst.send.MetaDataBuilder(dsp_mount_path: Optional[Path] = None, dada_file_manager: Optional[DadaFileManager] = None, logger: Optional[Logger] = None)[source]

Class used for building metadata files.

convert_utc_to_mjd(utc_datetime: str, datetime_format: str = '%Y-%m-%d-%H:%M:%S') float[source]

Convert datetime UTC format to MJD.

property dsp_mount_path: Path

Getter public method.

dsp_mount_path as a property of MetaDataBuilder.

generate_metadata() None[source]

Build and write the metadata product.

init_dada_file_manager() None[source]

Initialise DadaFileManager object.

Called upon by the main python application after the marker file is written by ska-pst-dsp.

property pst_metadata: PstMetaData

Getter public method.

PstMetaData as a property of MetaDataBuilder.

write_metadata(file_name: str = 'ska-data-product.yaml') None[source]

Write YAML object to a YAML file.

class ska_pst.send.Scan(data_product_path: Path, relative_scan_path: Path, logger: Optional[Logger] = None)[source]

Base class for representing PST Scan Data Products, stored on the local file system.

property data_product_file: Path

Return the pathlib object of the metadata file.

Returns

pathlib object concerning the metadata file

Rtype pathlib.Path

data_product_file_exists() bool[source]

Return true if the ska-pst-dataproduct.yaml file exists.

Returns

flag indicating the data product file exists

Return type

bool

delete_scan() None[source]

Delete all the local data files associated with a scan.

is_complete() bool[source]

Return true if the scan_completed file exists.

Returns

flag indicating the scan recording is complete

Return type

bool

is_recording() bool[source]

Return true is the scan been not yet been marked as completed.

Returns

flag indicating if the scan is currently recording

Return type

bool

path_exists() bool[source]

Get whether the full path to scan exists or not.

scan_config_file_exists() bool[source]

Return true if the scan-config.json file exists.

Returns

flag indicating the scan config file exists

Return type

bool

class ska_pst.send.ScanManager(data_product_path: Path, subsystem_id: str, scan_timeout: float = 300, logger: Optional[Logger] = None)[source]

Class that managers the processing of recorded scans.

property active_scans: List[VoltageRecorderScan]

Get scans that have been updated within the last scan_timeout.

next_unprocessed_scan() ska_pst.send.voltage_recorder_scan.VoltageRecorderScan | None[source]

Return the next unprocessed scan stored in the data_product_path.

Returns

the older scan currently stored in the data product path, or None if empty

Return type

VoltageRecorderScan | None

property relative_scan_paths: List[Path]

Return a current list of the relative scan paths stored in the data_product_path.

Returns

the list of relative scan paths.

Return type

List[pathlib.Path].

property scan_paths: List[Path]

Return a list of the current full scan paths stored in the data_product_path.

The expected path of scans are <eb_id>/<subsystem_id>/<scan_id> where eb_id is the execution block id.

Returns

the list of full scan paths.

Return type

List[pathlib.Path].

class ska_pst.send.ScanProcess(scan: VoltageRecorderScan, exit_cond: Condition, loop_wait: float = 2, minimum_age: float = 10, logger: Optional[Logger] = None)[source]

Thread to asynchronously generate PST data product files for transfer to remote storage.

run() None[source]

Perform processing of scan files.

class ska_pst.send.ScanTransfer(local_scan: VoltageRecorderScan, remote_scan: VoltageRecorderScan, exit_cond: Condition, loop_wait: float = 2, dir_perms: int = 511, minimum_age: float = 10, logger: Optional[Logger] = None)[source]

Thread to asynchronously transfer PST data product files to remote storage.

run() None[source]

Run the transfer for the Scan from local to remote.

untransferred_files(minimum_age: float) List[VoltageRecorderFile][source]

Return the list of untransferred files for the scan.

Parameters

minimum_age – minimum file age to use when returning untransferred files

Returns

the list of voltage recorder files

Return type

List[VoltageRecorderFile].

class ska_pst.send.SdpTransfer(local_path: Path, remote_path: Path, ska_subsystem: str, data_product_dashboard: str, scan_timeout: float, verbose: bool = False)[source]

Class to manage the main execution loop of the PST to SDP transfer.

property dpd_api_client: Optional[DpdApiClient]

Returns DpdApiClient or None if data_product_dashboard is ‘disabled’.

interrrupt_processing() None[source]

Interrupt the processing and transferring of the scan.

process() None[source]

Primary processing method for the PST to SDP transfer.

class ska_pst.send.VoltageRecorderFile(file_name: Path, data_product_path: Path)[source]

Provides representation for PST voltage recorder data and control files.

property age: float

Return the number of seconds since the file was last modified, or -1 if the file does not exist.

Returns

age of the file in seconds

Return type

int

exists() bool[source]

Return true if the file exists.

Returns

flag indicating the file_name exists on the file system.

Return type

bool

property file_number: int

The file number of the voltage recorder file, or 0 in not applicable.

The file number is used for sequencing the order of files to process. :return: file number or rank :rtype: int

property file_size: int

The size of the voltage recorder file in bytes.

Returns

size of the file_name in bytes.

Return type

int

property relative_path: Path

The relative path to the data_product_path.

Returns

relative path to the data_product_path

Return type

pathlib.Path

class ska_pst.send.VoltageRecorderScan(data_product_path: Path, relative_scan_path: Path, logger: Optional[Logger] = None)[source]

Class representing PST Voltage Recoder Data Products for a Scan.

static compare_modified(first: VoltageRecorderScan, second: VoltageRecorderScan) int[source]

Compare two scan objects by modified time to allow for sorting.

This implementation compares 2 scans by modified time, creation time, scan id and finally eb id. The scan that was modified the least recently will be ordered before scans modified more recently. Comparison by creation time, scan id and eb-id are to break ties.

As the modified time can be updated on scans the use of this comparator is should not be used to sort dictionaries.

Parameters
  • first – in the A < B comparison, this parameter is A

  • second – in the A < B comparison, this parameter is B

generate_data_product_file() None[source]

Generate the ska-data-product.yaml file.

get_all_files() List[VoltageRecorderFile][source]

Return a list of all data, weights, stats and control files.

Returns

list of all pertitent files for a scan

Return type

List[VoltageRecorderFile]

is_valid() bool[source]

Get whether the the scan is still valid or not.

A valid scan matches the following conditions:
  • file processing hasn’t failed

  • file transfer hasn’t failed

  • the file directory still exists

property modified_time_secs: float

Get last modified time in seconds.

next_unprocessed_file(minimum_age: float = 10) Optional[Tuple[VoltageRecorderFile, VoltageRecorderFile, VoltageRecorderFile]][source]

Return a data and weights file that have not yet been processed into a stat file.

Parameters

minimum_age – minimum allowed age, the number of seconds since last modification

Returns

tuple of voltage recorder files to be processed

Return type

Tuple[VoltageRecorderFile, VoltageRecorderFile, VoltageRecorderFile]

process_file(unprocessed_file: Tuple[VoltageRecorderFile, VoltageRecorderFile, VoltageRecorderFile], dir_perms: int = 511) bool[source]

Process the data and weights file to generate a stat file.

:param Tuple[VoltageRecorderFile, VoltageRecorderFile, VoltageRecorderFile] unprocessed_file unprocessed file :param dir_perms: octal directory permissions to use on directory creation :return: flag indicating proessing was successful :rtype: bool

process_next_unprocessed_file(minimum_age: float = 10.0) None[source]

Process the next unprocessed file if one exists.

Parameters

minimum_age – minimum allowed age, the number of seconds since last modification

Returns

True if a file was processed else False

update_files() None[source]

Check the file system for new data, weights and stats files.

update_modified_time() None[source]

Update the last time the scan was processed.