Health State Supervision

Health Supervisor

class ska_csp_lmc_common.supervision.health.health_supervisor.CspHealthSupervisor(*, model: HealthStateModel, update_attribute: Callable[[str, Any], None], policy: HealthEvaluationPolicy | None, logger: Logger, debounce_s: float = 0.4, max_latency_s: float = 0.7)

Bases: ObservationSupervisor

CSP-specific health supervision implementation.

This supervisor collects subsystem events, updates a snapshot store and periodically evaluates the aggregated health outputs with debouncing to limit update frequency.

It publishes:

  • the aggregated HealthState, through the provided model;

  • the aggregated healthInfo, through HealthInfoManager.

property health_state_info: dict[str, list[str]]

Return the current aggregated healthInfo snapshot.

Returns:

Dictionary mapping device FQDNs to a list of diagnostic message strings.

update_subsystems_healthinfo(subsystem_fqdn: str, health_info_value: Any) → None

Queue forwarded subsystem healthInfo for debounced merge.

The latest payload for each subsystem source is buffered and then merged during the debounced evaluation cycle. This reduces event bursts when multiple subsystem healthInfo updates arrive in quick succession.
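The buffering can be pictured as a lock-protected dictionary in which the newest payload from each source replaces the previous one, and a single drain step hands everything to the evaluation cycle (compare _drain_pending_subsystem_healthinfo). This is a minimal sketch, not the library's implementation; the class name PendingHealthInfoBuffer is hypothetical.

```python
import threading
from typing import Any


class PendingHealthInfoBuffer:
    """Hypothetical latest-value buffer keyed by subsystem FQDN."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pending: dict[str, Any] = {}

    def put(self, subsystem_fqdn: str, health_info_value: Any) -> None:
        # A newer payload from the same source overwrites the older one,
        # so a burst of updates collapses into a single entry per source.
        with self._lock:
            self._pending[subsystem_fqdn] = health_info_value

    def drain(self) -> dict[str, Any]:
        # Hand the buffered payloads to the evaluation cycle and start empty.
        with self._lock:
            drained, self._pending = self._pending, {}
        return drained
```

Because only the latest payload per source survives, the merge cost per cycle depends on the number of sources, not on the number of events received.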

Parameters:
  • subsystem_fqdn – Fully qualified name of the subsystem device providing the update.

  • health_info_value – Raw healthInfo payload received from the subsystem. May be a JSON string, dict or None.

on_subsystem_event(component: ObservingComponent, *, health: ska_control_model.HealthState | None = None, state: tango.DevState | None = None, admin_mode: ska_control_model.AdminMode | None = None, immediate: bool = False) → None

Ingest a subsystem update and schedule a supervised evaluation.

The component snapshot store is updated with the provided values. Any invalid or incomplete updates are ignored. An evaluation is then scheduled via the debounced supervision loop.

Parameters:
  • component – Observing component emitting the update.

  • health – Optional HealthState update for the component.

  • state – Optional device DevState update for the component.

  • admin_mode – Optional AdminMode update for the component.

  • immediate – If true, request an immediate evaluation instead of waiting for the debounce window.
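A debounced scheduler of this kind might look like the following minimal sketch. It assumes that debounce_s is the quiet period after the last event and that max_latency_s caps how long a pending evaluation may be deferred during a continuous burst; neither meaning is documented here, and DebounceWindow is a hypothetical name. Times are passed in explicitly so the logic stays deterministic.

```python
from typing import Optional


class DebounceWindow:
    """Hypothetical trailing debounce with a maximum-latency cap."""

    def __init__(self, debounce_s: float = 0.4, max_latency_s: float = 0.7) -> None:
        self.debounce_s = debounce_s
        self.max_latency_s = max_latency_s
        self._first: Optional[float] = None  # start of the pending burst
        self._last: Optional[float] = None   # most recent request
        self._immediate = False

    def request(self, now: float, *, immediate: bool = False) -> None:
        if self._first is None:
            self._first = now
        self._last = now
        self._immediate = self._immediate or immediate

    def due(self, now: float) -> bool:
        if self._first is None or self._last is None:
            return False  # nothing pending
        if self._immediate:
            return True  # caller asked to skip the debounce window
        quiet = now - self._last >= self.debounce_s
        overdue = now - self._first >= self.max_latency_s
        return quiet or overdue

    def reset(self) -> None:
        # Called once the evaluation has actually run.
        self._first = None
        self._last = None
        self._immediate = False
```

In a supervision loop, a worker would check due() (or sleep until the next deadline), run the evaluation, then call reset().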

clear_consistency_fault() → None

No-op for now (kept for interface symmetry).

_evaluate_and_publish(*, force: bool = False, reconcile_timeout: bool = False) → EvaluationOutcome

Evaluate and publish health outputs from the latest snapshot.

The evaluation is skipped if the snapshot revision has not changed since the last run, unless force is true. The method:

  1. Evaluates aggregated HealthState using the model.

  2. Produces diagnostic messages (healthInfo).

  3. Publishes HealthState via the model.

  4. Publishes healthInfo via HealthInfoManager.

Parameters:

force – If true, evaluate and publish even when the snapshot revision is unchanged.
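The revision gate and the four steps can be sketched as follows. Everything here is hypothetical: the callables stand in for the snapshot store, the health model, HealthDiagnostics and HealthInfoManager, health values are plain strings, and the method returns a bool rather than an EvaluationOutcome. reconcile_timeout is omitted because its behaviour is not described.

```python
from typing import Callable, Dict, List, Optional

Snapshot = Dict[str, str]  # hypothetical: fqdn -> health label
HealthInfo = Dict[str, List[str]]


class GatedHealthEvaluator:
    """Hypothetical sketch of revision-gated evaluate-and-publish."""

    def __init__(
        self,
        revision: Callable[[], int],
        snapshot: Callable[[], Snapshot],
        aggregate: Callable[[Snapshot], str],
        diagnose: Callable[[Snapshot, str], HealthInfo],
        publish_health: Callable[[str], None],
        publish_info: Callable[[HealthInfo], None],
    ) -> None:
        self._revision = revision
        self._snapshot = snapshot
        self._aggregate = aggregate
        self._diagnose = diagnose
        self._publish_health = publish_health
        self._publish_info = publish_info
        self._last_revision: Optional[int] = None

    def must_evaluate(self, *, force: bool) -> bool:
        # Skip when the store has not changed since the last run, unless forced.
        return force or self._revision() != self._last_revision

    def evaluate_and_publish(self, *, force: bool = False) -> bool:
        if not self.must_evaluate(force=force):
            return False
        # Read the revision before the snapshot: if the store changes in
        # between, the next cycle sees a newer revision and runs again.
        revision = self._revision()
        snap = self._snapshot()
        health = self._aggregate(snap)       # 1. aggregated HealthState
        info = self._diagnose(snap, health)  # 2. healthInfo diagnostics
        self._publish_health(health)         # 3. publish HealthState
        self._publish_info(info)             # 4. publish healthInfo
        self._last_revision = revision
        return True
```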

_drain_pending_subsystem_healthinfo() → dict[str, Any]

Drain buffered subsystem healthInfo updates.

Returns:

Mapping from each subsystem source FQDN to its latest pending healthInfo payload.

_must_evaluate_snapshot(*, force: bool) → bool

Decide whether snapshot-based health evaluation is required.

Parameters:

force – Bypass revision gating.

Returns:

True when evaluation should proceed.

_get_diagnostics_context() → dict[str, Any]

Return diagnostics context from the model, if available.

Returns:

Context dictionary consumed by HealthDiagnostics.

Health Diagnostics

class ska_csp_lmc_common.supervision.health.health_diagnostics.HealthDiagnostics

Bases: object

Generate healthInfo messages from a snapshot and a final health state.

This component does not compute the aggregated health state. It only produces explanatory messages consistent with the already computed final_health.

_NO_CBF_MESSAGE = 'No CBF component device detected!'

evaluate(snapshot: Dict[str, ComponentSnapshot], final_health: ska_control_model.HealthState, *, context: Dict[str, Any] | None = None) → Dict[str, List[str]]

Build healthInfo messages coherent with the final HealthState.

The returned payload is keyed by the device FQDN from context and contains a list of human-readable diagnostic messages.

Behaviour overview:

  • If final_health is OK, return {device_fqdn: []} when a target device is available.

  • If the context forces a message (faulty or disabled), prepend it to the snapshot diagnostics.

  • If final_health is FAILED and no critical component exists in the snapshot, report a missing-critical-component reason.

  • Otherwise, collect messages from the snapshot and return them under the target device FQDN.

Parameters:
  • snapshot – Mapping fqdn -> ComponentSnapshot with the subsystem health data used to derive explanatory messages.

  • final_health – Aggregated health state already computed by the health model.

  • context – Optional diagnostics context provided by the health model. Expected keys are device_fqdn (str), faulty (bool), fault_reason (str), disabled (bool) and disabled_reason (str).

Returns:

Mapping device_fqdn -> [messages...]. If no target device is available in context, returns an empty mapping.
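The decision flow in the behaviour overview can be sketched as below. This is an illustration under stated assumptions, not the library code: HealthState and Sample are local stand-ins, each sample carries precomputed messages, a fault reason is assumed to take priority over a disabled reason, and the missing-critical-component reason is assumed to be the _NO_CBF_MESSAGE text.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class HealthState(Enum):
    """Local stand-in; member names mirror ska_control_model.HealthState."""

    OK = "OK"
    DEGRADED = "DEGRADED"
    FAILED = "FAILED"
    UNKNOWN = "UNKNOWN"


NO_CBF_MESSAGE = "No CBF component device detected!"


@dataclass
class Sample:
    """Hypothetical stand-in for ComponentSnapshot."""

    weight: float = 0.0
    messages: List[str] = field(default_factory=list)


def normalise_context(context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    # Assumed defaults: empty FQDN/reasons and False flags.
    ctx = context or {}
    return {
        "device_fqdn": str(ctx.get("device_fqdn") or ""),
        "faulty": bool(ctx.get("faulty", False)),
        "fault_reason": str(ctx.get("fault_reason") or ""),
        "disabled": bool(ctx.get("disabled", False)),
        "disabled_reason": str(ctx.get("disabled_reason") or ""),
    }


def evaluate(
    snapshot: Dict[str, Sample],
    final_health: HealthState,
    context: Optional[Dict[str, Any]] = None,
) -> Dict[str, List[str]]:
    ctx = normalise_context(context)
    target = ctx["device_fqdn"]
    if not target:
        return {}  # no device to attach messages to
    if final_health is HealthState.OK:
        return {target: []}
    # Snapshot diagnostics, de-duplicated in alphabetical FQDN order.
    messages: List[str] = []
    for fqdn in sorted(snapshot):
        for message in snapshot[fqdn].messages:
            if message and message not in messages:
                messages.append(message)
    forced = ""
    if ctx["faulty"] and ctx["fault_reason"]:
        forced = ctx["fault_reason"]
    elif ctx["disabled"] and ctx["disabled_reason"]:
        forced = ctx["disabled_reason"]
    if forced:
        # Prepend the forced reason to the snapshot diagnostics.
        return {target: [forced] + [m for m in messages if m != forced]}
    if final_health is HealthState.FAILED and not any(
        s.weight > 0 for s in snapshot.values()
    ):
        # Assumption: the missing-critical-component reason is _NO_CBF_MESSAGE.
        return {target: [NO_CBF_MESSAGE]}
    return {target: messages}
```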

_normalise_context(context: Dict[str, Any] | None) → Dict[str, Any]

Normalise optional context to a dict with the expected keys.

Missing keys are replaced with sensible defaults.

Parameters:

context – Optional mapping coming from the health model.

Returns:

Normalised context with keys: device_fqdn, faulty, disabled, fault_reason, disabled_reason.

_forced_message(context: Dict[str, Any]) → Dict[str, List[str]] | None

Return a forced healthInfo payload when context flags require it.

If the model marked the device as faulty/disabled and provided a corresponding reason, that reason is returned as the only message.

Parameters:

context – Normalised context dictionary produced by _normalise_context.

Returns:

A {device_fqdn: [reason]} mapping if forced, otherwise None.

_collect_messages(snapshot: Dict[str, ComponentSnapshot], *, target_fqdn: str = '') → List[str]

Collect unique diagnostic messages from snapshot samples.

Messages are de-duplicated while preserving first-seen order. Components are traversed in deterministic order: target FQDN first (if present), then remaining FQDNs in alphabetical order.

Parameters:
  • snapshot – Mapping fqdn -> ComponentSnapshot.

  • target_fqdn – Optional target device FQDN that, when present in snapshot, is processed before other entries.

Returns:

List of unique diagnostic messages.
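The traversal and de-duplication rules can be illustrated with a small sketch that works on precomputed messages per FQDN instead of ComponentSnapshot objects (a simplification). collect_messages and append_unique are hypothetical names mirroring _collect_messages and _append_unique.

```python
from typing import Dict, List


def append_unique(messages: List[str], message: str) -> None:
    # Skip empty strings and messages already collected.
    if message and message not in messages:
        messages.append(message)


def collect_messages(
    messages_by_fqdn: Dict[str, List[str]], *, target_fqdn: str = ""
) -> List[str]:
    # Target first (when present), then the remaining FQDNs alphabetically.
    order = [target_fqdn] if target_fqdn in messages_by_fqdn else []
    order += sorted(f for f in messages_by_fqdn if f != target_fqdn)
    collected: List[str] = []
    for fqdn in order:
        for message in messages_by_fqdn[fqdn]:
            append_unique(collected, message)
    return collected
```

Sorting the non-target FQDNs makes the output independent of the order in which events arrived, so identical snapshots always yield identical healthInfo.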

_should_skip_component(fqdn: str, sample: ComponentSnapshot) → bool

Decide whether a sample must be ignored by diagnostics.

Current rules: ignore any component whose FQDN contains the substring "manager"; ignore PST devices unless a subarray context is present (that is, sample.subarray_id is truthy).

Parameters:
  • fqdn – Fully qualified device name for the sample.

  • sample – Health sample associated with the device.

Returns:

True if the component should be ignored.
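The two skip rules reduce to a short predicate. In this sketch PST devices are assumed to be recognised by "pst" appearing in the FQDN, which is a guess for illustration only; the real check may rely on component metadata instead. The function signature is hypothetical.

```python
from typing import Optional


def should_skip_component(fqdn: str, subarray_id: Optional[int]) -> bool:
    # Rule 1: manager devices never contribute diagnostics.
    if "manager" in fqdn:
        return True
    # Rule 2: PST devices only count inside a subarray context.
    # Assumption: PST devices are recognised by "pst" in the FQDN.
    if "pst" in fqdn and not subarray_id:
        return True
    return False
```

Because the subarray check is based on truthiness, a subarray_id of 0 behaves exactly like a missing one.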

_component_messages(fqdn: str, sample: ComponentSnapshot) → List[str]

Build diagnostic messages for a single component sample.

Weighted (critical) components always produce diagnostics when in problematic state/health. Non-weighted components only contribute diagnostics when they are in ONLINE/ENGINEERING admin mode.

Parameters:
  • fqdn – Fully qualified device name for the sample.

  • sample – Health sample containing state/admin/health metadata.

Returns:

List of messages for this component (possibly empty).
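The admin-mode gating can be read as the following predicate, a minimal sketch that only decides whether a component is eligible to contribute; the messages themselves still depend on its state and health. AdminMode is a local stand-in (member names only), and treating a missing admin mode as not eligible is an assumption.

```python
from enum import Enum
from typing import Optional


class AdminMode(Enum):
    """Local stand-in; only the member names follow ska_control_model."""

    ONLINE = "ONLINE"
    OFFLINE = "OFFLINE"
    ENGINEERING = "ENGINEERING"
    NOT_FITTED = "NOT_FITTED"
    RESERVED = "RESERVED"


def contributes_diagnostics(weight: float, admin_mode: Optional[AdminMode]) -> bool:
    # Weighted (critical) components are always eligible.
    if weight > 0:
        return True
    # Non-weighted components count only when ONLINE or ENGINEERING.
    return admin_mode in (AdminMode.ONLINE, AdminMode.ENGINEERING)
```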

_critical_component_messages(fqdn: str, state: tango.DevState, health: ska_control_model.HealthState | None) → List[str]

Build diagnostics for weighted/critical components.

For critical devices, report problematic device states (including DISABLE) and any health state other than OK.

Parameters:
  • fqdn – Fully qualified device name.

  • state – Current device state.

  • health – Parsed health state (may be None if unavailable).

Returns:

List of diagnostic messages (possibly empty).

static _append_unique(messages: List[str], message: str) → None

Append a non-empty message if not already present.

Parameters:
  • messages – Target list, modified in-place.

  • message – Candidate message to append.

Returns:

None.

static _has_cbf(snapshot: Dict[str, ComponentSnapshot]) → bool

Return True if at least one critical/weighted component is present.

A “critical” component is identified by a sample with weight > 0.

Parameters:

snapshot – Mapping fqdn -> ComponentSnapshot representing the latest health snapshot for all known subsystem devices.

Returns:

True if at least one sample has weight > 0.

Health Info Manager

class ska_csp_lmc_common.supervision.health.health_info_manager.HealthInfoManager(*, device_fqdn: str, callback: Callable[[str, Any], None], logger: Logger | None = None)

Bases: object

Manage aggregation and publication of the healthInfo attribute.

The manager maintains an aggregated view composed of:

  • local healthInfo produced by the owning device

  • forwarded healthInfo received from subsystem devices

Subsystem contributions are merged per FQDN, deduplicated while preserving message order, and cleaned up when no longer reported.

The owning device FQDN entry is guaranteed to appear first in the published dictionary.

Updates are emitted via the provided callback only when the aggregated content changes.
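The aggregation rules above (owning device first, empty entries removed, one change check and at most one callback per update) can be modelled with a simplified class. It is a hypothetical sketch: subsystem entries are stored directly rather than merged per source, and the attribute name "healthInfo" passed to the callback is assumed.

```python
import threading
from typing import Any, Callable, Dict, List, Optional


class MiniHealthInfoManager:
    """Hypothetical, simplified model of the aggregation rules."""

    def __init__(self, device_fqdn: str, callback: Callable[[str, Any], None]) -> None:
        self._device_fqdn = device_fqdn
        self._callback = callback
        self._lock = threading.Lock()
        self._info: Dict[str, List[str]] = {}

    def _snapshot_locked(self) -> Dict[str, List[str]]:
        # Owning device first, then every other entry in insertion order.
        out: Dict[str, List[str]] = {}
        if self._device_fqdn in self._info:
            out[self._device_fqdn] = list(self._info[self._device_fqdn])
        for fqdn, msgs in self._info.items():
            if fqdn != self._device_fqdn:
                out[fqdn] = list(msgs)
        return out

    def _store_locked(self, fqdn: str, messages: List[str]) -> None:
        # An empty message list removes the entry entirely.
        if messages:
            self._info[fqdn] = list(messages)
        else:
            self._info.pop(fqdn, None)

    def update(
        self,
        local: Optional[List[str]] = None,
        *,
        subsystem: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        with self._lock:
            prev = self._snapshot_locked()
            if local is not None:
                self._store_locked(self._device_fqdn, local)
            for fqdn, msgs in (subsystem or {}).items():
                self._store_locked(fqdn, msgs)
            new = self._snapshot_locked()
        # Single change check, so at most one callback per update call.
        if new != prev:
            self._callback("healthInfo", new)
```

Invoking the callback after the lock is released keeps user code from running while the lock is held.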

_snapshot_locked() → dict[str, list[str]]

Build a snapshot of the full aggregated healthInfo (local + subsystem).

The returned dictionary preserves insertion order and guarantees that the entry corresponding to this device FQDN appears first, if present. All other entries follow in their current order.

Returns:

A copy of the current healthInfo dictionary with the owning device entry placed first.

_store_messages_locked(fqdn: str, messages: list[str]) → None

Store messages for a given fqdn under lock.

Parameters:
  • fqdn – Fully qualified device name whose messages are updated.

  • messages – List of messages to associate with the device. If empty, the entry is removed.

snapshot() → dict[str, list[str]]

Return a thread-safe snapshot of current healthInfo.

Returns:

A copy of the current healthInfo state.

_emit_health_info_if_changed(prev: dict[str, Any], new: dict[str, Any]) → None

Emit callback if healthInfo changed.

Parameters:
  • prev – Previous healthInfo snapshot.

  • new – New healthInfo snapshot.

update_health_info(new_info: dict[str, list[str]] | None = None, *, subsystem_updates_by_source: dict[str, Any] | None = None) → None

Update aggregated healthInfo with local and/or subsystem data.

This keeps the historical API name while supporting coalesced updates (single change detection + single emit) when both local diagnostics and forwarded subsystem payloads are updated in the same cycle.

Parameters:
  • new_info – Optional local diagnostics payload. Only this manager’s device FQDN entry is applied.

  • subsystem_updates_by_source – Optional mapping from source subsystem FQDN to raw healthInfo payload. Example: {"sim-low-pst/beam/01": {"sim-low-pst/beam/01": ["..."]}}.

_normalize_subsystem_healthinfo(subsystem_fqdn: str, value: Any) → dict[str, list[str]]

Normalize subsystem healthInfo into a canonical structure.

Summary of normalization steps:

  • accept JSON string payloads and ignore blank strings;

  • decode JSON strings and reject invalid JSON;

  • reject unsupported top-level payload types (the payload must be a dict);

  • coerce keys and messages to strings and drop empty values;

  • return only entries that contain at least one message.

Parameters:
  • subsystem_fqdn – Fully qualified name of the subsystem device.

  • value – Raw healthInfo value received from the subsystem. It may be a JSON string, dict, or other type.

Returns:

A dictionary mapping fqdn to a list of non-empty message strings. Invalid or unsupported payloads return an empty dictionary.
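A sketch of the normalization steps follows, assuming that a scalar message value is treated as a one-element list and that None counts as an empty value. The function name is hypothetical and, unlike _normalize_subsystem_healthinfo, it omits the subsystem_fqdn argument for brevity.

```python
import json
from typing import Any, Dict, List


def normalize_healthinfo(value: Any) -> Dict[str, List[str]]:
    if isinstance(value, str):
        if not value.strip():
            return {}  # blank string: nothing reported
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            return {}  # invalid JSON is rejected
    if not isinstance(value, dict):
        return {}  # None, lists, numbers, ...: unsupported top level
    result: Dict[str, List[str]] = {}
    for key, raw in value.items():
        fqdn = str(key)
        # Assumption: a scalar is treated as a one-message list.
        items = raw if isinstance(raw, (list, tuple)) else [raw]
        messages = [str(m) for m in items if m is not None and str(m).strip()]
        if fqdn.strip() and messages:
            result[fqdn] = messages
    return result
```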

update_subsystems_healthinfo(subsystem_fqdn: str, health_info_value: Any) → None

Process a subsystem healthInfo change event.

This method is typically invoked by the event subsystem when a subsystem device updates its healthInfo attribute.

For each subsystem source:

  • the latest normalized payload is stored;

  • a full merge across all subsystem sources is recomputed;

  • messages are deduplicated per FQDN, preserving order;

  • keys no longer reported by any subsystem are removed.

The aggregated healthInfo is published only if its content differs from the previous snapshot. Internally this is a single-source convenience wrapper around update_health_info(subsystem_updates_by_source=...).

Parameters:
  • subsystem_fqdn – Fully qualified name of the subsystem device providing the update.

  • health_info_value – Raw healthInfo payload received from the subsystem. May be a JSON string, dict, None or another type (unsupported payloads are ignored).
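The per-source merge can be sketched as a full recomputation over the latest payload of every source. Recomputing from scratch is what makes stale keys disappear: a key that no source reports simply never enters the result. Iterating sources in insertion order is an assumption, and the function name is hypothetical.

```python
from typing import Dict, List


def merge_subsystem_sources(
    latest_by_source: Dict[str, Dict[str, List[str]]],
) -> Dict[str, List[str]]:
    merged: Dict[str, List[str]] = {}
    # Rebuild the whole view on every change; nothing is patched in place.
    for payload in latest_by_source.values():
        for fqdn, messages in payload.items():
            bucket = merged.setdefault(fqdn, [])
            for message in messages:
                # Deduplicate per FQDN while keeping first-seen order.
                if message not in bucket:
                    bucket.append(message)
    return merged
```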

_apply_subsystem_updates_locked(normalized_by_source: dict[str, dict[str, list[str]]]) → None

Merge normalized subsystem-only payloads into the aggregated snapshot.

Caller must hold self._lock.

Parameters:

normalized_by_source – Mapping from subsystem source FQDN to normalized forwarded healthInfo payloads. This input contains subsystem data only (no local device entry).

Component Snapshot Store

class ska_csp_lmc_common.supervision.component_snapshot_store.ComponentSnapshotStore

Bases: object

Thread-safe store of component snapshots with revision tracking.

Revisions increase only when the effective snapshot value changes.

set(component: ObservingComponent, *, state: tango.DevState | None = None, health: ska_control_model.HealthState | None = None, admin_mode: ska_control_model.AdminMode | None = None, obs_state: ska_control_model.ObsState | None = None) → int

Upsert a component snapshot with partial field updates.

Non-None arguments overwrite previous values, while None keeps the previously stored value for that field.

Parameters:
  • component – Component object used as key and metadata source.

  • state – Updated operational state value.

  • health – Updated health state value.

  • admin_mode – Updated admin mode value.

  • obs_state – Updated observation state value.

Returns:

Current store revision after the update.
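The partial-update and revision rules can be sketched with a frozen dataclass per component. This minimal illustration keys entries by FQDN string instead of an ObservingComponent, and it counts the first insertion of a component as a change; both are assumptions. Snap and MiniSnapshotStore are hypothetical names.

```python
import threading
from dataclasses import dataclass, replace
from typing import Any, Dict


@dataclass(frozen=True)
class Snap:
    """Hypothetical stand-in for ComponentSnapshot."""

    state: Any = None
    health: Any = None
    admin_mode: Any = None
    obs_state: Any = None


class MiniSnapshotStore:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._snaps: Dict[str, Snap] = {}
        self._revision = 0

    def set(
        self,
        fqdn: str,
        *,
        state: Any = None,
        health: Any = None,
        admin_mode: Any = None,
        obs_state: Any = None,
    ) -> int:
        # None keeps the stored value; only non-None fields overwrite.
        fields = {"state": state, "health": health,
                  "admin_mode": admin_mode, "obs_state": obs_state}
        updates = {k: v for k, v in fields.items() if v is not None}
        with self._lock:
            old = self._snaps.get(fqdn)
            new = replace(Snap() if old is None else old, **updates)
            # Bump the revision only on an effective change (or first insert).
            if old is None or new != old:
                self._snaps[fqdn] = new
                self._revision += 1
            return self._revision

    def snapshot(self) -> Dict[str, Snap]:
        # Shallow copy: callers cannot mutate the store's own dict.
        with self._lock:
            return dict(self._snaps)

    def revision(self) -> int:
        with self._lock:
            return self._revision
```

Comparing the merged value with the stored one before bumping the revision is what lets the supervisor skip evaluations when repeated events carry unchanged values.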

snapshot() → Dict[str, ComponentSnapshot]

Return a point-in-time map of fqdn to snapshot values.

Returns:

Shallow copy of the current snapshots.

snapshot_samples() → Dict[str, Sample[ComponentSnapshot]]

Return raw samples including metadata from the underlying state store.

Returns:

Mapping of fqdn to Sample[ComponentSnapshot].

revision() → int

Return the current monotonically increasing store revision.

Returns:

Current store revision.

Health Store

class ska_csp_lmc_common.supervision.health.health_store.HealthStateStore

Bases: ComponentSnapshotStore

Health-specialized view of ComponentSnapshotStore.

This class provides a typed and semantically clearer interface for storing and updating component snapshots in the context of health supervision. It does not alter the underlying storage logic, but constrains and documents usage around health-related state updates.

set(component: ObservingComponent, *, health: ska_control_model.HealthState | None = None, state: tango.DevState | None = None, admin_mode: ska_control_model.AdminMode | None = None, obs_state: ska_control_model.ObsState | None = None) → int

Update the stored snapshot for a component.

This method forwards the provided values to the underlying ComponentSnapshotStore implementation. Only parameters that are not None are applied to the stored snapshot.

Parameters:
  • component – The observing component whose snapshot is to be updated.

  • health – Optional HealthState to store for the component.

  • state – Optional DevState to store for the component.

  • admin_mode – Optional AdminMode to store for the component.

  • obs_state – Optional ObsState to store for the component.

Returns:

The current store revision after the update, as returned by the base implementation.