tasks

Module for high-level workflow tasks to be imported into pipelines.

ska_sdp_distributed_self_cal_prototype.workflow.tasks.bin_data(processing_set_manager: ProcessingSetManager, dask_client: dask.distributed.Client) → tuple[list[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_bin.VisibilityBin], dict]

Bin visibility data for subgrids.

Parameters:
  • processing_set_manager – Manager for visibility data.

  • dask_client – Dask client.

Returns:

  • visibility_bins_pruned: List of VisibilityBin objects containing binned data with empty bins removed.

  • binning_info: Dictionary containing global parameters for binning data for each subgrid:
    • channel_width (float): The width of each channel in Hz.

    • min_frequency (float): The minimum frequency in Hz.

    • channel_count (int): The number of channels.

    • time_count (int): The number of times.

    • baseline_count (int): The number of baselines.

    • normalisation_factor (int): The factor for normalisation.

    • image_scale_padded (float): Padded scale for the image.

    • subgrid_size_effective (int): Effective size of the subgrid in pixels.

    • subgrid_size (int): Size of the subgrid in pixels.

    • wtower_size (int): Size of the wtower.

    • w_step (int): Step size for W-coordinate.

    • vis_name (str): Name of the visibility column, either “VISIBILITY” or “VISIBILITY_CORRECTED”.

Return type:

tuple
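
The keys documented for binning_info can be written down as a typed dictionary. This is only a sketch: the names BinningInfo and missing_keys are hypothetical and not part of the module; the field names and types are taken from the list above.

```python
from typing import TypedDict


class BinningInfo(TypedDict):
    """Hypothetical typed view of the documented binning_info keys."""

    channel_width: float  # Hz
    min_frequency: float  # Hz
    channel_count: int
    time_count: int
    baseline_count: int
    normalisation_factor: int
    image_scale_padded: float
    subgrid_size_effective: int  # pixels
    subgrid_size: int  # pixels
    wtower_size: int
    w_step: int
    vis_name: str  # "VISIBILITY" or "VISIBILITY_CORRECTED"


def missing_keys(info: dict) -> set:
    """Return the documented keys that are absent from info."""
    return set(BinningInfo.__annotations__) - set(info)
```

A check such as missing_keys(binning_info) before gridding would flag a dictionary that lacks one of the documented entries.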

ska_sdp_distributed_self_cal_prototype.workflow.tasks.configure_and_setup_pipeline(config_filepath: Path) → tuple[ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig, ska_sdp_distributed_self_cal_prototype.data_managers.visibility_io.ProcessingSetManager, ska_sdp_distributed_self_cal_prototype.data_managers.swiftly.Swiftly, ska_sdp_distributed_self_cal_prototype.processing_tasks.gridding.Gridder, dask.distributed.Client]

Perform the setup steps that are common to every pipeline:

  1. configure_pipeline

  2. initialise_dask_client

  3. setup_pipeline

Parameters:

config_filepath – Path to the pipeline configuration file.

Returns:

  • pipeline_config: Updated configuration for pipeline.

  • processing_set_manager: Manager for visibility data.

  • swiftly_manager: Swiftly manager for the pipeline.

  • gridding_manager: Gridding manager for the pipeline.

  • dask_client: A Dask client connected to either the specified scheduler or a local cluster.

Return type:

tuple
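
The order of the three setup steps can be sketched as a simple composition. The step functions are passed in as callables so the sketch runs without the package installed; the name configure_and_setup and the assumption that the Dask client is created from the loaded configuration are illustrative only.

```python
from pathlib import Path
from typing import Any, Callable


def configure_and_setup(
    config_filepath: Path,
    configure: Callable[[Path], Any],
    start_client: Callable[[Any], Any],
    setup: Callable[[Any], tuple],
) -> tuple:
    """Chain the three setup steps and return their combined results."""
    # 1. configure_pipeline: load the configuration file.
    pipeline_config = configure(config_filepath)
    # 2. initialise_dask_client: assumed here to take what it needs from the config.
    dask_client = start_client(pipeline_config)
    # 3. setup_pipeline: build the managers and update the configuration.
    pipeline_config, psm, swiftly, gridder = setup(pipeline_config)
    return pipeline_config, psm, swiftly, gridder, dask_client
```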

ska_sdp_distributed_self_cal_prototype.workflow.tasks.configure_pipeline(config_file: Path) → PipelineConfig

Load configuration and parameterise workflow.

Parameters:

config_file – Path to YAML config file.

Returns:

pipeline_config: Configuration for the pipeline.

Return type:

PipelineConfig

ska_sdp_distributed_self_cal_prototype.workflow.tasks.distributed_hogbom(dask_client: dask.distributed.Client, residual_facets_to_clean: list[numpy.ndarray], psf_image: numpy.ndarray, model_facets_to_update: list[numpy.ndarray], gain: float, niter: int, fracthresh: float) → list[numpy.ndarray]

Distribute hogbom over facets via dask.

Parameters:
  • dask_client – Dask client to distribute tasks on.

  • residual_facets_to_clean – Residual facets to clean.

  • psf_image – Single central PSF image with the same dimensions as the facets.

  • model_facets_to_update – List of Images (per facet) containing the clean components.

  • gain – The “loop gain”, i.e. the fraction of the brightest pixel that is removed in each iteration.

  • niter – Maximum number of minor cycles to clean for.

  • fracthresh – Fractional threshold at which to stop cleaning (computed over the entire PSF, not a single facet).

Returns:

residual_facets: List of Images (per facet) containing the residual.

Return type:

list[numpy.ndarray]
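
The fan-out over facets can be sketched as follows. This is not the module's implementation: concurrent.futures.ThreadPoolExecutor stands in for the Dask client, clean_one_facet is a trivial placeholder for a real per-facet clean, and the absolute threshold is assumed to be derived once from the global peak of all residual facets so that every facet stops at the same level.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def clean_one_facet(residual, model, absolute_threshold):
    """Placeholder cleaner: moves pixels above the threshold into the model."""
    mask = np.abs(residual) > absolute_threshold
    new_model = model + np.where(mask, residual, 0.0)
    new_residual = np.where(mask, 0.0, residual)
    return new_residual, new_model


def distribute_over_facets(residual_facets, model_facets, fracthresh, max_workers=4):
    """Run the per-facet cleaner on every (residual, model) pair in parallel."""
    # Assumption: one threshold shared by all facets, from the global peak.
    global_peak = max(float(np.abs(facet).max()) for facet in residual_facets)
    absolute_threshold = fracthresh * global_peak
    thresholds = [absolute_threshold] * len(residual_facets)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so facet i stays at index i.
        results = list(pool.map(clean_one_facet, residual_facets, model_facets, thresholds))
    residuals = [residual for residual, _ in results]
    models = [model for _, model in results]
    return residuals, models
```

With a dask.distributed.Client, the same pattern would typically be written with Client.map followed by Client.gather.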

ska_sdp_distributed_self_cal_prototype.workflow.tasks.finish_setup(pipeline_config: PipelineConfig, processing_set_manager: ProcessingSetManager, swiftly_manager: Swiftly, gridding_manager: Gridder) → tuple[ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig, ska_sdp_distributed_self_cal_prototype.data_managers.visibility_io.ProcessingSetManager, ska_sdp_distributed_self_cal_prototype.data_managers.swiftly.Swiftly, ska_sdp_distributed_self_cal_prototype.processing_tasks.gridding.Gridder]

Calculates and stores key observation parameters.

This includes pixel size in arcsec, image phase centre, etc.

Parameters:
  • pipeline_config – PipelineConfig instance that stores all information about the observation and pipeline.

  • processing_set_manager – Manager for visibility data.

  • swiftly_manager – Swiftly manager for the pipeline.

  • gridding_manager – Gridding manager for the pipeline.

Returns:

  • pipeline_config: An updated PipelineConfig instance.

  • processing_set_manager: Manager for visibility data.

  • swiftly_manager: Swiftly manager for the pipeline.

  • gridding_manager: Gridding manager for the pipeline.

Return type:

tuple

ska_sdp_distributed_self_cal_prototype.workflow.tasks.generate_clean_beam_array(pipeline_config: PipelineConfig, psf: numpy.ndarray) → numpy.ndarray

Generates a clean beam array based on the provided point spread function (PSF) and the pixel size.

The function calculates the clean beam parameters, then generates the clean beam array.

Parameters:
  • pipeline_config – Configuration object containing image information and output directory.

  • psf – 2D array representing the point spread function (PSF) of the image.

Returns:

clean_beam: A 2D array representing the clean beam image generated from the PSF.

Return type:

numpy.ndarray

ska_sdp_distributed_self_cal_prototype.workflow.tasks.generate_clean_beam_parameters(pipeline_config: PipelineConfig, psf: numpy.ndarray) → numpy.ndarray

Generates clean beam parameters based on the provided point spread function (PSF) and the pixel size.

Parameters:
  • pipeline_config – Configuration object containing image information and output directory.

  • psf – 2D array representing the point spread function (PSF) of the image.

Returns:

A dictionary containing the clean beam parameters:
  • “sigma_x”: Standard deviation of the clean beam along the x-axis in pixel units.

  • “sigma_y”: Standard deviation of the clean beam along the y-axis in pixel units.

  • “position_angle”: Position angle of the clean beam in degrees.

Return type:

dict
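
To illustrate how the three parameters describe a beam, the sketch below builds an elliptical Gaussian image from them. The function name gaussian_clean_beam is hypothetical, and the angle convention (counter-clockwise from the x-axis, in array index coordinates) is an assumption; the module may use a different convention.

```python
import numpy as np


def gaussian_clean_beam(size, sigma_x, sigma_y, position_angle_deg):
    """Return a size x size elliptical Gaussian with unit peak at the centre pixel."""
    centre = size // 2
    y, x = np.mgrid[0:size, 0:size]
    dx = x - centre
    dy = y - centre
    theta = np.deg2rad(position_angle_deg)
    # Rotate pixel offsets into the beam's principal-axis frame.
    u = dx * np.cos(theta) + dy * np.sin(theta)
    v = -dx * np.sin(theta) + dy * np.cos(theta)
    return np.exp(-0.5 * ((u / sigma_x) ** 2 + (v / sigma_y) ** 2))
```

Called with the dictionary values, for example gaussian_clean_beam(size, params["sigma_x"], params["sigma_y"], params["position_angle"]), this yields an array with the same role as the one returned by generate_clean_beam_array.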

ska_sdp_distributed_self_cal_prototype.workflow.tasks.generate_facets_from_image_data(fits_path: Path | str, image_size: int, facet_count: int) → list[numpy.ndarray]

Generate facets from a FITS image file.

Parameters:
  • fits_path – The file path to the FITS file.

  • image_size – The side length of the image in pixels.

  • facet_count – Total number of facets.

Returns:

facets: List of image facets.

Return type:

list[numpy.ndarray]

ska_sdp_distributed_self_cal_prototype.workflow.tasks.generate_facets_with_corrections(swiftly_manager: Swiftly, gridding_manager: Gridder, total_num_visibilities: int, channel_count: int)

Generate facets with grid corrections applied.

Parameters:
  • swiftly_manager – SwiFTly object.

  • gridding_manager – Gridder object.

  • total_num_visibilities – Factor used with the pixel sum to normalise pixels.

  • channel_count – The number of channels.

Returns:

  • corrected_facets (list[np.ndarray]): List of facets after grid corrections.

  • normalisation_factor: Factor used to normalise pixels.

Return type:

tuple

ska_sdp_distributed_self_cal_prototype.workflow.tasks.get_dataset_info(dataset: xarray.Dataset, vis_name: str)

Get metadata from dataset.

Parameters:
  • dataset – dataset containing visibility data

  • vis_name – name of visibility data column

Returns:

metadata: Dictionary containing information about the dataset:
  • channel_width (float)

  • min_frequency (float)

  • channel_count (int)

  • time_count (int)

  • baseline_count (int)

  • total_num_visibilities (int)

Return type:

dict
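
A rough sketch of how such a metadata dictionary could be assembled from basic axis information. The function name and inputs are hypothetical, and two assumptions are made: channels are evenly spaced, and every (time, baseline, channel) sample counts once towards total_num_visibilities (polarisations and flags are ignored).

```python
def summarise_dataset(frequencies_hz, time_count, baseline_count):
    """Collect the documented metadata keys from basic axis information."""
    if len(frequencies_hz) < 2:
        raise ValueError("need at least two channels to derive a channel width")
    channel_count = len(frequencies_hz)
    return {
        # Assumption: evenly spaced channels, so the first gap is representative.
        "channel_width": frequencies_hz[1] - frequencies_hz[0],
        "min_frequency": min(frequencies_hz),
        "channel_count": channel_count,
        "time_count": time_count,
        "baseline_count": baseline_count,
        # Assumption: every (time, baseline, channel) sample counts once.
        "total_num_visibilities": time_count * baseline_count * channel_count,
    }
```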

ska_sdp_distributed_self_cal_prototype.workflow.tasks.grid_visibilities(visibility_bins: list[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_bin.VisibilityBin], binning_info: dict, swiftly_manager: Swiftly, gridding_manager: Gridder) → Swiftly

Grid visibilities.

Parameters:
  • visibility_bins – list of VisibilityBin objects containing binned visibility data.

  • binning_info – parameters for gridding

  • swiftly_manager – Swiftly manager for the pipeline.

  • gridding_manager – Gridding manager for the pipeline.

Returns:

swiftly_manager: Swiftly manager with tasks for gridding.

Return type:

Swiftly

ska_sdp_distributed_self_cal_prototype.workflow.tasks.hogbom(residual: numpy.ndarray, psf: numpy.ndarray, model: numpy.ndarray, window: bool, gain: float, thresh: float, niter: int, fracthresh: float, prefix='')

Modified function from ska_sdp_func_python/image/cleaners.py

Clean the point spread function from a residual image.

See Hogbom CLEAN (1974A&AS...15..417H).

This version operates on numpy arrays. ska_sdp_func_python.image.deconvolution.deconvolve_cube provides a version for Images.

Parameters:
  • residual – The residual Image, i.e., the Image to be deconvolved

  • psf – The point spread-function

  • model – Array containing the clean components

  • window – Regions where clean components are allowed. If True, entire residual Image is allowed

  • gain – The “loop gain”, i.e., the fraction of the brightest pixel that is removed in each iteration

  • thresh – Cleaning stops when the maximum of the absolute deviation of the residual is less than this value

  • niter – Maximum number of components to make if the threshold thresh is not hit

  • fracthresh – The predefined fractional threshold at which to stop cleaning

  • prefix – Informational prefix for log messages

Returns:

res: array of the updated residual Image
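
For orientation, the core Hogbom loop can be sketched in a few lines of numpy. This is a simplified illustration, not the module's code: the window, thresh and prefix arguments are omitted, the PSF is assumed to have a peak of 1, and the stopping level is assumed to be fracthresh times the initial peak of the residual.

```python
import numpy as np


def hogbom_sketch(residual, psf, gain, niter, fracthresh):
    """Minimal Hogbom CLEAN; returns (model, residual) as new arrays."""
    residual = residual.astype(float)  # astype copies, so the input is untouched
    model = np.zeros_like(residual)
    peak_y, peak_x = np.unravel_index(np.argmax(np.abs(psf)), psf.shape)
    threshold = fracthresh * np.abs(residual).max()
    ny, nx = residual.shape

    for _ in range(niter):
        y, x = np.unravel_index(np.argmax(np.abs(residual)), residual.shape)
        peak = residual[y, x]
        if abs(peak) < threshold:
            break
        component = gain * peak
        model[y, x] += component

        # Overlap of the residual with the PSF shifted so its peak sits on (y, x).
        top, left = y - peak_y, x - peak_x
        y0, y1 = max(0, top), min(ny, top + psf.shape[0])
        x0, x1 = max(0, left), min(nx, left + psf.shape[1])
        psf_part = psf[y0 - top:y1 - top, x0 - left:x1 - left]
        residual[y0:y1, x0:x1] -= component * psf_part
    return model, residual
```

Each iteration removes the fraction gain of the brightest pixel, scaled by the PSF, and records it as a clean component in the model.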

ska_sdp_distributed_self_cal_prototype.workflow.tasks.initialise_dask_client(dask_address: str) → dask.distributed.Client

Initialise and return a Dask client.

This function creates a Dask client connected to the specified Dask scheduler address. If no address is provided, it initializes a local Dask cluster and returns a client connected to it.

Parameters:

dask_address – The address of the Dask scheduler. If not provided, a local Dask cluster will be used.

Returns:

A Dask client connected to either the specified scheduler or a local cluster.

Return type:

Client

ska_sdp_distributed_self_cal_prototype.workflow.tasks.initialise_gridder(pipeline_config: PipelineConfig) → tuple[ska_sdp_distributed_self_cal_prototype.processing_tasks.gridding.Gridder, ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig]

Constructs the gridding manager and updates the pipeline config.

Parameters:

pipeline_config – Configuration for pipeline.

Returns:

  • gridding_manager: Gridding manager for the pipeline.

  • pipeline_config: Configuration for the pipeline.

Return type:

tuple

ska_sdp_distributed_self_cal_prototype.workflow.tasks.initialise_self_calibration(pipeline_config: PipelineConfig) → SelfCalibrationManager

Initialize self-calibration.

Parameters:

pipeline_config – Configuration for the pipeline.

Returns:

self_calibration_manager: Manager for applying self-calibration.

Return type:

SelfCalibrationManager

ska_sdp_distributed_self_cal_prototype.workflow.tasks.initialise_swiftly(pipeline_config: PipelineConfig) → tuple[ska_sdp_distributed_self_cal_prototype.data_managers.swiftly.Swiftly, ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig]

Constructs the swiftly manager and updates the pipeline config.

Parameters:

pipeline_config – Configuration for pipeline.

Returns:

  • swiftly_manager: Swiftly manager for the pipeline.

  • pipeline_config: Configuration for the pipeline.

Return type:

tuple

ska_sdp_distributed_self_cal_prototype.workflow.tasks.load_data(pipeline_config: PipelineConfig) → tuple[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_io.ProcessingSetManager, ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig]

Loads measurement set from file.

Updates configuration using observation info.

Parameters:

pipeline_config – Configuration for pipeline.

Returns:

  • processing_set_manager: Manager for visibility data.

  • pipeline_config: Configuration for pipeline.

Return type:

tuple

ska_sdp_distributed_self_cal_prototype.workflow.tasks.predict_residual_visibilities(facets: list[numpy.ndarray], visibility_bins_list: list[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_bin.VisibilityBin], binning_info: dict, swiftly_manager: Swiftly, gridding_manager: Gridder, normalisation_factor: float) → None

Degrid (predict) visibilities.

Parameters:
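  • facets (list[np.ndarray]) – List of image facets to degrid.

  • visibility_bins_list (list[VisibilityBin]) – List of VisibilityBin objects containing binned visibility data.

  • binning_info (dict) – Global binning parameters, as returned by bin_data.

  • swiftly_manager (Swiftly) – SwiFTly manager object.

  • gridding_manager (Gridder) – Gridder manager for the pipeline.

  • normalisation_factor (float) – Factor used to normalise pixels.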

Returns:

list of arrays containing degridded visibilities

Return type:

visibilities (list[np.ndarray])

ska_sdp_distributed_self_cal_prototype.workflow.tasks.restore_model(model_facets: list[numpy.ndarray], residual_facets: list[numpy.ndarray], clean_beam_parameters: dict[str, float], dask_client: dask.distributed.Client) → list[numpy.ndarray]

Convolves model facets with a clean beam then adds the corresponding residual facets.

Parameters:
  • model_facets – A list of 2D arrays representing the model facets.

  • residual_facets – A list of 2D arrays representing the residual facets.

  • clean_beam_parameters – A dictionary containing the clean beam parameters:
    • “sigma_x”: Standard deviation of the clean beam along the x-axis.

    • “sigma_y”: Standard deviation of the clean beam along the y-axis.

    • “position_angle”: Position angle of the clean beam in degrees.

  • dask_client – The dask client used to distribute the work.

Returns:

cleaned_facets: A list of 2D numpy arrays representing the cleaned facets (convolved model + residual).

Return type:

list[numpy.ndarray]
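
The restore step for a single facet (convolved model + residual) can be sketched with an FFT-based convolution. The function name restore_facet is hypothetical, the clean beam is assumed to be an image of the same shape as the facet with its peak at the centre pixel, and the convolution is circular, which a real implementation might avoid by padding.

```python
import numpy as np


def restore_facet(model, residual, clean_beam):
    """Convolve model with clean_beam (circularly) and add the residual."""
    # Move the beam peak from the centre pixel to index (0, 0) so that the
    # convolution does not shift the model components.
    kernel = np.fft.ifftshift(clean_beam)
    convolved = np.real(np.fft.ifft2(np.fft.fft2(model) * np.fft.fft2(kernel)))
    return convolved + residual
```

Applied per facet, for example [restore_facet(m, r, beam) for m, r in zip(model_facets, residual_facets)], this gives one restored array per input pair.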

ska_sdp_distributed_self_cal_prototype.workflow.tasks.save_image(pipeline_config: PipelineConfig, image_data: numpy.ndarray, image_name: str)

Constructs an image from full (non-faceted) image data and exports it to file. If you are working with facets, first join them using join_facets.

Parameters:
  • pipeline_config – Configuration for pipeline.

  • image_data – Pixel data for the image.

  • image_name – Name (prefix) for the output image; .fits, .png extensions will be added automatically.

Returns:

None

ska_sdp_distributed_self_cal_prototype.workflow.tasks.setup_pipeline(pipeline_config: PipelineConfig) → tuple[ska_sdp_distributed_self_cal_prototype.workflow.pipeline_config.PipelineConfig, ska_sdp_distributed_self_cal_prototype.data_managers.visibility_io.ProcessingSetManager, ska_sdp_distributed_self_cal_prototype.data_managers.swiftly.Swiftly, ska_sdp_distributed_self_cal_prototype.processing_tasks.gridding.Gridder]

Set up the managers required by all pipelines.

Parameters:

pipeline_config – Initial configuration for pipeline.

Returns:

  • pipeline_config: Updated configuration for pipeline.

  • processing_set_manager: Manager for visibility data.

  • swiftly_manager: Swiftly manager for the pipeline.

  • gridding_manager: Gridding manager for the pipeline.

Return type:

tuple

ska_sdp_distributed_self_cal_prototype.workflow.tasks.split_image(image_data: numpy.ndarray, facet_count: int) → list[numpy.ndarray]

Split an image into a specified number of square facets.

Parameters:
  • image_data – A 2D square array representing the image to be split.

  • facet_count – The number of square facets to split the image into.

Returns:

facets: A list of 2D arrays, each representing a facet of the original image.

Return type:

list[numpy.ndarray]
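
One way to split a square image into square facets is sketched below. The function name is hypothetical, and two assumptions are made that the documentation does not state: facet_count must be a perfect square, and facets are returned in row-major order.

```python
import math

import numpy as np


def split_into_square_facets(image, facet_count):
    """Split a square 2D array into facet_count equal square facets (row-major)."""
    per_side = math.isqrt(facet_count)
    if per_side * per_side != facet_count:
        raise ValueError("facet_count must be a perfect square")
    if image.ndim != 2 or image.shape[0] != image.shape[1]:
        raise ValueError("image must be a square 2D array")
    size = image.shape[0]
    if size % per_side != 0:
        raise ValueError("image size must be divisible by the facets per side")
    facet_size = size // per_side
    return [
        image[r * facet_size:(r + 1) * facet_size, c * facet_size:(c + 1) * facet_size]
        for r in range(per_side)
        for c in range(per_side)
    ]
```

The slices are views into the original array, so a caller that modifies facets in place would also modify the image; copying each facet avoids that.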

ska_sdp_distributed_self_cal_prototype.workflow.tasks.validate_visibility_bins(visibility_bins: list[ska_sdp_distributed_self_cal_prototype.data_managers.visibility_bin.VisibilityBin]) → bool

Task to validate visibility data.

Checks for negative values and NaNs.

Parameters:

visibility_bins – List of VisibilityBin objects containing the data to validate.

Returns:

data_ok: True if all validation checks pass, otherwise False.

Return type:

bool
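
The two checks named above can be sketched for a flat sequence of real values. Which fields of a VisibilityBin are actually inspected is not stated here, so the function below is a hypothetical stand-in that only shows the NaN and negative-value tests.

```python
import math


def values_ok(values):
    """Return True when no value is NaN or negative."""
    return all(not math.isnan(value) and value >= 0 for value in values)
```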