ska_pst.send
Module init code.
- class ska_pst.send.DadaFileManager(folder: Path, logger: Optional[Logger] = None)
Class that captures PST data files.
Parses attributes from a set of PST voltage recorder data and weights files and computes some of the derived quantities from the scales and weights, such as the inferred number of dropped/invalid packets.
- property data_files: List[DadaFileReader]
Get list of DadaFileReader objects.
- property weights_files: List[WeightsFileReader]
Get list of WeightsFileReader objects.
- class ska_pst.send.DadaFileReader(file: Path, logger: Optional[Logger] = None)
Class that can be used to read a PSRDADA file.
- property bw: str
Get the BW value from header.
- property data_size: int
Get the size of the data.
- property eb_id: str
Get the EB_ID value from the header.
- property equinox: str
Get the EQUINOX value from header.
- property file_number: int
Get the FILE_NUMBER value from header.
- property file_path: str
Get the path of the file as a string.
- property file_size: int
Get the size of the file in bytes.
- property freq: str
Get the FREQ value from header.
- property header: Dict[str, str]
Get header for file.
- property intent: str
Build the intent value from the SOURCE value in the header.
- property nbit: int
Get the number of bits the data is encoded in.
- property nchan: int
Get the NCHAN value from header.
- property ndim: int
Get the number of dimensions (2=complex, 1=real).
- property notes: str
Get the NOTES value from header.
- property npol: int
Get the NPOL value from header.
- property obs_offset: int
Get the OBS_OFFSET value.
- property observer: str
Get the OBSERVER value from header.
- property poln_ft: str
Get the POLN_FT value from header.
- property resolution: int
Get the RESOLUTION value from header.
- property resolution_per_sample: int
The number of bytes needed for one time sample of all channels, polarisations and bits.
Note that this may differ from the resolution property, as that value may include a factor of UDP_NSAMP.
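As a rough illustration, one natural reading of the definition above is that resolution_per_sample equals nchan * npol * ndim * nbit / 8 bytes, and that resolution may be that value multiplied by UDP_NSAMP. The sketch below assumes exactly that and tightly packed samples; the helper names resolution_per_sample_bytes and resolution_with_udp_nsamp are hypothetical and not part of ska_pst.send, and the real properties may be derived differently from the header.

```python
def resolution_per_sample_bytes(nchan: int, npol: int, ndim: int, nbit: int) -> int:
    """Bytes for one time sample across all channels, polarisations and dimensions.

    Hypothetical helper: assumes samples are tightly packed with no padding.
    """
    total_bits = nchan * npol * ndim * nbit
    # A sample spanning a fractional number of bytes cannot be addressed directly
    if total_bits % 8 != 0:
        raise ValueError(f"{total_bits} bits is not a whole number of bytes")
    return total_bits // 8


def resolution_with_udp_nsamp(nchan: int, npol: int, ndim: int, nbit: int, udp_nsamp: int) -> int:
    """Hypothetical: a RESOLUTION value that includes the UDP_NSAMP factor."""
    return resolution_per_sample_bytes(nchan, npol, ndim, nbit) * udp_nsamp


# Illustrative values: 432 channels, dual polarisation, complex (ndim=2), 16-bit samples
print(resolution_per_sample_bytes(432, 2, 2, 16))    # 3456
print(resolution_with_udp_nsamp(432, 2, 2, 16, 32))  # 110592
```

The channel count, bit depth and UDP_NSAMP value are made-up example numbers, not PST configuration values.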
- property scan_id: str
Get the SCAN_ID value from header.
- property sky_coord_equinox: str
Get the EQUINOX value from header in the format of J<value>.
- property source: str
Get the SOURCE value from header.
- property stt_crd1: str
Get the STT_CRD1 value from header.
- property stt_crd2: str
Get the STT_CRD2 value from header.
- property telescope: str
Get the TELESCOPE value from header.
- property tsamp: float
Get the TSAMP value from header.
- property udp_nsamp: int
Get the UDP_NSAMP value from header.
- property utc_start: str
Get the UTC_START value from header.
- class ska_pst.send.DlmTransfer(ctx: ProcessingContext, file_event_timeout: float = 10.0, scan_manager: Optional[ScanManager] = None, logger: Optional[Logger] = None, verbose: bool = False)
Class to manage the main execution loop of the PST to DLM transfer.
- class ska_pst.send.DspOutputFile(file_name: Path, data_product_path: Path)
Provides a representation of PST DSP data, weights and control files.
- exists() -> bool
Return true if the file exists.
- Returns
flag indicating the file_name exists on the file system.
- Return type
bool
- property file_number: int
The file number of the voltage recorder file, or 0 if not applicable.
The file number is used for sequencing the order of files to process.
- Returns
file number or rank
- Return type
int
- property file_size: int
The size of the voltage recorder file in bytes.
- Returns
size of the file_name in bytes.
- Return type
int
- property relative_path: Path
The path of the file relative to the data_product_path.
- Returns
relative path to the data_product_path
- Return type
pathlib.Path
- class ska_pst.send.FlowThroughScan(*args: Any, **kwargs: Any)
Class representing PST Flow Through Data Products for a Scan.
- class ska_pst.send.MetaDataBuilder(output_dir: Path = PosixPath('/tmp'), dada_file_manager: Optional[DadaFileManager] = None, pst_processing_mode: ska_control_model.PstProcessingMode = ska_control_model.PstProcessingMode.VOLTAGE_RECORDER, logger: Optional[Logger] = None)
Class used for building metadata files.
- convert_utc_to_mjd(utc_datetime: str | datetime.datetime, datetime_format: str = '%Y-%m-%d-%H:%M:%S') -> float
Convert a UTC datetime, given either as a datetime object or as a string in datetime_format, to a Modified Julian Date (MJD).
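A minimal sketch of such a conversion, assuming a naive datetime is interpreted as UTC and using the fact that the Unix epoch (1970-01-01T00:00:00 UTC) is MJD 40587. The function name utc_to_mjd is hypothetical; the library method may treat time zones or precision differently, and, like Python's datetime, this sketch ignores leap seconds.

```python
from datetime import datetime, timezone
from typing import Union

UNIX_EPOCH_MJD = 40587.0    # MJD of 1970-01-01T00:00:00 UTC
SECONDS_PER_DAY = 86400.0


def utc_to_mjd(utc_datetime: Union[str, datetime], datetime_format: str = "%Y-%m-%d-%H:%M:%S") -> float:
    """Convert a UTC datetime (or a string in datetime_format) to MJD."""
    if isinstance(utc_datetime, str):
        utc_datetime = datetime.strptime(utc_datetime, datetime_format)
    if utc_datetime.tzinfo is None:
        # Assumption: naive values are already UTC
        utc_datetime = utc_datetime.replace(tzinfo=timezone.utc)
    # timestamp() gives seconds since the Unix epoch for an aware datetime
    return UNIX_EPOCH_MJD + utc_datetime.timestamp() / SECONDS_PER_DAY


print(utc_to_mjd("2000-01-01-12:00:00"))  # 51544.5 (the J2000.0 epoch)
```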
- get_total_filesize(_path: str) -> int
Return the total size in bytes of all files under the given path.
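One way to compute such a total is a recursive walk with pathlib. This is a sketch only: the name total_filesize is hypothetical, and counting a plain file on its own and following symlinks (via Path.is_file) are assumptions about behaviour the description does not specify.

```python
from pathlib import Path


def total_filesize(path: str) -> int:
    """Total size in bytes of the regular files under path, searched recursively."""
    root = Path(path)
    if root.is_file():
        # Assumption: a plain file path is counted on its own
        return root.stat().st_size
    # rglob("*") yields every descendant; directories themselves add nothing
    return sum(p.stat().st_size for p in root.rglob("*") if p.is_file())
```

A missing path simply yields 0 here, since rglob on a non-existent directory produces no entries.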
- property output_dir: Path
Get the output directory that files are written to.
- property pst_metadata: PstMetaData
Get the PST metadata.
- class ska_pst.send.ProcessingContext(subsystem_id: str, local_path: Path, staging_path: Path, dlm_path: Path, stop_processing_evt: Event, scan_timeout: float = 300.0)
The class used to manage passing around the processing context of the SEND DLM process.
This class captures common properties that are used between the
.dlm_transfer.DlmTransfer, .scan_manager.ScanManager and .scan.Scan classes. This class also has a threading.Event used to signal when processing should stop. Before processing a scan or a scan file, the context should be checked to see if processing can proceed by doing the following:
if ctx.processing_stopped:
    return
- dlm_path: Path
The path to where SEND should put completed scan directories for the DLM agent to process.
- local_path: Path
The path of where PST processing will write files to that need to be processed by SEND.
- property processing_stopped: bool
Check if processing has been requested to stop.
- Returns
whether the stop_processing_evt has been set or not.
- Return type
bool
- scan_timeout: float = 300.0
The timeout, in seconds, after which a scan is marked as inactive.
- staging_path: Path
The path that SEND uses to stage files ready to be moved to the DLM path.
- stop_processing() -> None
Set the stop_processing_evt event.
Calling this method will notify any clients waiting for the event to be set. After being set, the processing_stopped property will return True.
- stop_processing_evt: Event
The threading event used to signal that processing should stop.
- subsystem_id: str
The ID of the SKA subsystem used in the path of the generated scan output files.
For PST the only valid values are pst-low and pst-mid.
- wait_for(timeout: Optional[float] = None) -> bool
Wait for the stop_processing_evt to be set or until a timeout has been reached.
This is a utility method that makes code waiting on the threading event cleaner, for example:
while not ctx.wait_for(timeout=1000):
    ...  # perform something
versus
while not ctx.stop_processing_evt.wait(timeout=1000):
    ...  # perform something
- Parameters
timeout (float | None, optional) – how long to wait in seconds before returning False, defaults to None
- Returns
whether the threading.Event has been set or not.
- Return type
bool
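The stop-event pattern described for ProcessingContext can be sketched with a plain threading.Event. MiniContext below is a hypothetical stand-in that keeps only stop_processing_evt, processing_stopped, stop_processing() and wait_for(), on the assumption that wait_for() delegates to threading.Event.wait() as the comparison above suggests; process_scan and poll_loop are illustrative helpers, not part of ska_pst.send.

```python
import threading
from typing import List, Optional


class MiniContext:
    """Hypothetical stand-in for ProcessingContext's stop-event behaviour."""

    def __init__(self) -> None:
        self.stop_processing_evt = threading.Event()

    @property
    def processing_stopped(self) -> bool:
        return self.stop_processing_evt.is_set()

    def stop_processing(self) -> None:
        # Wakes every thread currently blocked in wait_for()
        self.stop_processing_evt.set()

    def wait_for(self, timeout: Optional[float] = None) -> bool:
        # True once the event is set, False if the timeout expires first
        return self.stop_processing_evt.wait(timeout=timeout)


def process_scan(ctx: MiniContext, scan_name: str, processed: List[str]) -> None:
    # The guard recommended in the class description: bail out early if stopped
    if ctx.processing_stopped:
        return
    processed.append(scan_name)


def poll_loop(ctx: MiniContext, scans: List[str], interval: float = 0.01) -> List[str]:
    """Process one scan per polling interval until processing is stopped."""
    processed: List[str] = []
    pending = list(scans)
    while not ctx.wait_for(timeout=interval):
        if not pending:
            ctx.stop_processing()  # normally another thread would request the stop
            continue
        process_scan(ctx, pending.pop(0), processed)
    return processed
```

Because the loop condition itself blocks on the event, a stop request from another thread ends the loop within one interval instead of after a fixed sleep.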
- class ska_pst.send.Scan(*args: Any, **kwargs: Any)
Base class for representing PST Scan Data Products, stored on the local file system.
- property age: float
Return the age of the scan in seconds.
- Returns
difference between the current time and the modified time in seconds
- Return type
float
- can_finalise() -> bool
Return true if the Scan can be finalised.
A scan is ready to be finalised if a scan completed file exists and all files have been processed.
- Returns
flag indicating if the scan can be finalised.
- Return type
bool
- compare_modified(first: Scan, second: Scan) -> int
Compare two scan objects by modified time to allow for sorting.
This implementation compares 2 scans by modified time, creation time, scan id and finally eb id. The scan that was modified the least recently will be ordered before scans modified more recently. Comparison by creation time, scan id and eb-id are to break ties.
As the scan modified time can be updated this comparator should not be used to sort dictionaries.
- Parameters
first – in the A < B comparison, this parameter is A
second – in the A < B comparison, this parameter is B
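Because compare_modified() is an old-style comparator returning an int, it would be used for sorting through functools.cmp_to_key. The sketch below assumes ascending order for every key (least recently modified first, then earliest creation time, then scan ID and EB ID compared as strings); ScanInfo, its attribute names and sort_scans are hypothetical stand-ins for Scan.

```python
import functools
from dataclasses import dataclass
from typing import Any, List


@dataclass
class ScanInfo:
    """Hypothetical stand-in carrying the fields the comparator is documented to use."""

    scan_id: str
    eb_id: str
    modified_time: float
    created_time: float


def _sign(a: Any, b: Any) -> int:
    # -1, 0 or 1, like the classic cmp() from Python 2
    return (a > b) - (a < b)


def compare_modified(first: ScanInfo, second: ScanInfo) -> int:
    """Order by modified time, breaking ties by creation time, scan ID, then EB ID."""
    for attr in ("modified_time", "created_time", "scan_id", "eb_id"):
        result = _sign(getattr(first, attr), getattr(second, attr))
        if result != 0:
            return result
    return 0


def sort_scans(scans: List[ScanInfo]) -> List[ScanInfo]:
    # Least recently modified scan comes first
    return sorted(scans, key=functools.cmp_to_key(compare_modified))
```

Since modified times change as files arrive, the order is only a snapshot, which is why the comparator is unsuitable for long-lived sorted containers.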
- create_scan(*, ctx: ProcessingContext, relative_scan_path: Path, logger: logging.Logger | None = None, **kwargs: Any) -> ska_pst.send.scan.Scan | None
Construct a Scan sub-class from the processing/observing mode scan_configuration.json file.
- Parameters
ctx (ProcessingContext) – the processing context for the DLM transfer
relative_scan_path (pathlib.Path) – the path of the scan, relative to the ctx.local_path.
- Returns
Subclass of Scan that matches the PST processing mode
- Return type
Scan | None
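The factory behaviour, picking a Scan subclass from the processing mode recorded in the scan configuration file, might look like the following. The JSON key pst_processing_mode, the mode strings, the config file's location inside the scan directory and the *Sketch classes are all hypothetical placeholders for VoltageRecorderScan and FlowThroughScan; returning None for a missing file or unknown mode is an assumption based on the Scan | None return type.

```python
import json
from pathlib import Path
from typing import Dict, Optional, Type


class ScanSketch:
    """Hypothetical base class standing in for ska_pst.send.Scan."""

    def __init__(self, scan_path: Path) -> None:
        self.scan_path = scan_path


class VoltageRecorderScanSketch(ScanSketch):
    pass


class FlowThroughScanSketch(ScanSketch):
    pass


# Hypothetical mapping from processing-mode name to Scan subclass
SCAN_CLASSES: Dict[str, Type[ScanSketch]] = {
    "VOLTAGE_RECORDER": VoltageRecorderScanSketch,
    "FLOW_THROUGH": FlowThroughScanSketch,
}


def create_scan_sketch(
    local_path: Path, relative_scan_path: Path, config_name: str = "scan_configuration.json"
) -> Optional[ScanSketch]:
    scan_path = local_path / relative_scan_path
    config_file = scan_path / config_name
    if not config_file.exists():
        return None  # configuration not written yet, nothing to build
    config = json.loads(config_file.read_text())
    # "pst_processing_mode" is an assumed key name
    scan_class = SCAN_CLASSES.get(config.get("pst_processing_mode", ""))
    return scan_class(scan_path) if scan_class is not None else None
```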
- property data_product_file: Path
Return the path of the metadata file.
- Returns
pathlib.Path of the metadata file
- Return type
pathlib.Path
- data_product_file_exists() -> bool
Return true if the ska-data-product.yaml file exists.
- Returns
flag indicating the data product file exists
- Return type
bool
- delete_scan() -> None
Delete all the local data files associated with a scan.
- property dlm_output_path: Path
Get the DLM output path for the scan.
- Returns
the DLM output path for the scan.
- Return type
pathlib.Path
- finalise() -> None
Finalise the scan if it is marked as completed and has no unprocessed files.
See can_finalise() for details about whether a Scan can be finalised.
The method will call _finalise_scan, which subclasses must override to perform tasks such as creating the metadata file. If an exception is raised during this call, the scan's processing_failed property is set to True, which marks the scan as invalid so that it won't be processed any further.
After calling _finalise_scan, this will move all the files to the DLM path defined in the processing context and then clean up the scan directory. If an exception is raised during this step, the scan's transfer_failed property is set to True, which marks the scan as invalid so that it won't be processed any further.
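The control flow above is a template method: the base class decides when to finalise and how failures are recorded, while subclasses supply _finalise_scan. The sketch below is a hypothetical outline of that flow only; FinaliseSketch, MetadataScan and _move_to_dlm are illustrative names, and skipping the transfer after a processing failure is an assumption.

```python
from typing import List


class FinaliseSketch:
    """Hypothetical outline of the finalise() control flow described above."""

    def __init__(self) -> None:
        self.completed = False          # stands in for "scan completed file exists"
        self.unprocessed_files = 0
        self.processing_failed = False
        self.transfer_failed = False
        self.events: List[str] = []

    def can_finalise(self) -> bool:
        return self.completed and self.unprocessed_files == 0

    def is_valid(self) -> bool:
        return not (self.processing_failed or self.transfer_failed)

    def _finalise_scan(self) -> None:
        # Subclasses override this, e.g. to write the metadata file
        raise NotImplementedError

    def _move_to_dlm(self) -> None:
        # Hypothetical helper: move files to the DLM path and clean up
        self.events.append("moved to DLM path")

    def finalise(self) -> None:
        if not self.can_finalise():
            return
        try:
            self._finalise_scan()
        except Exception:  # a real implementation would log the error
            self.processing_failed = True
            return
        try:
            self._move_to_dlm()
        except Exception:
            self.transfer_failed = True


class MetadataScan(FinaliseSketch):
    def _finalise_scan(self) -> None:
        self.events.append("metadata written")
```

Recording failures as flags rather than re-raising keeps the main loop running while ensuring the broken scan is not retried.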
- property flattened_scan_path: str
Return the flattened relative scan path from the execution block, subsystem and scan IDs.
- Returns
flattened relative scan path
- Return type
str
- force_completion() -> None
Create the scan_completed file for the scan, if it does not exist.
- generate_data_product_file() -> None
Generate the ska-data-product.yaml file.
- get_all_files() -> List[DspOutputFile]
Return a list of all data, weights and control files.
- Returns
list of all pertinent files for a scan
- Return type
List[DspOutputFile]
- get_stat_filename(data_filename: Path) -> Path
Return the expected filename of the HDF statistics file for a data filename.
- Parameters
data_filename (pathlib.Path) – filename of the data file from which to infer the stat filename.
- Returns
stat filename corresponding to the data filename.
- Return type
pathlib.Path
- have_files_to_process() -> bool
Return true if there are Scan files to process.
- Returns
True if the Scan has unprocessed files
- Return type
bool
- property invalid_output_path: Path
Get the output path where an invalid processing would move files to.
If there is an error while processing a scan, all the files will be moved to $STAGING_PATH/invalid/<flattened_scan_path>. This way the files are not lost, but no attempt will be made to reprocess the scan.
- Returns
the output path where an invalid processing would move files to.
- Return type
pathlib.Path
- is_active(max_scan_age: float) -> bool
Return true if the scan is active.
A scan is considered active if the time since its last modification is less than the maximum scan age.
- Parameters
max_scan_age (float) – maximum age of a scan before it is considered inactive
- Returns
flag indicating if the scan is active
- Return type
bool
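The age and activity checks reduce to a subtraction and a comparison. In this sketch the helper names are hypothetical, time.time() as the clock is an assumption, and the optional now argument exists only to make the example deterministic; a max_scan_age such as the ProcessingContext scan_timeout default of 300 seconds would be a natural input.

```python
import time
from typing import Optional


def scan_age(modified_time_secs: float, now: Optional[float] = None) -> float:
    """Seconds elapsed since the scan was last modified."""
    current = time.time() if now is None else now
    return current - modified_time_secs


def is_active(modified_time_secs: float, max_scan_age: float, now: Optional[float] = None) -> bool:
    # Active while the age is strictly below the threshold
    return scan_age(modified_time_secs, now) < max_scan_age


last_modified = 1_000.0
print(is_active(last_modified, max_scan_age=300.0, now=1_200.0))  # True
print(is_active(last_modified, max_scan_age=300.0, now=1_300.0))  # False
```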
- is_complete() -> bool
Return true if the scan_completed file exists.
- Returns
flag indicating the scan recording is complete
- Return type
bool
- is_recording() -> bool
Return true if the scan has not yet been marked as completed.
- Returns
flag indicating if the scan is currently recording
- Return type
bool
- is_valid() -> bool
Get whether the scan is still valid or not.
- A valid scan matches the following conditions:
file processing hasn’t failed
file transfer hasn’t failed
the file directory still exists
- Returns
flag indicating if the scan is valid
- Return type
bool
- property modified_time_secs: float
Get last modified time in seconds.
- move_input_stats_to_staging() -> None
Move the input-stats files from local scan path to the staging scan input-stats path.
- next_unprocessed_file() -> Optional[Tuple[DspOutputFile, DspOutputFile]]
Return the data and weights files that have not yet been staged.
In the future some processing of the data, weights and scloffs may be performed.
- Returns
data and weights `*.dada` files
- Return type
Optional[Tuple[DspOutputFile, DspOutputFile]]
- path_exists() -> bool
Get whether the full path to the scan exists or not.
- process() -> None
Process the current scan.
This method is the public interface to process the files for a scan. This will loop over the available files and process them. After processing all the files it will check whether the scan is ready to be finalised and perform finalisation on the scan if it is ready.
If any error occurs during the processing it will be logged as a warning and the scan will be marked as invalid to avoid having the process keep retrying to process this scan.
- Parameters
ctx (ProcessingContext) – the processing context to check if processing should proceed or not.
- process_file(unprocessed_file: Tuple[DspOutputFile, DspOutputFile]) -> None
Process the pair of data and weights files to generate a stat file.
- Parameters
unprocessed_file (Tuple[DspOutputFile, DspOutputFile]) – next file to process
- scan_config_file_exists() -> bool
Return true if the scan-config.json file exists.
- Returns
flag indicating the scan config file exists
- Return type
bool
- property staging_scan_path: Path
Get the staging path for the scan.
- Returns
the staging path for the scan
- Return type
pathlib.Path
- update_files() -> None
Check the file system for new data and weights files from VR or FT scans.
- update_modified_time() -> None
Update the last time the scan was processed with the current timestamp.
- class ska_pst.send.ScanManager(*, ctx: ProcessingContext, logger: Optional[Logger] = None)
Class that manages the processing of recorded scans.
- property inactive_scans: List[Scan]
Get scans that have not been updated within the last scan_timeout seconds.
- property relative_scan_paths: Set[Path]
Return the current set of relative scan paths stored in the local_path.
- Returns
the set of relative scan paths.
- Return type
Set[pathlib.Path]
- property scan_paths: Set[Path]
Return a set of the current full scan paths stored in the local_path.
The expected path of a scan is <eb_id>/<subsystem_id>/<scan_id>, where eb_id is the execution block ID.
- Returns
the set of full scan paths.
- Return type
Set[pathlib.Path]
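Given the <eb_id>/<subsystem_id>/<scan_id> layout, scan directories could be discovered with a three-level glob. This is a sketch under that assumption only; the function names are hypothetical, and the real properties may apply extra filtering (for example on subsystem_id).

```python
from pathlib import Path
from typing import Set


def find_relative_scan_paths(local_path: Path) -> Set[Path]:
    """Directories exactly three levels below local_path: <eb_id>/<subsystem_id>/<scan_id>."""
    return {p.relative_to(local_path) for p in local_path.glob("*/*/*") if p.is_dir()}


def find_scan_paths(local_path: Path) -> Set[Path]:
    # Full paths are the relative paths re-anchored at local_path
    return {local_path / rel for rel in find_relative_scan_paths(local_path)}
```

Stray files at the third level are ignored because only directories are kept.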
- class ska_pst.send.VoltageRecorderScan(*args: Any, **kwargs: Any)
Class representing PST Voltage Recorder Data Products for a Scan.
- class ska_pst.send.WeightsFileReader(file: Path, logger: Optional[Logger] = None)
Class used to read weights PSRDADA files generated by ska_pst_dsp_disk.
- property nsamp_per_weight: int
Get the number of samples the weights are used for.
- property packet_offset: int
Get the packet offset for the current file.
This converts the obs_offset to a packet offset by dividing the value by the weights_packet_stride. This will assert that the obs_offset is a multiple of the weights_packet_stride.
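The conversion amounts to an exact integer division guarded by an assertion. In the sketch below the function name is hypothetical, obs_offset is assumed to be a byte offset, and the stride of 4096 bytes in the example is an illustrative value rather than a real PST weights packet stride.

```python
def packet_offset(obs_offset: int, weights_packet_stride: int) -> int:
    """Convert a byte offset (OBS_OFFSET) into a whole number of weights packets."""
    # Mirrors the documented assertion; note that `python -O` strips asserts
    assert obs_offset % weights_packet_stride == 0, (
        f"obs_offset={obs_offset} is not a multiple of {weights_packet_stride}"
    )
    return obs_offset // weights_packet_stride


print(packet_offset(12_288, 4_096))  # 3
```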
- property packet_scales_size: int
Get the packet scales size.
- property packet_weights_size: int
Get the packet weights size.
- ska_pst.send.dlm_transfer_main(arg_list: Optional[List[str]] = None) -> None
Parse command line arguments and execute the main processing loop.
- ska_pst.send.move_files(input_path: Path, output_path: Path) -> None
Move file(s) from input to output location.
This utility method will move file(s) from the input path to the output path.
If the input path is a directory, then the output path must be a directory.
If the input is a file and the output path is directory then the file will be moved into the output directory.
If the input is a directory then this method will attempt to move the directory but if the output directory already exists it will move the individual files to the output path and then delete the input path directory.
If the input path doesn’t exist then this will log a warning that it doesn’t exist rather than raising an error.
- Parameters
input_path (pathlib.Path) – a file or directory that needs to be moved to the output path
output_path (pathlib.Path) – a file or directory that the input file(s) will be moved to
- Throws AssertionError
raised when input_path is a directory and output_path exists but is not a directory.
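A minimal sketch of the documented move semantics using shutil.move. It is not the package's implementation: the name move_files_sketch is hypothetical, and merging recursively when the target directory already exists is an assumption about how "move the individual files" is carried out.

```python
import logging
import shutil
from pathlib import Path

logger = logging.getLogger(__name__)


def move_files_sketch(input_path: Path, output_path: Path) -> None:
    if not input_path.exists():
        # Documented behaviour: warn rather than raise
        logger.warning("%s does not exist, nothing to move", input_path)
        return
    if input_path.is_dir():
        assert not output_path.exists() or output_path.is_dir(), (
            f"{output_path} exists but is not a directory"
        )
        if not output_path.exists():
            # Simple case: rename the whole directory into place
            output_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(input_path), str(output_path))
            return
        # Output directory already exists: merge entry by entry, then remove the empty input
        for child in input_path.iterdir():
            move_files_sketch(child, output_path / child.name)
        input_path.rmdir()
        return
    if output_path.is_dir():
        # A file moved onto a directory lands inside it
        output_path = output_path / input_path.name
    output_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(input_path), str(output_path))
```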