Task Aggregation Policy

class ska_csp_lmc_common.commands.aggregation.policy.TaskAggregationPolicy(task_name: str, operation: str | None = None, obs_mode: str | None = None, is_root: bool = False)

Bases: object

Aggregate subtask results into a single task outcome.

The policy inspects a list of SubtaskResult objects and returns an AggregationOutcome with:

  • a final TaskStatus and ResultCode

  • a message summarizing failures and normalized “Causes”

  • optional health state (e.g., FAILED / DEGRADED)

Device failures are grouped by kind (CBF/PST/PSS/OTHER). Failure causes are extracted from subtask messages, flattened from bullet or multi-line formats, and de-duplicated while preserving order.

__init__(task_name: str, operation: str | None = None, obs_mode: str | None = None, is_root: bool = False)
static _kind(dev: str) str

Classify a device name into one of: CBF, PST, PSS, OTHER.

The classification is based on substring matching on the lower-cased device name.

Parameters:

dev – the symbolic device name.

Returns:

One of "CBF", "PST", "PSS", "OTHER".

static _devices(sub: SubtaskResult) List[str]

Return the devices relevant for failure reporting.

If sub.failed_devices is present, return it as a list. Otherwise, fall back to sub.device_name and return it as a single-item list when available.

Parameters:

sub – The subtask result.

Returns:

List of device names (may be empty).

_is_pst_group_task() bool

Determine whether this task is a PST group task based on naming conventions.

For example: - “configure_pst” - “configureScan_pst” - “allocate_resources_pst”

PST group tasks are aggregated with partial-success semantics: if at least one PST subtask succeeds, the overall task may still be marked COMPLETED but with a non-OK result and degraded health.

Returns:

True if the name matches a PST-group convention.

_find_cbf_subtask(subs: List[SubtaskResult]) SubtaskResult | None

Return the first subtask associated with a CBF device, if any.

CBF association is detected by inspecting devices reported by the subtask (failed_devices / device_name) and applying the subsystem classifier.

Parameters:

subs – List of subtask results.

Returns:

The first CBF-associated subtask, or None if not found.

static _already_prefixed(line: str, name: str) bool

Return True if a cause line already includes the subtask name.

The check is tolerant to common formats produced during aggregation, such as: - name: message - name - message - [name - message] - - name - message

This helper prevents duplicated prefixes like name - name - message when normalizing failure causes.

Parameters:
  • line – A single normalized or raw cause line.

  • name – The subtask name used as prefix identifier.

Returns:

True if the line already starts with the given name, False otherwise.

static _is_final_problem(s: SubtaskResult) bool

Return True if the subtask represents a terminal problem state.

A terminal problem is detected when: - the subtask status is FAILED or REJECTED; or - the result code is FAILED (including cases such as COMPLETED with FAILED result code).

Parameters:

s – The subtask result.

Returns:

True if the subtask should be considered problematic.

static _strip_outer_causes_header(msg: str) str

Remove a leading ‘Causes:’ header from a message, if present.

Also remove a leading blank line after the header and trim outer newlines.

Parameters:

msg – Raw message text.

Returns:

Message text without an outer "Causes:" header.

static _flatten_bullets(msg: str) List[str]

Turn a multi-line message into a list of “cause lines”.

  • Lines starting with - are treated as bullet points and

    stripped of the bullet marker.

  • Other non-empty lines are kept as-is (trimmed).

  • Empty lines are removed.

Parameters:

msg – Message text (possibly multi-line).

Returns:

List of non-empty, trimmed cause lines.

_extract_causes_from_subtask(s: SubtaskResult) List[str]

Extract and normalize failure cause lines from a problematic subtask.

If the subtask message already looks structured (e.g. it contains a "Causes:" header, multiple lines, or bullet points), the message is flattened into individual cause lines and returned without adding extra structure.

Otherwise, a single cause line is produced by prefixing the message with the subtask name (to retain origin context).

Duplicate cause lines are removed while preserving order.

Parameters:

s – The subtask result.

Returns:

List of normalized cause lines (may be empty).

_gather_failed_devices_and_causes(subs: List[SubtaskResult]) tuple[Dict[str, List[str]], List[str]]

Build failure summaries from a list of subtasks.

Returns: - failed_by_kind: mapping CBF/PST/PSS/OTHER -> list of failed devices (deduplicated, order-preserving) - causes: flat list of normalized cause lines (deduplicated, order-preserving)

A subtask is considered problematic if _is_final_problem() returns True.

Parameters:

subs – List of subtask results.

Returns:

(failed_by_kind, causes).

aggregate(subs: List[SubtaskResult]) AggregationOutcome

Aggregate subtasks into a single AggregationOutcome.

Priority rules: - If any subtask is ABORTED, the outcome is ABORTED. - If any subtask is IN_PROGRESS, the outcome is IN_PROGRESS. - CBF subtask failures/rejections dominate the final result. - For PST-group tasks, apply quorum logic across PST beams. - Remaining failures become FAILED (severity) or DEGRADED. - If no failures, return COMPLETED+OK.

_message_degraded(failed_by_kind: Dict[str, List[str]], causes: List[str]) str

Build a degraded completion message.

_message_failed(title: str, causes: List[str]) str

Build a human-readable failure summary message.

Task Aggregation Outcome

class ska_csp_lmc_common.commands.aggregation.base.AggregationOutcome(status: ska_control_model.TaskStatus, result: ska_control_model.ResultCode, message: str, health_state: str | None = None)

Bases: object

Container for task outcome information.

Result of aggregating multiple SubtaskResult objects.

status

Final task status.

Type:

TaskStatus

result_code

Final task result code.

Type:

ResultCode

message

Human-readable summary propagated to the caller.

The message follows a structured but textual format:

  • A single-line summary (for root tasks), e.g.: “Task <task_name> failed” “Task <task_name> completed in DEGRADED state” “Task <task_name> aborted”

  • Optionally followed by a “Causes:” section: Causes: - <cause line 1> - <cause line 2> …

The message is intended for operators and logs, not for machine parsing. Its structure is stable but not part of the formal API.

Type:

str

status: ska_control_model.TaskStatus
result: ska_control_model.ResultCode
message: str
health_state: str | None = None
__init__(status: ska_control_model.TaskStatus, result: ska_control_model.ResultCode, message: str, health_state: str | None = None) None