Task Tracker
- class ska_csp_lmc_common.commands.task_tracker.TaskTracker(task_name: str, total_subtasks: int, completed_callback: Callable | None = None, logger: Logger | None = None, operation: str | None = None, obs_mode: str | None = None, is_root: bool | None = False)
Bases: object
Track subtask state updates and compute an aggregated task outcome.
TaskTracker stores the latest SubtaskResult for each subtask name, updates progress and completion counters, and derives the overall task status/result by delegating to TaskAggregationPolicy.
The tracker is thread-safe: updates are protected by an internal lock and completion is signaled through an event.
Completion reporting:
When all subtasks reach a final state (or completed_subtasks meets total_subtasks), the tracker calls completed_callback once.
The completion callback payload may include:
devices: all devices involved in the command (if known)
failed_devices: devices that failed (if known)
health_state: optional, e.g. “DEGRADED” or “FAILED”
error: optional Tango errors collected during updates
Context propagation:
Optional operation and obs_mode values can be provided or later updated; when changed, the tracker rebuilds the policy instance.
- __init__(task_name: str, total_subtasks: int, completed_callback: Callable | None = None, logger: Logger | None = None, operation: str | None = None, obs_mode: str | None = None, is_root: bool | None = False)
Create a new tracker for a logical task spanning multiple subtasks.
- Parameters:
task_name – Logical name of the tracked task.
total_subtasks – Expected number of subtasks contributing to the overall completion.
completed_callback – Optional callable invoked on completion (and optionally on progress updates if enabled).
logger – Optional logger; defaults to module logger.
operation – Optional operation context for aggregation policy.
obs_mode – Optional observation mode context for policy.
is_root – Whether the task is the root task. If so, the policy messages include a top-level header.
Notes
The policy instance is created immediately from the provided context and rebuilt if the context changes later.
- set_context(operation: str | None = None, obs_mode: str | None = None) None
Update policy context (operation/obs_mode) when it becomes known.
If either value changes, rebuild the internal aggregation policy (TaskAggregationPolicy) so subsequent aggregations apply the correct contextual rules.
- Parameters:
operation – New operation context. If None, do not change the current value.
obs_mode – New observation mode context. If None, do not change the current value.
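The update rule above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: ContextHolder and its policy_builds counter are hypothetical stand-ins for the tracker and for the construction of a TaskAggregationPolicy instance.

```python
from typing import Optional


class ContextHolder:
    """Hypothetical stand-in for the tracker's context handling."""

    def __init__(self, operation: Optional[str] = None, obs_mode: Optional[str] = None):
        self.operation = operation
        self.obs_mode = obs_mode
        self.policy_builds = 0
        self._rebuild_policy()  # the policy is created immediately

    def _rebuild_policy(self) -> None:
        # Stand-in for building a new aggregation policy from the context.
        self.policy_builds += 1

    def set_context(self, operation: Optional[str] = None, obs_mode: Optional[str] = None) -> None:
        changed = False
        # None means "leave the current value alone".
        if operation is not None and operation != self.operation:
            self.operation = operation
            changed = True
        if obs_mode is not None and obs_mode != self.obs_mode:
            self.obs_mode = obs_mode
            changed = True
        # Rebuild only when something actually changed.
        if changed:
            self._rebuild_policy()
```

Passing the same value twice, or passing None, leaves the policy untouched in this sketch; only a genuine change triggers a rebuild.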
- wait_for_task_completion(timeout_event=None) bool
Block until the task completion event is set.
If timeout_event is provided and becomes set while waiting, return False. Otherwise return True once completion has been signaled.
- Parameters:
timeout_event – Optional threading.Event used as an external cancellation/timeout signal. If set while waiting, this method returns False.
- Returns:
True if the task completed (internal event set). False if timeout_event was provided and became set first.
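One way to obtain the behaviour described above is to wait on the completion event in short slices and check the external signal between slices. The sketch below assumes this polling approach; the function name, the poll_interval parameter, and its default are hypothetical and are not taken from the library.

```python
import threading
from typing import Optional


def wait_for_completion(
    done: threading.Event,
    timeout_event: Optional[threading.Event] = None,
    poll_interval: float = 0.05,
) -> bool:
    """Return True once `done` is set, False if `timeout_event` is set first."""
    while True:
        # Wake up periodically so the external timeout signal can be checked.
        if done.wait(timeout=poll_interval):
            return True
        if timeout_event is not None and timeout_event.is_set():
            return False
```

Without a timeout_event the call blocks until completion is signaled, which matches the documented return value of True.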
- signal_task_completion() None
Set the internal completion event to release waiting threads.
- update_task(*, task_name: str, status: TaskStatus | None = None, progress: int | None = None, result: Any | None = None, error: tuple[DevError] | None = None, **kwargs: Any) None
Ingest a subtask update and recompute the aggregated outcome.
This method:
updates (or creates) a SubtaskResult entry for task_name
updates tracker context (operation/obs_mode) if provided
aggregates overall status/result/message using the task aggregation policy.
recomputes completion counters and progress
triggers completion callback once when all subtasks are done
signals the completion event after completion propagation
Device fields in kwargs (naming is intentional):
devices: list of devices involved/used by the command/subtask
device_name: single device identifier (fallback)
failed_devices: list of devices that failed for the subtask (used only for failure reporting; not the same as devices)
Safety:
The function is designed to never raise: exceptions are caught and converted into a FAILED overall outcome, and the completion event is always signaled to avoid deadlocks.
- Parameters:
task_name – Name of the subtask reporting the update.
status – the subtask’s status
progress – the subtask’s progress percentage
result – the subtask’s result
error – Optional Tango DevError tuple to store on the tracker.
**kwargs –
Additional fields propagated to SubtaskResult. Expected keys (when available):
device_name: Single device identifier for the subtask.
devices: Iterable of devices involved/used by the subtask.
failed_devices: Iterable of devices that failed for the subtask (subset of involved devices).
operation: Operation context (may update policy context).
obs_mode: Observation mode context (may update policy).
source: String for logging/debug (origin of update).
Safety: never raises; on internal error marks FAILED and sets event.
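The never-raise guarantee can be illustrated with a small, self-contained sketch. It is not the tracker's implementation: SafeUpdater, its process callable, and the string statuses are hypothetical, chosen only to show the pattern of catching every exception, marking the outcome FAILED, and still setting the completion event.

```python
import logging
import threading
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)


class SafeUpdater:
    """Hypothetical illustration of an update method that never raises."""

    def __init__(self, process: Callable[[Dict[str, Any]], bool]):
        self._process = process  # returns True when the task is done
        self._lock = threading.Lock()
        self.completed = threading.Event()
        self.status = "IN_PROGRESS"

    def update(self, **fields: Any) -> None:
        done = False
        try:
            with self._lock:
                done = bool(self._process(fields))
                if done:
                    self.status = "COMPLETED"
        except Exception:  # deliberately broad: callers must never see an error
            logger.exception("update failed; marking task FAILED")
            with self._lock:
                self.status = "FAILED"
            done = True
        if done:
            # Release waiters on success and on failure, so nobody deadlocks.
            self.completed.set()
```

A caller waiting on completed is released in both the success and the failure path, which is the point of signaling the event unconditionally.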
- get_status() ska_control_model.TaskStatus
Return the current aggregated task status.
- get_result() ska_control_model.ResultCode
Return the current aggregated task result code.
- _extract_context(kwargs: Dict[str, Any]) _UpdateContext
Extract a normalized update context from keyword arguments.
The returned context groups device and policy-related fields that may appear in update_task(**kwargs).
- _upsert_subtask(task_name: str, status: TaskStatus | None, result: Any | None, ctx: _UpdateContext) None
Insert a new SubtaskResult or update the existing one.
- Parameters:
task_name – Subtask name, used as key in self.subtasks.
status – Latest status for the subtask.
result – Latest result payload for the subtask.
ctx – Normalized context extracted from update kwargs.
- _aggregate() None
Compute the aggregated outcome from current subtask results.
The aggregation is delegated to TaskAggregationPolicy and updates the public fields: overall_status, overall_result, overall_resultcode, and overall_health.
- _recompute_completion() None
Recompute the number of subtasks that are in a final state.
- _record_subtask_progress(task_name: str, progress: int | None) None
Record sanitized per-subtask progress in the 0..100 range.
- Parameters:
task_name – Subtask key used in self._subtask_progress.
progress – Progress percentage for the subtask. If None, the value is ignored.
- _recompute_progress() None
Compute task progress as the mean of per-subtask progress.
Rules:
Completed subtasks are treated as 100
If we have explicit progress for a subtask, use it (capped 0..100)
If unknown progress and not completed, treat as 0
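The three rules can be sketched as a standalone function. The function name, its arguments, the integer (floor) division, and the choice to average over the supplied subtask names are assumptions made for illustration; the source does not state how the tracker rounds or which denominator it uses.

```python
from typing import Dict, Iterable, Optional


def mean_progress(
    subtask_names: Iterable[str],
    completed: Iterable[str],
    reported: Dict[str, Optional[int]],
) -> int:
    """Mean progress over all named subtasks, as an integer percentage."""
    names = list(subtask_names)
    if not names:
        return 0
    done = set(completed)
    total = 0
    for name in names:
        if name in done:
            total += 100  # completed subtasks count as 100
        else:
            value = reported.get(name)
            # Unknown progress counts as 0; known progress is capped to 0..100.
            total += 0 if value is None else max(0, min(100, value))
    return total // len(names)
```

For three subtasks with one completed, one at 50, and one unknown, this sketch computes (100 + 50 + 0) // 3.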
- _is_done_locked() bool
Return True if the completion criteria are met (lock must be held).
Notes
A task with total_subtasks <= 0 is treated as already complete.
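A minimal sketch of the completion check, assuming that "done" means the number of subtasks in a final state has reached total_subtasks. The Status enum is a local stand-in, not ska_control_model.TaskStatus, and the helper names are hypothetical.

```python
from enum import Enum
from typing import Dict


class Status(Enum):  # local stand-in, not ska_control_model.TaskStatus
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    ABORTED = "ABORTED"
    REJECTED = "REJECTED"


FINAL = frozenset({Status.COMPLETED, Status.FAILED, Status.ABORTED, Status.REJECTED})


def count_final(subtasks: Dict[str, Status]) -> int:
    """Number of subtasks whose latest status is final."""
    return sum(1 for status in subtasks.values() if status in FINAL)


def is_done(total_subtasks: int, subtasks: Dict[str, Status]) -> bool:
    # A task that expects no subtasks is treated as already complete.
    if total_subtasks <= 0:
        return True
    return count_final(subtasks) >= total_subtasks
```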
- _build_progress_payload() Dict[str, Any] | None
Build a throttled progress callback payload.
Quantizes self.progress to PROGRESS_STEP.
- Returns:
The payload, only when the quantized value has changed since the last notification; otherwise None.
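The throttling idea can be sketched as follows. The value of PROGRESS_STEP, the floor-based quantization, the ProgressNotifier class, and the "progress" payload key are all assumptions for illustration; the real constant and payload layout are not shown in this section.

```python
from typing import Any, Dict, Optional

PROGRESS_STEP = 10  # assumed value; the real constant is not shown here


class ProgressNotifier:
    """Hypothetical helper that emits a payload only on a new quantized step."""

    def __init__(self) -> None:
        self._last_sent: Optional[int] = None

    def build_payload(self, progress: int) -> Optional[Dict[str, Any]]:
        # Round down to the nearest multiple of PROGRESS_STEP.
        quantized = (progress // PROGRESS_STEP) * PROGRESS_STEP
        if quantized == self._last_sent:
            return None  # nothing new to report
        self._last_sent = quantized
        return {"progress": quantized}
```

With a step of 10, updates at 12 and 19 both quantize to 10, so only the first of them produces a payload in this sketch.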
- _build_completion_payload(ctx: _UpdateContext) Dict[str, Any]
Build the completion callback payload (emitted exactly once).
Includes the aggregated result/message and, when available, devices, failed devices, health state, and Tango errors.
- Returns:
The payload with the context of the completed task.
- _chain_message() str
Return the aggregated message for the task.
The message is produced by TaskAggregationPolicy and stored in overall_resultcode[1].
- _is_task_already_completed(task_name: str, device_name: str | None = None) bool
Return True if a subtask is already in a final state.
Used to ignore stale or late updates for subtasks that have already reached COMPLETED/FAILED/ABORTED/REJECTED.
- Parameters:
task_name – Subtask identifier used as key in self.subtasks.
device_name – Optional device identifier used only for logging.
- Returns:
True if the subtask identified by task_name is already in a final state; otherwise, False.
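The guard against stale or late updates can be sketched like this. Statuses are plain strings and apply_update is a hypothetical helper; the tracker itself stores SubtaskResult objects and uses TaskStatus values.

```python
from typing import Dict

FINAL_STATES = frozenset({"COMPLETED", "FAILED", "ABORTED", "REJECTED"})


def apply_update(subtasks: Dict[str, str], task_name: str, status: str) -> bool:
    """Store `status` unless the subtask is already final.

    Returns True if the update was applied, False if it was ignored.
    """
    if subtasks.get(task_name) in FINAL_STATES:
        return False  # late update for a finished subtask: ignore it
    subtasks[task_name] = status
    return True
```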
- _collect_devices_for_completion() tuple[list[str], list[str]]
Build deterministic device lists for completion propagation.
Returns (devices, failed_devices) where:
devices: all devices involved in the task (union of each subtask’s devices list, falling back to device_name)
failed_devices: devices that failed (prefer sub.failed_devices, otherwise fall back to the subtask’s involved devices)
Notes:
Uses sets and returns sorted lists for deterministic ordering, which is useful for tests and stable upstream behavior.
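A sketch of the collection rules, under stated assumptions: Sub is a hypothetical stand-in for SubtaskResult, and its failed flag is an assumed way of deciding when the fallback to the involved devices applies, which the source does not spell out.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Optional, Tuple


@dataclass
class Sub:  # hypothetical stand-in for SubtaskResult
    failed: bool = False
    devices: List[str] = field(default_factory=list)
    device_name: Optional[str] = None
    failed_devices: List[str] = field(default_factory=list)


def collect_devices(subtasks: Iterable[Sub]) -> Tuple[List[str], List[str]]:
    devices, failed = set(), set()
    for sub in subtasks:
        # Prefer the devices list; fall back to the single device_name.
        involved = set(sub.devices)
        if not involved and sub.device_name:
            involved = {sub.device_name}
        devices |= involved
        if sub.failed_devices:
            failed |= set(sub.failed_devices)
        elif sub.failed:
            # No explicit failed list: blame every involved device.
            failed |= involved
    # Sorted lists give a deterministic order regardless of input order.
    return sorted(devices), sorted(failed)
```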