tasks
Module for high-level workflow tasks to be imported into pipelines.
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.bin_data(processing_set_manager: ProcessingSetManager, dask_client: dask.distributed.Client) tuple[list[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_bin.VisibilityBin], dict][source]
Bin visibility data for subgrids.
- Parameters:
processing_set_manager – Manager for visibility data.
dask_client – Dask client.
- Returns:
visibility_bins_pruned: List of VisibilityBin objects containing binned data with empty bins removed.
- binning_info: Dictionary containing global parameters for binning data for each subgrid:
channel_width (float): The width of each channel in Hz.
min_frequency (float): The minimum frequency in Hz.
channel_count (int): The number of channels.
time_count (int): The number of times.
baseline_count (int): The number of baselines.
normalisation_factor (int): The factor for normalisation.
image_scale_padded (float): Padded scale for the image.
subgrid_size_effective (int): Effective size of the subgrid in pixels.
subgrid_size (int): Size of the subgrid in pixels.
wtower_size (int): Size of the wtower.
w_step (int): Step size for W-coordinate.
vis_name (str): Name of the visibility column, either “VISIBILITY” or “VISIBILITY_CORRECTED”.
- Return type:
tuple
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.configure_and_setup_pipeline(config_filepath: Path) tuple[ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig, ska_sdp_distributed_self_cal_prototype.data_managers.visibility_io.ProcessingSetManager, ska_sdp_distributed_self_cal_prototype.data_managers.swiftly.Swiftly, ska_sdp_distributed_self_cal_prototype.processing_tasks.gridding.Gridder, dask.distributed.Client][source]
Perform the setup that is common to every pipeline by calling:
configure_pipeline
initialise_dask_client
setup_pipeline
- Parameters:
config_filepath – Configuration for the pipeline.
- Returns:
Updated configuration for pipeline.
processing_set_manager: Manager for visibility data.
swiftly_manager: Swiftly manager for the pipeline.
gridding_manager: Gridding manager for the pipeline.
dask_client: A Dask client connected to either the specified scheduler or a local cluster.
- Return type:
pipeline_config
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.configure_pipeline(config_file: Path) PipelineConfig[source]
Load configuration and parameterise workflow.
- Parameters:
config_file – Path to YAML config file.
- Returns:
Configuration for the pipeline.
- Return type:
pipeline_config
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.distributed_hogbom(dask_client: dask.distributed.Client, residual_facets_to_clean: list[numpy.ndarray], psf_image: numpy.ndarray, model_facets_to_update: list[numpy.ndarray], gain: float, niter: int, fracthresh: float) list[numpy.ndarray][source]
Distribute hogbom over facets via dask.
- Parameters:
dask_client – Dask client to distribute tasks on
residual_facets_to_clean – Residual facets to clean
psf_image – Single central PSF image of same dimensions as the facets
model_facets_to_update – List of Images (per facet) containing the clean components
gain – The “loop gain”, i.e. the fraction of the brightest pixel that is removed in each iteration
niter – Maximum number of minor cycles to clean for.
fracthresh – Fractional threshold at which to stop cleaning. (Computed over entire PSF not single facet)
- Returns:
List of Images (per facet) containing the residual
- Return type:
residual_facets
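The per-facet distribution pattern can be sketched as below. This is only an illustration under stated assumptions: it uses the standard-library ThreadPoolExecutor as a stand-in for the Dask client, and clean_one_facet and distribute_over_facets are hypothetical names, not part of this package. The toy cleaning step touches only the peak pixel; the point is that every facet is processed independently against the same central PSF.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def clean_one_facet(residual, psf, model, gain):
    """Hypothetical toy step: move a fraction of the peak pixel into the model."""
    residual = residual.copy()
    model = model.copy()
    y, x = np.unravel_index(np.argmax(np.abs(residual)), residual.shape)
    flux = gain * residual[y, x]
    model[y, x] += flux
    # Only the PSF centre is applied here; a real cleaner subtracts the whole PSF.
    residual[y, x] -= flux * psf[psf.shape[0] // 2, psf.shape[1] // 2]
    return residual, model


def distribute_over_facets(residual_facets, psf, model_facets, gain, max_workers=4):
    """Run the per-facet step in parallel; every facet shares one PSF."""
    pairs = zip(residual_facets, model_facets)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(
            pool.map(lambda rm: clean_one_facet(rm[0], psf, rm[1], gain), pairs)
        )
    new_residuals = [residual for residual, _ in results]
    new_models = [model for _, model in results]
    return new_residuals, new_models
```

With a Dask client the same shape of code would submit one task per facet and gather the results; the executor is used here only so the sketch runs without a cluster.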
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.finish_setup(pipeline_config: PipelineConfig, processing_set_manager: ProcessingSetManager, swiftly_manager: Swiftly, gridding_manager: Gridder) tuple[ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig, ska_sdp_distributed_self_cal_prototype.data_managers.visibility_io.ProcessingSetManager, ska_sdp_distributed_self_cal_prototype.data_managers.swiftly.Swiftly, ska_sdp_distributed_self_cal_prototype.processing_tasks.gridding.Gridder][source]
Calculates and stores key observation parameters.
This includes pixel size in arcsec, image phase centre, etc.
- Parameters:
pipeline_config – PipelineConfig instance that stores all information about the observation and pipeline.
processing_set_manager – Manager for visibility data.
swiftly_manager – Swiftly manager for the pipeline.
gridding_manager – Gridding manager for the pipeline.
- Returns:
An updated PipelineConfig instance.
processing_set_manager: Manager for visibility data.
swiftly_manager: Swiftly manager for the pipeline.
gridding_manager: Gridding manager for the pipeline.
- Return type:
pipeline_config
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.generate_clean_beam_array(pipeline_config: PipelineConfig, psf: numpy.ndarray) numpy.ndarray[source]
Generates a clean beam array based on the provided point spread function (PSF) and the pixel size.
The function calculates the clean beam parameters, then generates the clean beam array.
- Parameters:
pipeline_config – Configuration object containing image information and output directory.
psf – 2D array representing the point spread function (PSF) of the image.
- Returns:
A 2D array representing the clean beam image generated from the PSF.
- Return type:
clean_beam
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.generate_clean_beam_parameters(pipeline_config: PipelineConfig, psf: numpy.ndarray) numpy.ndarray[source]
Generates clean beam parameters based on the provided point spread function (PSF) and the pixel size.
- Parameters:
pipeline_config – Configuration object containing image information and output directory.
psf – 2D array representing the point spread function (PSF) of the image.
- Returns:
- A dictionary containing the clean beam parameters:
“sigma_x”: Standard deviation of the clean beam along the x-axis in pixel units.
“sigma_y”: Standard deviation of the clean beam along the y-axis in pixel units.
“position_angle”: Position angle of the clean beam in degrees.
- Return type:
dict
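The three parameters above are enough to draw an elliptical Gaussian clean beam. The sketch below shows one way to do that with NumPy. It is not this package's implementation: make_gaussian_beam is a hypothetical helper, the beam is normalised to a peak of 1, and the position angle is assumed to be measured from the x-axis towards the y-axis, which may differ from the convention used here.

```python
import numpy as np


def make_gaussian_beam(size, sigma_x, sigma_y, position_angle_deg):
    """Return a (size, size) elliptical Gaussian with unit peak at size // 2."""
    centre = size // 2
    y, x = np.mgrid[0:size, 0:size]
    dx = x - centre
    dy = y - centre
    theta = np.deg2rad(position_angle_deg)
    # Rotate pixel offsets into the beam's own frame (assumed angle convention).
    x_rot = dx * np.cos(theta) + dy * np.sin(theta)
    y_rot = -dx * np.sin(theta) + dy * np.cos(theta)
    return np.exp(-0.5 * ((x_rot / sigma_x) ** 2 + (y_rot / sigma_y) ** 2))
```

For example, make_gaussian_beam(33, 3.0, 1.5, 0.0) gives a beam that is wider along x than along y, and setting the angle to 90 swaps the two axes.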
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.generate_facets_from_image_data(fits_path: Path | str, image_size: int, facet_count: int) list[numpy.ndarray][source]
Generate facets from a FITS image file.
- Parameters:
fits_path – The file path to the FITS file.
image_size – The side length of the image in pixels.
facet_count – Total number of facets.
- Returns:
List of image facets.
- Return type:
facets
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.generate_facets_with_corrections(swiftly_manager: Swiftly, gridding_manager: Gridder, total_num_visibilities: int, channel_count: int)[source]
Generate facets with grid corrections applied.
- Parameters:
swiftly_manager – SwiFTly object.
gridding_manager – Gridder object.
total_num_visibilities – factor used with pixel sum to normalise pixels
- Returns:
list of facets after grid corrections
normalisation_factor: factor used to normalise pixels
- Return type:
corrected_facets (list[np.ndarray])
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.get_dataset_info(dataset: xarray.Dataset, vis_name: str)[source]
Get metadata from dataset.
- Parameters:
dataset – dataset containing visibility data
vis_name – name of visibility data column
- Returns:
- dictionary containing information about dataset:
channel_width: float
min_frequency: float
channel_count: int
time_count: int
baseline_count: int
total_num_visibilities: int
- Return type:
metadata
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.grid_visibilities(visibility_bins: list[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_bin.VisibilityBin], binning_info: dict, swiftly_manager: Swiftly, gridding_manager: Gridder) Swiftly[source]
Grid visibilities.
- Parameters:
visibility_bins – list of VisibilityBin objects containing binned visibility data.
binning_info – parameters for gridding
swiftly_manager – Swiftly manager for the pipeline.
gridding_manager – Gridding manager for the pipeline.
- Returns:
Swiftly manager with tasks for gridding.
- Return type:
swiftly_manager
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.hogbom(residual: numpy.ndarray, psf: numpy.ndarray, model: numpy.ndarray, window: bool, gain: float, thresh: float, niter: int, fracthresh: float, prefix='')[source]
Modified function from ska_sdp_func_python/image/cleaners.py
Clean the point spread function from a residual image.
See Hogbom CLEAN (1974A&AS…15..417H).
This version operates on numpy arrays. ska_sdp_func_python.image.deconvolution.deconvolve_cube provides a version for Images.
- Parameters:
residual – The residual Image, i.e., the Image to be deconvolved
psf – The point spread-function
model – List of Images (per facet) containing the clean components
window – Regions where clean components are allowed. If True, entire residual Image is allowed
gain – The “loop gain”, i.e., the fraction of the brightest pixel that is removed in each iteration
thresh – Cleaning stops when the maximum of the absolute deviation of the residual is less than this value
niter – Maximum number of components to make if the threshold thresh is not hit
fracthresh – The predefined fractional threshold at which to stop cleaning
prefix – Informational prefix for log messages
- Returns:
res: array of the updated residual Image
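For orientation, the core Hogbom minor cycle can be sketched in a few lines of NumPy. This is a simplified illustration, not the function documented above: hogbom_sketch is a hypothetical name, and the window, fracthresh and prefix arguments are left out. Each iteration finds the brightest residual pixel, adds gain times its value to the model, and subtracts the PSF, scaled and shifted to that pixel, from the residual.

```python
import numpy as np


def hogbom_sketch(residual, psf, gain, thresh, niter):
    """Simplified Hogbom CLEAN on 2D arrays; returns (residual, model)."""
    residual = residual.copy()
    model = np.zeros_like(residual)
    peak_y, peak_x = np.unravel_index(np.argmax(np.abs(psf)), psf.shape)
    ny, nx = residual.shape
    py, px = psf.shape
    for _ in range(niter):
        y, x = np.unravel_index(np.argmax(np.abs(residual)), residual.shape)
        peak = residual[y, x]
        if abs(peak) < thresh:
            break
        flux = gain * peak
        model[y, x] += flux
        # Region of the residual covered by the PSF once its peak sits at (y, x).
        r_y0, r_y1 = max(0, y - peak_y), min(ny, y - peak_y + py)
        r_x0, r_x1 = max(0, x - peak_x), min(nx, x - peak_x + px)
        # Matching region of the PSF after clipping at the image edges.
        p_y0 = r_y0 - (y - peak_y)
        p_x0 = r_x0 - (x - peak_x)
        p_y1 = p_y0 + (r_y1 - r_y0)
        p_x1 = p_x0 + (r_x1 - r_x0)
        residual[r_y0:r_y1, r_x0:r_x1] -= flux * psf[p_y0:p_y1, p_x0:p_x1]
    return residual, model
```

A small loop gain (for example 0.1) removes each source gradually over many iterations, which is what keeps the subtraction stable when sidelobes of several sources overlap.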
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.initialise_dask_client(dask_address: str) dask.distributed.Client[source]
Initialise and return a Dask client.
This function creates a Dask client connected to the specified Dask scheduler address. If no address is provided, it initializes a local Dask cluster and returns a client connected to it.
- Parameters:
dask_address – The address of the Dask scheduler. If not provided, a local Dask cluster will be used.
- Returns:
A Dask client connected to either the specified scheduler or a local cluster.
- Return type:
Client
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.initialise_gridder(pipeline_config: PipelineConfig) tuple[ska_sdp_distributed_self_cal_prototype.processing_tasks.gridding.Gridder, ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig][source]
Constructs the gridding manager and updates the pipeline config.
- Parameters:
pipeline_config – Configuration for pipeline.
- Returns:
Gridding manager for the pipeline.
pipeline_config: Configuration for the pipeline.
- Return type:
gridding_manager
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.initialise_self_calibration(pipeline_config: PipelineConfig) SelfCalibrationManager[source]
Initialize self-calibration.
- Parameters:
pipeline_config – Configuration for the pipeline.
processing_set_manager – Manager for visibility data.
- Returns:
Manager for applying self-calibration.
- Return type:
self_calibration_manager
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.initialise_swiftly(pipeline_config: PipelineConfig) tuple[ska_sdp_distributed_self_cal_prototype.data_managers.swiftly.Swiftly, ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig][source]
Constructs the swiftly manager and updates the pipeline config.
- Parameters:
pipeline_config – Configuration for pipeline.
- Returns:
Swiftly manager for the pipeline.
pipeline_config: Configuration for the pipeline.
- Return type:
swiftly_manager
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.load_data(pipeline_config: PipelineConfig) tuple[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_io.ProcessingSetManager, ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig][source]
Loads measurement set from file.
Updates configuration using observation info.
- Parameters:
pipeline_config – Configuration for pipeline.
- Returns:
Manager for visibility data.
pipeline_config: Configuration for pipeline.
- Return type:
processing_set_manager
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.predict_residual_visibilities(facets: list[numpy.ndarray], visibility_bins_list: list[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_bin.VisibilityBin], binning_info: dict, swiftly_manager: Swiftly, gridding_manager: Gridder, normalisation_factor: float) None[source]
Degrid (predict) visibilities.
- Parameters:
processing_set_manager (ProcessingSetManager) – Processing manager for original data set.
swiftly_manager (Swiftly) – SwiFTly manager object.
gridding_manager (Gridder) – Gridder manager for the pipeline.
facets (np.ndarray) – list of image facets to degrid
- Returns:
list of arrays containing degridded visibilities
- Return type:
visibilities (list[np.ndarray])
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.restore_model(model_facets: list[numpy.ndarray], residual_facets: list[numpy.ndarray], clean_beam_parameters: dict[str, float], dask_client: dask.distributed.Client) list[numpy.ndarray][source]
Convolves model facets with a clean beam then adds the corresponding residual facets.
- Parameters:
model_facets – A list of 2D arrays representing the model facets.
residual_facets – A list of 2D arrays representing the residual facets.
clean_beam_parameters – A dictionary containing the clean beam parameters:
“sigma_x”: Standard deviation of the clean beam along the x-axis.
“sigma_y”: Standard deviation of the clean beam along the y-axis.
“position_angle”: Position angle of the clean beam in degrees.
dask_client – The dask client used to distribute the work.
- Returns:
A list of 2D numpy arrays representing the cleaned facets (convolved model + residual).
- Return type:
cleaned_facets
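The restore step for a single facet can be sketched as a convolution followed by an addition. The code below is an assumption-laden illustration rather than this package's method: restore_facet and gaussian_kernel are hypothetical helpers, the convolution is done with NumPy FFTs (so it wraps around at the facet edges), the position angle is ignored for brevity, and the kernel is peak-normalised so a point source keeps its peak value.

```python
import numpy as np


def gaussian_kernel(size, sigma_x, sigma_y):
    """Axis-aligned Gaussian with unit peak at (size // 2, size // 2)."""
    centre = size // 2
    y, x = np.mgrid[0:size, 0:size]
    return np.exp(
        -0.5 * (((x - centre) / sigma_x) ** 2 + ((y - centre) / sigma_y) ** 2)
    )


def restore_facet(model, residual, sigma_x, sigma_y):
    """Convolve a square model facet with the beam, then add the residual."""
    kernel = gaussian_kernel(model.shape[0], sigma_x, sigma_y)
    # ifftshift puts the kernel peak at index [0, 0] so the image is not shifted.
    kernel_ft = np.fft.fft2(np.fft.ifftshift(kernel))
    convolved = np.real(np.fft.ifft2(np.fft.fft2(model) * kernel_ft))
    return convolved + residual
```

Applying restore_facet to each (model, residual) pair gives one restored facet per input facet, which is the shape of work that can be handed to a Dask client.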
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.save_image(pipeline_config: PipelineConfig, image_data: numpy.ndarray, image_name: str)[source]
Constructs an image from full (non-faceted) image data and exports it to file. If you are working with facets, first join them using join_facets.
- Parameters:
pipeline_config – Configuration for pipeline.
image_data – Pixel data for the image.
image_name – Name (prefix) for the output image; .fits, .png extensions will be added automatically.
- Returns:
None
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.setup_pipeline(pipeline_config: PipelineConfig) tuple[ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig, ska_sdp_distributed_self_cal_prototype.data_managers.visibility_io.ProcessingSetManager, ska_sdp_distributed_self_cal_prototype.data_managers.swiftly.Swiftly, ska_sdp_distributed_self_cal_prototype.processing_tasks.gridding.Gridder][source]
Set up the managers required by all pipelines.
- Parameters:
pipeline_config – Initial configuration for pipeline.
- Returns:
Updated configuration for pipeline.
processing_set_manager: Manager for visibility data.
swiftly_manager: Swiftly manager for the pipeline.
gridding_manager: Gridding manager for the pipeline.
- Return type:
pipeline_config
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.split_image(image_data: numpy.ndarray, facet_count: int) list[numpy.ndarray][source]
Split an image into a specified number of square facets.
- Parameters:
image_data – A 2D square array representing the image to be split.
facet_count – The number of square facets to split the image into.
- Returns:
A list of 2D arrays, each representing a facet of the original image.
- Return type:
facets
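As an illustration of the splitting described above, the sketch below cuts a square image into equal square facets with NumPy slicing. It is a hedged example, not the package's code: split_into_square_facets is a hypothetical name, facet_count is assumed to be a perfect square that divides the image evenly, and facets are returned in row-major order, which may not match the ordering used here.

```python
import math

import numpy as np


def split_into_square_facets(image, facet_count):
    """Split a 2D square image into facet_count equal square facets."""
    per_side = math.isqrt(facet_count)
    if per_side * per_side != facet_count:
        raise ValueError("facet_count must be a perfect square")
    if image.ndim != 2 or image.shape[0] != image.shape[1]:
        raise ValueError("image must be a 2D square array")
    size = image.shape[0]
    if size % per_side != 0:
        raise ValueError("image size must be divisible by facets per side")
    facet_size = size // per_side
    # Row-major order: left to right along each row of facets, top to bottom.
    return [
        image[r * facet_size:(r + 1) * facet_size,
              c * facet_size:(c + 1) * facet_size]
        for r in range(per_side)
        for c in range(per_side)
    ]
```

The facets are views into the original array, so copy them before modifying if the source image must stay unchanged.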
- ska_sdp_distributed_self_cal_prototype.workflow.tasks.validate_visibility_bins(visibility_bins: list[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_bin.VisibilityBin]) bool[source]
Task to validate visibility data.
Checks for negative values and NaNs.
- Parameters:
visibility_bins – list of VisibilityBin objects whose data is validated
- Returns:
True if all validation checks pass, otherwise False
- Return type:
data_ok
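The two checks named above (NaNs and negative values) can be sketched on a plain real-valued array. This is only an illustration: values_are_valid is a hypothetical helper, and which fields of a VisibilityBin are actually checked is not stated here, so the input is assumed to be something like an array of weights.

```python
import numpy as np


def values_are_valid(data):
    """Return True if a real-valued array has no NaNs and no negative values."""
    data = np.asarray(data)
    if np.isnan(data).any():
        return False
    if (data < 0).any():
        return False
    return True
```

A task-level validator would then return True only if this check passes for every bin.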