Task Tracker

class ska_csp_lmc_common.commands.task_tracker.TaskTracker(task_name: str, total_subtasks: int, completed_callback: Callable | None = None, logger: Logger | None = None, operation: str | None = None, obs_mode: str | None = None, is_root: bool | None = False)

Bases: object

Track subtask state updates and compute an aggregated task outcome.

TaskTracker stores the latest SubtaskResult for each subtask name, updates progress and completion counters, and derives the overall task status/result by delegating to TaskAggregationPolicy.

The tracker is thread-safe: updates are protected by an internal lock and completion is signaled through an event.

Completion reporting:

  • When all subtasks reach a final state (or completed_subtasks meets total_subtasks), the tracker calls completed_callback once.

  • The completion callback payload may include:

    • devices: all devices involved in the command (if known)

    • failed_devices: devices that failed (if known)

    • health_state: optional, e.g. “DEGRADED” or “FAILED”

    • error: optional Tango errors collected during updates
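The "called once" guarantee described above can be illustrated with a minimal, self-contained sketch. MiniTracker is a hypothetical stand-in, not the real TaskTracker; passing the payload as keyword arguments and the exact payload keys are assumptions made for the example.

```python
import threading
from typing import Callable, Optional


class MiniTracker:
    """Hypothetical stand-in showing a lock, an event and a one-shot callback."""

    def __init__(self, total_subtasks: int,
                 completed_callback: Optional[Callable[..., None]] = None) -> None:
        self._lock = threading.Lock()
        self.done_event = threading.Event()
        self._final = set()          # names of subtasks in a final state
        self._total = total_subtasks
        self._callback = completed_callback
        self._notified = False       # guards against a second notification

    def mark_final(self, subtask: str) -> None:
        # Decide under the lock whether this call is the one that completes the task.
        with self._lock:
            self._final.add(subtask)
            fire = len(self._final) >= self._total and not self._notified
            if fire:
                self._notified = True
        if fire:
            # Invoke the callback outside the lock, then release any waiters.
            if self._callback is not None:
                self._callback(result="done", devices=sorted(self._final))
            self.done_event.set()
```

Late or repeated calls to mark_final after completion do not trigger the callback again, because the _notified flag is checked and set while the lock is held.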

Context propagation:

  • Optional operation and obs_mode values can be provided or later updated; when changed, the tracker rebuilds the policy instance.

__init__(task_name: str, total_subtasks: int, completed_callback: Callable | None = None, logger: Logger | None = None, operation: str | None = None, obs_mode: str | None = None, is_root: bool | None = False)

Create a new tracker for a logical task spanning multiple subtasks.

Parameters:
  • task_name – Logical name of the tracked task.

  • total_subtasks – Expected number of subtasks contributing to the overall completion.

  • completed_callback – Optional callable invoked on completion (and optionally on progress updates if enabled).

  • logger – Optional logger; defaults to module logger.

  • operation – Optional operation context for aggregation policy.

  • obs_mode – Optional observation mode context for policy.

  • is_root – Whether this task is the root task. If so, the policy messages include a top-level header.

Notes

The policy instance is created immediately from the provided context and rebuilt if the context changes later.

set_context(operation: str | None = None, obs_mode: str | None = None) → None

Update policy context (operation/obs_mode) when it becomes known.

If either value changes, rebuild the internal aggregation policy (TaskAggregationPolicy) so subsequent aggregations apply the correct contextual rules.

Parameters:
  • operation – New operation context. If None, do not change the current value.

  • obs_mode – New observation mode context. If None, do not change the current value.
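The update rules above (None leaves a value untouched; the policy is rebuilt only on an actual change) might look roughly like the following sketch. ContextHolder, _rebuild_policy and policy_builds are hypothetical names used only for illustration; the tuple stored in policy is a placeholder for a real TaskAggregationPolicy instance.

```python
from typing import Optional


class ContextHolder:
    """Hypothetical stand-in illustrating the described set_context rules."""

    def __init__(self, operation: Optional[str] = None,
                 obs_mode: Optional[str] = None) -> None:
        self.operation = operation
        self.obs_mode = obs_mode
        self.policy_builds = 0
        self._rebuild_policy()

    def _rebuild_policy(self) -> None:
        # Placeholder for constructing a new aggregation policy from the context.
        self.policy = (self.operation, self.obs_mode)
        self.policy_builds += 1

    def set_context(self, operation: Optional[str] = None,
                    obs_mode: Optional[str] = None) -> None:
        changed = False
        # None means "keep the current value".
        if operation is not None and operation != self.operation:
            self.operation = operation
            changed = True
        if obs_mode is not None and obs_mode != self.obs_mode:
            self.obs_mode = obs_mode
            changed = True
        # Rebuild only when something actually changed.
        if changed:
            self._rebuild_policy()
```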

wait_for_task_completion(timeout_event=None) → bool

Block until the task completion event is set.

If timeout_event is provided and becomes set while waiting, return False. Otherwise return True once completion has been signaled.

Parameters:

timeout_event – Optional threading.Event used as an external cancellation/timeout signal. If set while waiting, this method returns False.

Returns:

True if the task completed (internal event set). False if timeout_event was provided and became set first.
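One way to obtain the behaviour described here is to wait on the completion event in short slices and check the external event between slices. This is a sketch under that assumption, not the library's implementation; wait_for_completion and poll_interval are hypothetical names.

```python
import threading
from typing import Optional


def wait_for_completion(done: threading.Event,
                        timeout_event: Optional[threading.Event] = None,
                        poll_interval: float = 0.05) -> bool:
    """Return True once `done` is set, or False if `timeout_event` is set first."""
    if timeout_event is None:
        # No external cancellation signal: block until completion.
        done.wait()
        return True
    # Event.wait(timeout) returns True as soon as the event is set.
    while not done.wait(poll_interval):
        if timeout_event.is_set():
            return False
    return True
```

In this sketch, if both events are already set when the function is called, completion wins and the function returns True.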

signal_task_completion() → None

Set the internal completion event to release waiting threads.

update_task(*, task_name: str, status: TaskStatus | None = None, progress: int | None = None, result: Any | None = None, error: tuple[DevError] | None = None, **kwargs: Any) → None

Ingest a subtask update and recompute the aggregated outcome.

This method:

  • updates (or creates) a SubtaskResult entry for task_name

  • updates tracker context (operation/obs_mode) if provided

  • aggregates overall status/result/message using the task aggregation policy.

  • recomputes completion counters and progress

  • triggers completion callback once when all subtasks are done

  • signals the completion event after completion propagation

Device fields in kwargs (naming is intentional):

  • devices: list of devices involved/used by the command/subtask

  • device_name: single device identifier (fallback)

  • failed_devices: list of devices that failed for the subtask (used only for failure reporting; not the same as devices)

Safety:

  • The function is designed to never raise: exceptions are caught and converted into a FAILED overall outcome, and the completion event is always signaled to avoid deadlocks.

Parameters:
  • task_name – Name of the subtask reporting the update.

  • status – the subtask’s status

  • progress – the subtask’s progress percentage

  • result – the subtask’s result

  • error – Optional Tango DevError tuple to store on the tracker.

  • **kwargs

    Additional fields propagated to SubtaskResult. Expected keys (when available):

    • device_name: Single device identifier for the subtask.

    • devices: Iterable of devices involved/used by the subtask.

    • failed_devices: Iterable of devices that failed for the subtask (subset of involved devices).

    • operation: Operation context (may update policy context).

    • obs_mode: Observation mode context (may update policy).

    • source: String for logging/debug (origin of update).

Safety: never raises; on internal error marks FAILED and sets event.
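The "never raises" contract can be pictured as a guard around the update logic. The sketch below is an assumption about the general pattern, not the actual code: safe_update, the state dictionary, and the string "FAILED" (standing in for TaskStatus.FAILED) are all hypothetical.

```python
import threading
from typing import Callable


def safe_update(apply_update: Callable[[], None],
                state: dict,
                done_event: threading.Event) -> None:
    """Run `apply_update`; on any error mark FAILED and release waiters."""
    try:
        apply_update()
    except Exception as exc:  # broad on purpose: the caller must never see a raise
        state["status"] = "FAILED"
        state["message"] = f"update failed: {exc}"
        # Signal completion so that threads blocked on the event cannot deadlock.
        done_event.set()
```

Catching Exception broadly is normally discouraged, but it matches the stated goal that a faulty update must not leave waiting threads blocked.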

get_status() → ska_control_model.TaskStatus

Return the current aggregated task status.

get_result() → ska_control_model.ResultCode

Return the current aggregated task result code.

_extract_context(kwargs: Dict[str, Any]) → _UpdateContext

Extract a normalized update context from keyword arguments.

The returned context groups device and policy-related fields that may appear in update_task(**kwargs).

_upsert_subtask(task_name: str, status: TaskStatus | None, result: Any | None, ctx: _UpdateContext) → None

Insert a new SubtaskResult or update the existing one.

Parameters:
  • task_name – Subtask name, used as key in self.subtasks.

  • status – Latest status for the subtask.

  • result – Latest result payload for the subtask.

  • ctx – Normalized context extracted from update kwargs.

_aggregate() → None

Compute the aggregated outcome from current subtask results.

The aggregation is delegated to TaskAggregationPolicy and updates the public fields: overall_status, overall_result, overall_resultcode, and overall_health.

_recompute_completion() → None

Recompute the number of subtasks that are in a final state.

_record_subtask_progress(task_name: str, progress: int | None) → None

Record sanitized per-subtask progress in the 0..100 range.

Parameters:
  • task_name – Subtask key used in self._subtask_progress.

  • progress – Progress percentage for the subtask. If None, the value is ignored.

_recompute_progress() → None

Compute task progress as the mean of per-subtask progress.

Rules:

  • Completed subtasks are treated as 100.

  • If explicit progress is available for a subtask, it is used (clamped to 0..100).

  • A subtask with unknown progress that is not completed is treated as 0.
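The three rules can be sketched as a small standalone function. mean_progress and its parameters are hypothetical; averaging over the supplied subtask names and rounding down to an integer are assumptions, since the source does not state the denominator or the rounding mode.

```python
from typing import Dict, Iterable, Set


def mean_progress(subtasks: Iterable[str],
                  reported: Dict[str, int],
                  completed: Set[str]) -> int:
    """Mean progress over `subtasks`, following the three rules above."""
    names = list(subtasks)
    if not names:
        return 0
    total = 0
    for name in names:
        if name in completed:
            total += 100                                   # completed counts as 100
        elif name in reported:
            total += max(0, min(100, reported[name]))      # clamp to 0..100
        # unknown and not completed contributes 0
    return total // len(names)
```

For example, with subtasks a (completed), b (reported 50) and c (unknown), the contributions are 100, 50 and 0, giving a mean of 50.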

_is_done_locked() → bool

Return True if the completion criteria are met (lock must be held).

Notes

A task with total_subtasks <= 0 is treated as already complete.

_build_progress_payload() → Dict[str, Any] | None

Build a throttled progress callback payload.

Quantizes self.progress to PROGRESS_STEP.

Returns:

The payload, or None if the quantized value has not changed since the last notification.
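Quantize-then-compare throttling of this kind can be sketched as follows. The value PROGRESS_STEP = 10, rounding down to the step, and the payload key "progress" are assumptions for the example; ProgressThrottle is a hypothetical helper, not part of the library.

```python
from typing import Any, Dict, Optional

PROGRESS_STEP = 10  # assumed value; the real constant is not given in the source


class ProgressThrottle:
    """Hypothetical helper that emits a payload only when the quantized value changes."""

    def __init__(self) -> None:
        self._last: Optional[int] = None

    def build_payload(self, progress: int) -> Optional[Dict[str, Any]]:
        # Round down to the nearest multiple of PROGRESS_STEP.
        quantized = (progress // PROGRESS_STEP) * PROGRESS_STEP
        if quantized == self._last:
            return None          # same bucket as the last notification: skip
        self._last = quantized
        return {"progress": quantized}
```

With a step of 10, progress values 3 and 7 fall in the same bucket, so only the first produces a payload.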

_build_completion_payload(ctx: _UpdateContext) → Dict[str, Any]

Build the completion callback payload (emitted exactly once).

Includes the aggregated result/message and, when available, devices, failed devices, health state, and Tango errors.

Returns:

The payload with the context of the completed task.

_chain_message() → str

Return the aggregated message for the task.

The message is produced by TaskAggregationPolicy and stored in overall_resultcode[1].

_is_task_already_completed(task_name: str, device_name: str | None = None) → bool

Return True if a subtask is already in a final state.

Used to ignore stale or late updates for subtasks that have already reached COMPLETED/FAILED/ABORTED/REJECTED.

Parameters:
  • task_name – Subtask identifier used as key in self.subtasks.

  • device_name – Optional device identifier used only for logging.

Returns:

True if the subtask identified by task_name is already in a final state; otherwise, False.
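A final-state check of this kind reduces to set membership. In the sketch below, Status is a simplified stand-in for ska_control_model.TaskStatus (not the real enum), and storing the status directly in a dictionary instead of a SubtaskResult is an assumption made to keep the example self-contained.

```python
from enum import Enum
from typing import Dict


class Status(Enum):
    """Simplified stand-in for ska_control_model.TaskStatus (assumed subset)."""
    QUEUED = "QUEUED"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    ABORTED = "ABORTED"
    REJECTED = "REJECTED"


# The four final states named in the description above.
FINAL_STATES = {Status.COMPLETED, Status.FAILED, Status.ABORTED, Status.REJECTED}


def is_already_completed(subtasks: Dict[str, Status], task_name: str) -> bool:
    # An unknown subtask yields None from .get(), which is not a final state.
    return subtasks.get(task_name) in FINAL_STATES
```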

_collect_devices_for_completion() → tuple[list[str], list[str]]

Build deterministic device lists for completion propagation.

Returns (devices, failed_devices) where:

  • devices: all devices involved in the task (union of each subtask’s devices list, falling back to device_name)

  • failed_devices: devices that failed (prefer sub.failed_devices, otherwise fall back to the subtask’s involved devices)

Notes:

  • Uses sets and returns sorted lists for deterministic ordering, which is useful for tests and stable upstream behavior.
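The fallback rules and the deterministic ordering can be sketched with plain dictionaries. collect_devices is a hypothetical function; representing each subtask as a dict and counting failed devices only for subtasks whose status is "FAILED" are assumptions, since the source does not show the SubtaskResult structure.

```python
from typing import Dict, List, Tuple


def collect_devices(subtasks: Dict[str, dict]) -> Tuple[List[str], List[str]]:
    """Return (devices, failed_devices) as sorted, de-duplicated lists."""
    devices, failed = set(), set()
    for sub in subtasks.values():
        # Prefer the explicit devices list; fall back to the single device_name.
        involved = set(sub.get("devices") or [])
        if not involved and sub.get("device_name"):
            involved = {sub["device_name"]}
        devices |= involved
        if sub.get("status") == "FAILED":
            # Prefer explicit failed_devices; otherwise blame all involved devices.
            failed |= set(sub.get("failed_devices") or involved)
    # Sorting makes the output independent of set iteration order.
    return sorted(devices), sorted(failed)
```

Because sets have no guaranteed iteration order, sorting before returning is what makes repeated calls on the same input produce identical lists.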