ska_sdp_instrumental_calibration.scheduler module

class ska_sdp_instrumental_calibration.scheduler.UpstreamOutput[source]

Bases: object

Container for managing outputs and metadata between pipeline stages.

This class acts as a shared context object, allowing downstream stages to access the results of upstream stages via dictionary-style or attribute-style access. It also tracks computational tasks, checkpoint keys, and execution counts for each stage.
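The dual access pattern described above can be illustrated with a minimal sketch. The `SharedContext` class and the `gaintable` key are hypothetical names chosen for illustration; this is not the actual `UpstreamOutput` implementation, only one plausible way to support both access styles.

```python
class SharedContext:
    """Hypothetical stand-in for UpstreamOutput; not the real implementation."""

    def __init__(self):
        # Stage results live in a private dict, apart from any bookkeeping.
        self._outputs = {}

    def __setitem__(self, key, value):
        self._outputs[key] = value

    def __getitem__(self, key):
        return self._outputs[key]

    def __contains__(self, key):
        return key in self._outputs

    def __getattr__(self, name):
        # Python calls this only when normal attribute lookup fails.
        if name.startswith("_"):
            raise AttributeError(name)
        try:
            return self._outputs[name]
        except KeyError:
            raise AttributeError(name) from None


ctx = SharedContext()
ctx["gaintable"] = {"antennas": 4}  # an upstream stage stores a result
same_object = ctx.gaintable is ctx["gaintable"]  # downstream reads either way
```

Both access styles resolve to the same stored object, so a downstream stage can use whichever reads better.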

stage_compute_tasks

A list of delayed compute tasks (e.g., Dask graphs) accumulated during the pipeline execution.

Type:

list

checkpoint_keys

A list of keys identifying data that should be checkpointed or persisted.

Type:

list

compute_outputs

A list to store results of computations.

Type:

list

get_call_count(stage_name)[source]

Get the number of times a specific stage has been executed.

Parameters:

stage_name (str) -- The name of the stage.

Returns:

The execution count (default is 0).

Return type:

int

increment_call_count(stage_name)[source]

Increment the execution counter for a specific stage.

Parameters:

stage_name (str) -- The name of the stage to increment.
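As a rough illustration of how the two counter methods fit together, the sketch below keeps the counts in a plain dictionary. The `StageCallCounter` class, its `_call_counts` attribute, and the stage name `bandpass_calibration` are assumptions made for this example; only the method names and the default of 0 come from the documentation above.

```python
class StageCallCounter:
    """Hypothetical helper; mirrors only the two documented method names."""

    def __init__(self):
        # Maps stage name -> number of recorded executions (assumed storage).
        self._call_counts = {}

    def get_call_count(self, stage_name):
        # Stages that have never run report 0, matching the documented default.
        return self._call_counts.get(stage_name, 0)

    def increment_call_count(self, stage_name):
        self._call_counts[stage_name] = self.get_call_count(stage_name) + 1


counter = StageCallCounter()
before = counter.get_call_count("bandpass_calibration")
counter.increment_call_count("bandpass_calibration")
counter.increment_call_count("bandpass_calibration")
after = counter.get_call_count("bandpass_calibration")
```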

property compute_tasks

Get the list of accumulated compute tasks.

Type:

list

add_compute_tasks(*args)[source]

Register new compute tasks to the pipeline.

Parameters:

*args -- One or more task objects (e.g., Dask delayed objects) to add to the execution queue.

add_checkpoint_key(*args)[source]

Register keys that should be checkpointed.

Parameters:

*args -- One or more string keys identifying outputs that require checkpointing or persistence.
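The variadic registration methods might behave roughly as sketched below. The `TaskRegistry` class is a hypothetical stand-in, and the assumption that the `compute_tasks` property simply returns `stage_compute_tasks` is not confirmed by the documentation. Plain strings stand in for Dask delayed objects so the example runs without Dask installed.

```python
class TaskRegistry:
    """Hypothetical stand-in showing only the registration methods."""

    def __init__(self):
        self.stage_compute_tasks = []
        self.checkpoint_keys = []

    @property
    def compute_tasks(self):
        # Assumption: the property exposes the accumulated task list directly.
        return self.stage_compute_tasks

    def add_compute_tasks(self, *args):
        # Each positional argument is appended as a separate task.
        self.stage_compute_tasks.extend(args)

    def add_checkpoint_key(self, *args):
        # Each positional argument is recorded as a separate key.
        self.checkpoint_keys.extend(args)


registry = TaskRegistry()
registry.add_compute_tasks("plot_task", "export_task")  # placeholders for delayed objects
registry.add_checkpoint_key("gaintable")
registry.add_checkpoint_key("vis", "modelvis")
```

Accepting `*args` lets a stage register one item or several in a single call without wrapping them in a list first.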

class ska_sdp_instrumental_calibration.scheduler.InstrumentalDaskRunner(*, dask_scheduler=None, with_report=False, **kwargs)[source]

Bases: DaskRunner

DaskRunner implementation for instrumental calibration.

execute()[source]

Execute the provided list of pipeline stages.

Iterates through the stages, executing each one sequentially. For each stage, it:

  1. Logs the start of the stage.

  2. Invokes the stage callable with the current upstream outputs.

  3. Persists any data flagged for checkpointing and any accumulated compute tasks using dask.persist.

  4. Updates the output container with the persisted results.

  5. Waits for completion if a Dask client is present.

  6. Logs the completion of the stage.

Parameters:

stages (list of Stage) -- A list of stage objects to be executed.
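The six steps above can be sketched as a simple loop. Everything in this example is an assumption made for illustration: the `Context` class, the `run_stages` function, the `(name, callable)` stage tuples, and the choice to have stages mutate the context in place. The `persist` and `wait` callables are injected so the sketch runs without Dask; in the real runner they would presumably correspond to `dask.persist` and a wait on the Dask client.

```python
import logging

logger = logging.getLogger(__name__)


class Context(dict):
    """Hypothetical stand-in for UpstreamOutput."""

    def __init__(self):
        super().__init__()
        self.checkpoint_keys = []
        self.compute_tasks = []


def run_stages(stages, persist, wait=None):
    """Run (name, callable) pairs in order, persisting after each stage."""
    context = Context()
    for name, stage in stages:
        logger.info("Starting stage %s", name)          # step 1
        stage(context)                                  # step 2 (mutates context)

        keys = list(context.checkpoint_keys)
        to_persist = [context[k] for k in keys] + list(context.compute_tasks)
        persisted = persist(*to_persist)                # step 3

        # Step 4: swap the lazy objects for their persisted counterparts.
        for key, value in zip(keys, persisted[: len(keys)]):
            context[key] = value
        context.compute_tasks = list(persisted[len(keys):])

        if wait is not None:                            # step 5
            wait(persisted)
        logger.info("Finished stage %s", name)          # step 6
    return context


def fake_persist(*items):
    # Stand-in that tags each item; like dask.persist, it returns a tuple.
    return tuple(("persisted", item) for item in items)


def load(ctx):
    ctx["vis"] = "lazy-vis"
    ctx.checkpoint_keys.append("vis")
    ctx.compute_tasks.append("export-task")


result = run_stages([("load", load)], persist=fake_persist)
```

Persisting checkpointed data and compute tasks in a single call lets the scheduler share any intermediate results they have in common, which is one likely reason for batching them together.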