ska_pst.send
Module init code.
- class ska_pst.send.DetectedFilterbankScan(*args: Any, **kwargs: Any)[source]
Class representing PST Detected Filterbank Products for a Scan.
- get_all_local_files() list[ska_pst.send.dsp_output_file.DspOutputFile][source]
Return a list of all data and config files.
- Returns
list of all pertinent files for a scan
- Return type
list[DspOutputFile]
- get_scan_metadata() ScanMetadata
Get the metadata for the scan.
This method uses the generated PSRFITS files to extract metadata that is needed to create an instance of
ScanMetadata.- Returns
the metadata for the scan.
- Return type
- process_file(data_file: DspOutputFile) None[source]
Process the next PSRFITS file.
This will move the file to the staging directory.
- Parameters
data_file (DspOutputFile) – next file to process
- process_next_file() bool
Process the next unprocessed file if one exists.
- Returns
True if a file was processed else False
- Return type
bool
- class ska_pst.send.DlmTransfer(ctx: ProcessingContext, file_event_timeout: float = 10.0, scan_manager: Optional[ScanManager] = None, logger: Optional[Logger] = None, verbose: bool = False)[source]
Class to manage the main execution loop of the PST to DLM transfer.
- class ska_pst.send.DspOutputFile(file_name: Path, data_product_path: Path)[source]
Provides representation for PST DSP data, weights and and control files.
- exists() bool[source]
Return true if the file exists.
- Returns
flag indicating the file_name exists on the file system.
- Return type
bool
- property file_number: int
The file number of the voltage recorder file, or 0 in not applicable.
The file number is used for sequencing the order of files to process. :return: file number or rank :rtype: int
- property file_size: int
The size of the voltage recorder file in bytes.
- Returns
size of the file_name in bytes.
- Return type
int
- property relative_path: Path
The relative path to the data_product_path.
- Returns
relative path to the data_product_path
- Return type
pathlib.Path
- class ska_pst.send.FlowThroughScan(*args: Any, **kwargs: Any)[source]
Class representing PST Flow Through Data Products for a Scan.
- get_scan_metadata() ScanMetadata
Get the metadata for the scan.
- class ska_pst.send.ProcessingContext(subsystem_id: str, telescope_config: TelescopeConfig, local_path: Path, staging_path: Path, dlm_path: Path, stop_processing_evt: Event, scan_timeout: float = 300.0)[source]
The class used to manage passing around the processing context of the SEND DLM process.
This class captures common properties that are used between the
.dlm_transfer.DlmTransfer,.scan_manager.ScanManagerand.scan.Scanclasses. This class also has athreading.Eventused to signal when processing should stop, before processing a scan or a scan file the context should be checked to see if processing can proceeded by doing the following:if ctx.processing_stopped: return
- dlm_path: Path
The path to where SEND should put completed scan directories for the DLM agent to process.
- local_path: Path
The path of where PST processing will write files to that need to be processed by SEND.
- property processing_stopped: bool
Check if processing has been requested to stop.
- Returns
whether the
stop_processing_evthas been set or not.- Return type
bool
- scan_timeout: float = 300.0
The time out, in seconds, to mark a scan as being inactive.
- staging_path: Path
The path that SEND uses to stage files ready to be moved to the DLM path.
- stop_processing() None[source]
Set the
stop_processing_evtto being set.Calling this method will mean any clients waiting for the event to be set will be notified. After being set the
processing_stoppedproperty will returnTrue.
- stop_processing_evt: Event
The threading event used to signal that processing should stop.
- subsystem_id: str
The ID of the SKA subsystem used in the path of the generated scan output files.
For PST the only valid values are
pst-lowandpst-mid.
- telescope_config: TelescopeConfig
The telescope that SEND is deployed in.
This allows getting specific telescope configuration and for converting a CSP-LMC scan configuration into a PST internal representation.
- wait_for(timeout: Optional[float] = None) bool[source]
Wait for the
stop_processing_evtto be set or until a timeout has been reached.This is a utility method to make the code cleaner to wait for the threading event to have been set.
while not ctx.wait_for(timeout=1000): # perform something
versus
while not ctx.stop_processing_evt.wait(timeout=1000): # perform something
- Parameters
timeout (float | None, optional) – how long to wait in seconds before returning false, defaults to None
- Returns
whether the
threading.Eventhas been set or not.- Return type
bool
- class ska_pst.send.PstConfig(image: str = 'artefact.skao.int/ska-pst/ska-pst', version: str = '2.2.0')[source]
A data class to represent the config field of a pst metadata file.
Configuration of generating software.
- image: str = 'artefact.skao.int/ska-pst/ska-pst'
The PST image name.
- version: str = '2.2.0'
The version of PST.
- class ska_pst.send.PstContext(observer: str = '', intent: str = 'Tied-array beam observation', notes: str = '')[source]
A data class to represent the context field of a pst metadata file.
context is meant to be data passed verbatim through from OET/TMC as part of AssignResources (DLM) or Configure (other sub-systems). To be made part of ska_schemas schemas.
- intent: str = 'Tied-array beam observation'
The intent passed from OET/TMC
- notes: str = ''
The notes passed from OET/TMC
- observer: str = ''
Name or role of the person conducting the observation
- class ska_pst.send.PstFiles(description: str, path: str, size: int, status: str)[source]
A data class to represent the files field of a PST metadata file.
Documentation concerning files coupled to the PST metadata file.
- description: str
The description of the file.
- path: str
The relative path of the file.
- size: int
The size of the file in bytes.
- status: str
The status of the file.
- class ska_pst.send.PstMetadata(interface: str = 'http://schema.skao.int/ska-data-product-meta/0.1', execution_block: str = '', context: ~ska_pst.send.metadata.PstContext = <factory>, config: ~ska_pst.send.metadata.PstConfig = <factory>, files: list[ska_pst.send.metadata.PstFiles] = <factory>, obscore: ~ska_pst.send.metadata.PstObsCore = <factory>)[source]
Class representing the PST metadata.
This class encapsulates the metadata information for a PST (Processing Science Target) data product. It includes details about the interface, execution block, context, configuration, files, and observation core information.
- context: PstContext
The context information for the PST data.
- execution_block: str = ''
The execution block identifier.
- files: list[ska_pst.send.metadata.PstFiles]
List of files associated with the PST data.
- interface: str = 'http://schema.skao.int/ska-data-product-meta/0.1'
The interface of the metadata.
- obscore: PstObsCore
The observation core information for the PST data.
- class ska_pst.send.PstObsCore(dataproduct_type: str = 'timeseries', dataproduct_subtype: str = 'voltages', calib_level: int = 0, obs_id: str = '', access_estsize: int = 0, target_name: str = '', s_ra: float = 0.0, s_dec: float = 0.0, t_min: float = 0.0, t_max: float = 0.0, t_resolution: float = 0.0, t_exptime: float = 0.0, facility_name: str = 'SKA-Observatory', instrument_name: str = '', pol_xel: int = 0, pol_states: str = '', em_xel: int = 0, em_unit: str = 'Hz', em_min: float = 0.0, em_max: float = 0.0, em_res_power: str = 'null', em_resolution: float = 0.0, o_ucd: str = 'null')[source]
A dataclass to definition of the standard IVOA ObsCore table/view.
- access_estsize: int = 0
An estimate of the overall data product size in bytes.
This value derived from the recorded files of the scan.
- calib_level: int = 0
The calibration level.
Valid values are 0, 1, 2, 3, or 4.
0 = Raw instrumental data 1 = Instrumental data in a standard format (FITS, VOTable, SDFITS, ASDM, etc.) 2 = Calibrated, science ready data with the instrument signature removed 3 = Enhanced data products like mosaics, resampled or drizzled images, or heavily processed survey fields 4 = Analysis data products generated after some scientific data manipulation or interpretation.
- dataproduct_subtype: str = 'voltages'
The subtype of the data product.
Values can be voltages, spectra, oversampled, channelised, quantised voltages
- dataproduct_type: str = 'timeseries'
Logical data product type.
Values can be image, cube, spectrum, sed, timeseries, visibility, event or measurements.
- em_max: float = 0.0
Stop in spectral coordinates (vacuum wavelength).
For PST this is the centre frequency of the last PST channel.
- em_min: float = 0.0
Start in spectral coordinates (vacuum wavelength).
For PST this is the centre frequency of the first PST channel.
- em_res_power: str = 'null'
Spectral resolving power.
For PST this is not used and defaults to null.
- em_resolution: float = 0.0
The spectral resolution.
For PST this is the width of a PST channel in Hz.
- em_unit: str = 'Hz'
Spectral coordinates unit type.
The unit used of the values of
em_minandem_max.This defaults to Hz.
- em_xel: int = 0
Number of elements along the spectral axis.
For PST this is the number output channels (e.g. NCHAN_OUT).
- facility_name: str = 'SKA-Observatory'
The observatory or facility used to collect the data.
- instrument_name: str = ''
The name of the instrument used for the acquisition of the observation.
- o_ucd: str = 'null'
Unified Content Descriptor of observable
Example of this are phot.count or phot.flux.density see section 4.18 and B.6.4.1 in Obscore standard, UCD1+ controlled vocabulary and especially list of observables),
This is not used PST and is set defaults to “null”.
- obs_id: str = ''
The scan id.
- pol_states: str = ''
List of polarisation states.
- pol_xel: int = 0
Number of polarisation samples.
This is the number of output polarisations (i.e. NPOL_OUT), not the input signal NPOL.
- s_dec: float = 0.0
Centre of observation declination, ICRS.
- s_ra: float = 0.0
The centre of observation right ascension, ICRS.
This value is in degrees, not hour angle.
- t_exptime: float = 0.0
Total exposure time.
This the length of the PST scan.
- t_max: float = 0.0
End time in Modified Julian Date (MJD).
- t_min: float = 0.0
Start time in Modified Julian Date (MJD).
- t_resolution: float = 0.0
Temporal resolution FWHM (full width at half maximum) in seconds.
For PST this the TSAMP converted to seconds. This is the output TSAMP which for Voltage Recorder and Flow Through is the same as the input TSAMP. For Detected filterbank this can be different when there are time averaging or inverse filterbank applied.
- target_name: str = ''
The name of the target.
For PST this is the SOURCE field.
- class ska_pst.send.Scan(*args: Any, **kwargs: Any)[source]
Base class for representing PST Scan Data Products, stored on the local file system.
- property age: float
Return the age of the scan in seconds.
- Returns
difference between the current time and the modified time in seconds
- Return type
float
- can_finalise() bool
Return true if the Scan can be finalised.
A scan is ready to be finalised if a scan completed file exists and all files have been processed.
- Returns
flag indicating if the scan can be finalised.
- Return type
bool
- compare_modified(first: Scan, second: Scan) int
Compare two scan objects by modified time to allow for sorting.
This implementation compares 2 scans by modified time, creation time, scan id and finally eb id. The scan that was modified the least recently will be ordered before scans modified more recently. Comparison by creation time, scan id and eb-id are to break ties.
As the scan modified time can be updated this comparator should not be used to sort dictionaries.
- Parameters
first – in the A < B comparison, this parameter is A
second – in the A < B comparison, this parameter is B
- property data_product_file: Path
Return the pathlib object of the metadata file.
- Returns
pathlib object concerning the metadata file
- Return type
pathlib.Path
- data_product_file_exists() bool
Return true if the ska-data-product.yaml file exists.
- Returns
flag indicating the data product file exists
- Return type
bool
- delete_scan() None
Delete all the local data files associated with a scan.
- property dlm_output_path: Path
Get the DLM output path for the scan.
- Returns
the DLM output path for the scan.
- Return type
pathlib.Path
- finalise() None
Finalise the scan if it is marked as completed and has no unprocessed files.
See
can_finalise()for details about if a Scan can be finalised.The method will call the
_finalise_scanthat subclasses must override such as creating the metadata file. If an exception is raised during this call the scan will have theprocessing_failedproperty marked asTruewhich will result in the scan as being marked as invalid and won’t be processed any further.After calling
_finalise_scanthis will move all the files to the DLM path defined in the processing context and then clean up the scan directory. If an exception is raised during this call the scan will have thetransfer_failedproperty marked asTruewhich will result in the scan as being marked as invalid and won’t be processed any further.
- property flattened_scan_path: str
Return the flattened relative scan path from the execution block, subsystem and scan IDs.
- Returns
flattened relative scan path
- Return type
str
- force_completion() None
Create the scan_completed file for the scan, if it does not exist.
- generate_data_product_file() None
Generate the ska-data-product.yaml file.
- abstract get_scan_metadata() ScanMetadata[source]
Get the metadata for the scan.
This is specific for the type of a Scan. This allows for the scan type to expect the values from the PST scan config, such as the number of output channels or polarisations.
- get_stat_filename(data_filename: Path) Path
Return the expected filename of the HDF statistics file for a data filename.
- Parameters
data_filename (pathlib.Path) – filename of the data file from which to infer the stat filename.
- Returns
stat filename corresponding to the data filename.
- Return type
pathlib.Path
- abstract have_files_to_process() bool[source]
Return true if there are Scan files to process.
- Returns
True if the Scan has unprocessed files
- Return type
bool
- property invalid_output_path: Path
Get the output path where an invalid processing would move files to.
If there is an error while processing a scan all the files will be moved to
$STAGING_PATH/invalid/<flatten_scan_path>. This way the files are not lost but the scan will not attempt to be reprocessed.- Returns
the output path where an invalid processing would move files to.
- Return type
pathlib.Path
- is_active(max_scan_age: float) bool
Return true if the scan is active.
An scan is considered active if the time since it’s last modification is less than the maximum scan age.
- Parameters
max_scan_age (float) – maximum age of a scan before it is considered inactive
- Returns
flag indicating if the scan is valid
- Return type
bool
- is_complete() bool
Return true if the scan_completed file exists.
- Returns
flag indicating the scan recording is complete
- Return type
bool
- is_recording() bool
Return true is the scan been not yet been marked as completed.
- Returns
flag indicating if the scan is currently recording
- Return type
bool
- is_valid() bool
Get whether the scan is still valid or not.
- A valid scan matches the following conditions:
file processing hasn’t failed
file transfer hasn’t failed
the file directory still exists
- Returns
flag indicating if the scan is valid
- Return type
bool
- property modified_time_secs: float
Get last modified time in seconds.
- move_input_stats_to_staging() None[source]
Move the input-stats files from local scan path to the staging scan input-stats path.
- path_exists() bool
Get whether the full path to scan exists or not.
- process() None
Process the current scan.
This method is the public interface to process the files for a scan. This will loop over the available files and process them. After processing all the files it will check whether the scan is ready to be finalised and perform finalisation on the scan if it is ready.
If any error occurs during the processing it will be logged as a warning and the scan will be marked as invalid to avoid having the process keep retrying to process this scan.
- Parameters
ctx (ProcessingContext) – the processing context to check if processing should proceed or not.
- abstract process_next_file() bool[source]
Process the next unprocessed file if one exists.
- Returns
True if a file was processed else False
- Return type
bool
- scan_config_file_exists() bool
Return true if the scan-config.json file exists.
- Returns
flag indicating the scan config file exists
- Return type
bool
- property staging_scan_path: Path
Get the staging path for the scan.
- Returns
the staging path for the scan
- Return type
pathlib.Path
- update_modified_time() None
Update the last time the scan was processed with the current timestamp.
- class ska_pst.send.ScanManager(*, ctx: ProcessingContext, logger: Optional[Logger] = None)[source]
Class that manages the processing of recorded scans.
- property inactive_scans: List[Scan]
Get scans that have been not updated within the last scan_timeout.
- property relative_scan_paths: Set[Path]
Return a current set of the relative scan paths stored in the local_path.
- Returns
the list of relative scan paths.
- Return type
List[pathlib.Path].
- property scan_paths: Set[Path]
Return a set of the current full scan paths stored in the local_path.
The expected path of scans are <eb_id>/<subsystem_id>/<scan_id> where eb_id is the execution block id.
- Returns
the list of full scan paths.
- Return type
List[pathlib.Path].
- class ska_pst.send.ScanMetadata(*, utc_start: datetime, picoseconds: int, scan_length_secs: float, tsamp_out: float, centre_freq_out_mhz: float, bandwidth_out_mhz: float, nchan_out: int, polarisations_out: list[str], scan_files: list[ska_pst.send.metadata.PstFiles], output_data_type: str)[source]
Metadata relating to a scan.
This is a PST internal representation of that metadata that is used in the generation of the SKA data product metadata file.
Different scan types will need to be able to extract this information from the scan configuration and/or output files.
- property bandwidth_out_hz: float
Get the output bandwidth in Hertz.
- bandwidth_out_mhz: float
The bandwidth of the output data, in MHz.
- centre_freq_out_mhz: float
The centre frequency of the output data, in MHz.
- property end_time: datetime
Get the scan’s end time.
This is defined as the
start_time + scan_length_secs.
- property end_time_mjd: float
Get the scan’s end time as a Modified Julian date (MJD) value.
- nchan_out: int
The number of output channels.
- output_data_type: str
The output data type as defined in the solution intent for a given scan type.
This should be “voltages” for voltage recorder and flow through modes and for detected filter bank it should be “spectra”.
- picoseconds: int
The fractional seconds when the scan starts in picoseconds.
In PST the fractional seconds of a scan’s start time is stored as picoseconds in a separate value from the
utc_startvalue.
- polarisations_out: list[str]
The polarisations in the output data.
- scan_files: list[ska_pst.send.metadata.PstFiles]
The output files for the scan.
The property
total_file_sizeuses this list to get the overall size of the scan products.
- scan_length_secs: float
The length of a scan in seconds.
This value can capture fractional seconds.
- property start_time: datetime
Get the scan’s start time.
This returns the UTC_START + PICOSECONDS as a Python’s datetime object.
- property start_time_mjd: float
Get the scan’s start time as a Modified Julian date (MJD) value.
- property total_dataproducts_size: int
Get the total size of the data products for the scan.
- tsamp_out: float
The temporal resolution in microseconds.
- utc_start: datetime
The UTC_START of the scan.
This value must have the UTC timezone set rather than being a naive datetime instance.
In PST this value has second resolution and no fractional seconds, that is captured by the
picosecondsvalue.
- class ska_pst.send.VoltageRecorderScan(*args: Any, **kwargs: Any)[source]
Class representing PST Voltage Recorder Data Products for a Scan.
- get_scan_metadata() ScanMetadata
Get the metadata for the scan.
- ska_pst.send.create_scan(*, ctx: ProcessingContext, relative_scan_path: Path, logger: Optional[Logger] = None, **kwargs: Any) ska_pst.send.scan.Scan | None[source]
Construct a Scan sub-class from the processing/observing mode scan_configuration.json file.
- Parameters
ctx (ProcessingContext) – the processing context for the DLM transfer
relative_scan_path (pathlib.Path) – the path of the scan, relative to the
ctx.local_path.
- Returns
Subclass of Scan that matches the PST processing mode
- Return type
Scan | None
- ska_pst.send.dlm_transfer_main(arg_list: Optional[list[str]] = None) None
Parse command line arguments and execute the main processing loop.
- Parameters
arg_list (list[str] | None, optional) – a list of arguments, defaults to None. Python’s argparse will use this list if set else it uses the command line arguments. Setting this list is used in unit testing.
- ska_pst.send.move_files(input_path: Path, output_path: Path) None[source]
Move file(s) from input to output location.
This utility method will move file(s) from the input path to the output path.
If the input path is a directory, then the output path must be a directory.
If the input is a file and the output path is directory then the file will be moved into the output directory.
If the input is a directory then this method will attempt to move the directory but if the output directory already exists it will move the individual files to the output path and then delete the input path directory.
If the input path doesn’t exist then this will log a warning that it doesn’t exist rather than raising an error.
- Parameters
input_path (pathlib.Path) – a file or directory that needs to be moved to the output path
output_path (pathlib.Path) – a file or directory that the input file(s) will be moved to
- Throws AssertionError
raised when input_path is a directory, output_path exists but is not a directory.