Health State Supervision
Health Supervisor
- class ska_csp_lmc_common.supervision.health.health_supervisor.CspHealthSupervisor(*, model: HealthStateModel, update_attribute: Callable[[str, Any], None], policy: HealthEvaluationPolicy | None, logger: Logger, debounce_s: float = 0.4, max_latency_s: float = 0.7)
Bases: ObservationSupervisor
CSP-specific health supervision implementation.
This supervisor collects subsystem events, updates a snapshot store, and periodically evaluates the aggregated health outputs, with debouncing to limit update frequency.
It publishes:
  - aggregated HealthState through the provided model
  - aggregated healthInfo through HealthInfoManager
- property health_state_info: dict[str, list[str]]
Return the current aggregated healthInfo snapshot.
- Returns:
Dictionary mapping device FQDNs to a list of diagnostic message strings.
- update_subsystems_healthinfo(subsystem_fqdn: str, health_info_value: Any) None
Queue forwarded subsystem healthInfo for debounced merge.
The latest payload for each subsystem source is buffered and then merged during the debounced evaluation cycle. This reduces event bursts when multiple subsystem healthInfo updates arrive in quick succession.
- Parameters:
subsystem_fqdn – Fully qualified name of the subsystem device providing the update.
health_info_value – Raw healthInfo payload received from the subsystem. May be a JSON string, dict or None.
- on_subsystem_event(component: ObservingComponent, *, health: ska_control_model.HealthState | None = None, state: tango.DevState | None = None, admin_mode: ska_control_model.AdminMode | None = None, immediate: bool = False) None
Ingest a subsystem update and schedule a supervised evaluation.
The component snapshot store is updated with the provided values. Any invalid or incomplete updates are ignored. An evaluation is then scheduled via the debounced supervision loop.
- Parameters:
component – Observing component emitting the update.
health – Optional HealthState update for the component.
state – Optional device DevState update for the component.
admin_mode – Optional AdminMode update for the component.
immediate – If true, request an immediate evaluation instead of waiting for the debounce window.
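How the debounce window and the latency cap interact is not spelled out above. The sketch below shows one common way to combine them. It assumes that `debounce_s` restarts on every event while `max_latency_s` bounds the total wait since the first pending event. `next_evaluation_time` is a hypothetical helper, not part of the documented API; the defaults mirror the constructor signature.

```python
def next_evaluation_time(
    first_pending_s: float,
    last_event_s: float,
    *,
    debounce_s: float = 0.4,
    max_latency_s: float = 0.7,
    immediate: bool = False,
) -> float:
    """Return the time (in seconds) at which the next evaluation should run.

    Assumption: the debounce window restarts at each event, and the latency
    cap is measured from the first event that is still pending.
    """
    if immediate:
        # Skip the debounce window entirely.
        return last_event_s
    debounced = last_event_s + debounce_s
    latency_cap = first_pending_s + max_latency_s
    # A steady stream of events cannot postpone evaluation past the cap.
    return min(debounced, latency_cap)
```

Under these assumptions, a burst that starts at t = 0.0 and is still producing events at t = 0.5 would be evaluated at t = 0.7 rather than t = 0.9.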
- clear_consistency_fault() None
No-op for now (kept for interface symmetry).
- _evaluate_and_publish(*, force: bool = False, reconcile_timeout: bool = False) EvaluationOutcome
Evaluate and publish health outputs from the latest snapshot.
The evaluation is skipped if the snapshot revision has not changed since the last run, unless force is true. The method:
  1. Evaluates aggregated HealthState using the model.
  2. Produces diagnostic messages (healthInfo).
  3. Publishes HealthState via the model.
  4. Publishes healthInfo via HealthInfoManager.
- Parameters:
force – If true, evaluate and publish even when the snapshot revision is unchanged.
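The revision check described for `_evaluate_and_publish` can be illustrated in isolation. This is a minimal sketch, not the supervisor's code: `Outcome` is a hypothetical stand-in for `EvaluationOutcome`, whose real fields are not documented here, and `RevisionGate` is an invented name.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Outcome:
    """Hypothetical stand-in for EvaluationOutcome."""

    evaluated: bool
    revision: int


class RevisionGate:
    """Run an evaluation only when the snapshot revision has moved on."""

    def __init__(self, evaluate: Callable[[], None]) -> None:
        self._evaluate = evaluate
        self._last_revision: Optional[int] = None

    def run(self, revision: int, *, force: bool = False) -> Outcome:
        # Unchanged revision and no force flag: nothing new to publish.
        if not force and revision == self._last_revision:
            return Outcome(evaluated=False, revision=revision)
        self._evaluate()
        self._last_revision = revision
        return Outcome(evaluated=True, revision=revision)
```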
- _drain_pending_subsystem_healthinfo() dict[str, Any]
Drain buffered subsystem healthInfo updates.
- Returns:
Latest pending source->payload updates.
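The queue-then-drain pattern used for forwarded healthInfo can be sketched as follows. The class and method names are hypothetical; the only behaviour taken from the text is that the latest payload per source is kept and that draining returns the pending source-to-payload mapping.

```python
import threading
from typing import Any


class PendingHealthInfoBuffer:
    """Hypothetical buffer keeping only the latest payload per subsystem."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pending: dict[str, Any] = {}

    def queue(self, subsystem_fqdn: str, payload: Any) -> None:
        # A newer payload from the same source replaces the older one.
        with self._lock:
            self._pending[subsystem_fqdn] = payload

    def drain(self) -> dict[str, Any]:
        # Hand over everything buffered so far and start again from empty.
        with self._lock:
            drained, self._pending = self._pending, {}
        return drained
```

Swapping the dictionary under the lock keeps the critical section short, so event callbacks are not blocked while the merge runs.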
Health Diagnostics
- class ska_csp_lmc_common.supervision.health.health_diagnostics.HealthDiagnostics
Bases: object
Generate healthInfo messages from a snapshot and a final health state.
This component does not compute the aggregated health state. It only produces explanatory messages consistent with the already computed final_health.
- _NO_CBF_MESSAGE = 'No CBF component device detected!'
- evaluate(snapshot: Dict[str, ComponentSnapshot], final_health: ska_control_model.HealthState, *, context: Dict[str, Any] | None = None) Dict[str, List[str]]
Build healthInfo messages coherent with the final HealthState.
The returned payload is keyed by the device FQDN from context and contains a list of human-readable diagnostic messages.
Behaviour overview:
  - If final_health is OK, return {device_fqdn: []} when a target is available.
  - If context forces a message (faulty or disabled), prepend it to the snapshot diagnostics.
  - If final_health is FAILED and no critical component exists in snapshot, report a missing-critical-component reason.
  - Otherwise, collect messages from the snapshot and return them under the target device FQDN.
- Parameters:
snapshot – Mapping fqdn -> HealthSample for subsystem health data used to derive explanatory messages.
final_health – Aggregated health state already computed by the health model.
context – Optional diagnostics context provided by the health model. Expected keys are device_fqdn (str), faulty (bool), fault_reason (str), disabled (bool) and disabled_reason (str).
- Returns:
Mapping device_fqdn -> [messages...]. If no target device is available in context, returns an empty mapping.
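The behaviour overview for `evaluate` reads as a short decision chain, sketched below. This is an illustration under assumptions, not the library's code. Health states are plain strings here, the snapshot is reduced to a pre-collected message list plus a `has_critical` flag, and the missing-critical-component reason is assumed to be `_NO_CBF_MESSAGE`. The forced message is prepended, following the overview wording. `build_health_info` and the FQDN in the usage lines are hypothetical.

```python
from typing import Dict, List

NO_CBF_MESSAGE = "No CBF component device detected!"


def build_health_info(
    final_health: str,
    context: Dict[str, object],
    snapshot_messages: List[str],
    has_critical: bool,
) -> Dict[str, List[str]]:
    target = str(context.get("device_fqdn") or "")
    if not target:
        return {}  # no target device in context: empty mapping
    if final_health == "OK":
        return {target: []}
    forced = None
    if context.get("faulty") and context.get("fault_reason"):
        forced = str(context["fault_reason"])
    elif context.get("disabled") and context.get("disabled_reason"):
        forced = str(context["disabled_reason"])
    if forced is not None:
        # Forced reason first, then the remaining snapshot diagnostics.
        rest = [m for m in snapshot_messages if m != forced]
        return {target: [forced, *rest]}
    if final_health == "FAILED" and not has_critical:
        return {target: [NO_CBF_MESSAGE]}
    return {target: list(snapshot_messages)}


ctx = {"device_fqdn": "low-csp/control/0"}  # hypothetical FQDN
print(build_health_info("FAILED", ctx, [], has_critical=False))
```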
- _normalise_context(context: Dict[str, Any] | None) Dict[str, Any]
Normalise optional context to a dict with the expected keys.
Missing keys are replaced with sensible defaults.
- Parameters:
context – Optional mapping coming from the health model.
- Returns:
Normalised context with keys: device_fqdn, faulty, disabled, fault_reason, disabled_reason.
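A minimal sketch of such a normalisation is shown below. The concrete defaults (empty strings and `False`) are an assumption, since the text only says that missing keys receive sensible defaults. `normalise_context` is a hypothetical free function standing in for the private method.

```python
from typing import Any, Dict, Optional


def normalise_context(context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a dict that always carries the five expected keys."""
    raw = context or {}
    return {
        # Assumed defaults: empty string for text fields, False for flags.
        "device_fqdn": str(raw.get("device_fqdn") or ""),
        "faulty": bool(raw.get("faulty", False)),
        "disabled": bool(raw.get("disabled", False)),
        "fault_reason": str(raw.get("fault_reason") or ""),
        "disabled_reason": str(raw.get("disabled_reason") or ""),
    }
```

With every key guaranteed to be present, downstream helpers can index the context directly instead of repeating `.get()` fallbacks.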
- _forced_message(context: Dict[str, Any]) Dict[str, List[str]] | None
Return a forced healthInfo payload when context flags require it.
If the model marked the device as faulty/disabled and provided a corresponding reason, that reason is returned as the only message.
- Parameters:
context – Normalised context dictionary produced by _normalise_context.
- Returns:
A {device_fqdn: [reason]} mapping if forced, otherwise None.
- _collect_messages(snapshot: Dict[str, ComponentSnapshot], *, target_fqdn: str = '') List[str]
Collect unique diagnostic messages from snapshot samples.
Messages are de-duplicated while preserving first-seen order. Components are traversed in deterministic order: target FQDN first (if present), then remaining FQDNs in alphabetical order.
- Parameters:
snapshot – Mapping fqdn -> HealthSample.
target_fqdn – Optional target device FQDN that, when present in snapshot, is processed before other entries.
- Returns:
List of unique diagnostic messages.
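The traversal order and de-duplication described above can be sketched as follows. To stay self-contained, the snapshot is simplified to a mapping from FQDN to ready-made message lists, which is an assumption. `collect_messages` and the FQDNs in the example are hypothetical.

```python
from typing import Dict, List


def collect_messages(
    messages_by_fqdn: Dict[str, List[str]], *, target_fqdn: str = ""
) -> List[str]:
    # Remaining FQDNs in alphabetical order, target first when present.
    ordered = sorted(fqdn for fqdn in messages_by_fqdn if fqdn != target_fqdn)
    if target_fqdn in messages_by_fqdn:
        ordered.insert(0, target_fqdn)
    unique: List[str] = []
    for fqdn in ordered:
        for message in messages_by_fqdn[fqdn]:
            # Keep the first occurrence only; skip empty messages.
            if message and message not in unique:
                unique.append(message)
    return unique


data = {
    "z/dev/1": ["late", "shared"],
    "a/dev/1": ["early", "shared"],
    "m/target/1": ["first"],
}
print(collect_messages(data, target_fqdn="m/target/1"))
# ['first', 'early', 'shared', 'late']
```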
- _should_skip_component(fqdn: str, sample: ComponentSnapshot) bool
Decide whether a sample must be ignored by diagnostics.
Current rules: ignore any component whose FQDN includes manager; ignore PST devices unless a subarray context is present (that is, sample.subarray_id is truthy).
- Parameters:
fqdn – Fully qualified device name for the sample.
sample – Health sample associated with the device.
- Returns:
True if the component should be ignored.
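The two skip rules can be illustrated with a small predicate. How PST devices are recognised is not stated above; the sketch assumes a `pst` substring in the FQDN. `Sample` is a hypothetical stand-in that exposes only `subarray_id`, and the FQDNs other than `sim-low-pst/beam/01` are invented for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Sample:
    """Hypothetical stand-in exposing only the field used here."""

    subarray_id: Optional[int] = None


def should_skip_component(fqdn: str, sample: Sample) -> bool:
    name = fqdn.lower()
    if "manager" in name:
        return True
    # Assumption: PST devices are recognised by "pst" in the FQDN.
    if "pst" in name and not sample.subarray_id:
        return True
    return False


print(should_skip_component("sim-low-pst/beam/01", Sample()))
print(should_skip_component("sim-low-pst/beam/01", Sample(subarray_id=1)))
```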
- _component_messages(fqdn: str, sample: ComponentSnapshot) List[str]
Build diagnostic messages for a single component sample.
Weighted (critical) components always produce diagnostics when their state or health is problematic. Non-weighted components contribute diagnostics only when they are in ONLINE/ENGINEERING admin mode.
- Parameters:
fqdn – Fully qualified device name for the sample.
sample – Health sample containing state/admin/health metadata.
- Returns:
List of messages for this component (possibly empty).
- _critical_component_messages(fqdn: str, state: tango.DevState, health: ska_control_model.HealthState | None) List[str]
Build diagnostics for weighted/critical components.
For critical devices, report problematic states including DISABLE, and any non-OK health.
- Parameters:
fqdn – Fully qualified device name.
state – Current device state.
health – Parsed health state (may be None if unavailable).
- Returns:
List of diagnostic messages (possibly empty).
- static _append_unique(messages: List[str], message: str) None
Append a non-empty message if not already present.
- Parameters:
messages – Target list, modified in-place.
message – Candidate message to append.
- Returns:
None.
- static _has_cbf(snapshot: Dict[str, ComponentSnapshot]) bool
Return True if at least one critical/weighted component is present.
A “critical” component is identified by a sample with weight > 0.
- Parameters:
snapshot – Mapping fqdn -> HealthSample representing the latest health snapshot for all known subsystem devices.
- Returns:
True if at least one sample has weight > 0.
Health Info Manager
- class ska_csp_lmc_common.supervision.health.health_info_manager.HealthInfoManager(*, device_fqdn: str, callback: Callable[[str, Any], None], logger: Logger | None = None)
Bases: object
Manage aggregation and publication of the healthInfo attribute.
The manager maintains an aggregated view composed of:
  - local healthInfo produced by the owning device
  - forwarded healthInfo received from subsystem devices
Subsystem contributions are merged per FQDN, deduplicated while preserving message order, and cleaned up when no longer reported.
The owning device FQDN entry is guaranteed to appear first in the published dictionary.
Updates are emitted via the provided callback only when the aggregated content changes.
- _snapshot_locked() dict[str, list[str]]
Build a snapshot of the full aggregated healthInfo (local + subsystem).
The returned dictionary preserves insertion order and guarantees that the entry corresponding to this device FQDN appears first, if present. All other entries follow in their current order.
- Returns:
A copy of the current healthInfo dictionary with the owning device entry placed first.
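The owner-first ordering can be sketched with a plain dictionary rebuild, relying on the insertion order that Python dictionaries preserve. `owner_first` is a hypothetical free function; locking is left out for brevity.

```python
def owner_first(
    info: dict[str, list[str]], owner_fqdn: str
) -> dict[str, list[str]]:
    """Copy ``info`` with the owning device entry moved to the front."""
    ordered: dict[str, list[str]] = {}
    if owner_fqdn in info:
        ordered[owner_fqdn] = list(info[owner_fqdn])
    for fqdn, messages in info.items():
        if fqdn != owner_fqdn:
            # Other entries keep their current relative order.
            ordered[fqdn] = list(messages)
    return ordered
```

Copying the message lists as well keeps callers from mutating the manager's internal state through the returned snapshot.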
- _store_messages_locked(fqdn: str, messages: list[str]) None
Store messages for a given fqdn under lock.
- Parameters:
fqdn – Fully qualified device name whose messages are updated.
messages – List of messages to associate with the device. If empty, the entry is removed.
- snapshot() dict[str, list[str]]
Return a thread-safe snapshot of current healthInfo.
- Returns:
A copy of the current healthInfo state.
- _emit_health_info_if_changed(prev: dict[str, Any], new: dict[str, Any]) None
Emit callback if healthInfo changed.
- Parameters:
prev – Previous healthInfo snapshot.
new – New healthInfo snapshot.
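Change-gated emission can be sketched as below. What the manager actually passes to its callback is not documented here; the sketch assumes the attribute name `"healthInfo"` followed by the new snapshot. `emit_if_changed` is a hypothetical name.

```python
from typing import Any, Callable


def emit_if_changed(
    prev: dict[str, Any],
    new: dict[str, Any],
    callback: Callable[[str, Any], None],
) -> bool:
    """Invoke ``callback`` only when the content differs; report whether it ran."""
    if prev == new:
        return False
    # Assumption: the callback receives the attribute name and the new value.
    callback("healthInfo", new)
    return True
```

Plain dictionary equality ignores key order, so a pure reordering of entries would not trigger an emit in this sketch.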
- update_health_info(new_info: dict[str, list[str]] | None = None, *, subsystem_updates_by_source: dict[str, Any] | None = None) None
Update aggregated healthInfo with local and/or subsystem data.
This keeps the historical API name while supporting coalesced updates (single change detection + single emit) when both local diagnostics and forwarded subsystem payloads are updated in the same cycle.
- Parameters:
new_info – Optional local diagnostics payload. Only this manager’s device FQDN entry is applied.
subsystem_updates_by_source – Optional mapping from source subsystem FQDN to raw healthInfo payload. Example: {"sim-low-pst/beam/01": {"sim-low-pst/beam/01": ["..."]}}.
- _normalize_subsystem_healthinfo(subsystem_fqdn: str, value: Any) dict[str, list[str]]
Normalize subsystem healthInfo into a canonical structure.
Summary of normalization steps:
  - accept JSON string payloads; ignore blank strings;
  - decode JSON strings and reject invalid JSON;
  - reject unsupported top-level payload types (must be dict);
  - coerce keys/messages to strings and drop empty values;
  - return only entries that contain at least one message.
- Parameters:
subsystem_fqdn – Fully qualified name of the subsystem device.
value – Raw healthInfo value received from the subsystem. It may be a JSON string, dict, or other type.
- Returns:
A dictionary mapping fqdn to a list of non-empty message strings. Invalid or unsupported payloads return an empty dictionary.
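The normalization steps can be sketched with the standard `json` module. This is an illustration only: `normalize_healthinfo` is a hypothetical free function that omits the `subsystem_fqdn` argument, and wrapping a single non-list value into a one-element list is an assumption.

```python
import json
from typing import Any


def normalize_healthinfo(value: Any) -> dict[str, list[str]]:
    if isinstance(value, str):
        if not value.strip():
            return {}  # blank strings are ignored
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            return {}  # invalid JSON is rejected
    if not isinstance(value, dict):
        return {}  # top-level payload must be a dict
    normalized: dict[str, list[str]] = {}
    for key, messages in value.items():
        # Assumption: a lone value is treated as a one-message list.
        if not isinstance(messages, (list, tuple)):
            messages = [messages]
        cleaned = [str(m) for m in messages if m is not None and str(m).strip()]
        if str(key).strip() and cleaned:
            normalized[str(key)] = cleaned
    return normalized


print(normalize_healthinfo('{"sim-low-pst/beam/01": ["fault", "", null]}'))
# {'sim-low-pst/beam/01': ['fault']}
```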
- update_subsystems_healthinfo(subsystem_fqdn: str, health_info_value: Any) None
Process a subsystem healthInfo change event.
This method is typically invoked by the event subsystem when a subsystem device updates its healthInfo attribute.
For each subsystem source:
  - The latest normalized payload is stored.
  - A full merge across all subsystem sources is recomputed.
  - Messages are deduplicated per FQDN preserving order.
  - Keys no longer reported by any subsystem are removed.
The aggregated healthInfo is published only if its content differs from the previous snapshot. Internally this is a single-source convenience wrapper around update_health_info(subsystem_updates_by_source=...).
- Parameters:
subsystem_fqdn – Fully qualified name of the subsystem device providing the update.
health_info_value – Raw healthInfo payload received from the subsystem. May be a JSON string, dict, None or another type (unsupported payloads are ignored).
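The per-source merge described above can be sketched as a full recomputation over the latest payload of every source. `merge_sources` and the keys in the example are hypothetical; payloads are assumed to be already normalized.

```python
def merge_sources(
    latest_by_source: dict[str, dict[str, list[str]]]
) -> dict[str, list[str]]:
    """Merge the latest payload of every source into one mapping."""
    merged: dict[str, list[str]] = {}
    for payload in latest_by_source.values():
        for fqdn, messages in payload.items():
            bucket = merged.setdefault(fqdn, [])
            for message in messages:
                # De-duplicate per FQDN, keeping first-seen order.
                if message not in bucket:
                    bucket.append(message)
    return merged


latest = {
    "s1": {"d1": ["a", "b"]},
    "s2": {"d1": ["b", "c"], "d2": ["x"]},
}
print(merge_sources(latest))  # {'d1': ['a', 'b', 'c'], 'd2': ['x']}
latest["s2"] = {}             # s2 stops reporting anything
print(merge_sources(latest))  # {'d1': ['a', 'b']}
```

Because the result is rebuilt from scratch each time, keys that no source reports any more disappear without explicit cleanup.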
- _apply_subsystem_updates_locked(normalized_by_source: dict[str, dict[str, list[str]]]) None
Merge normalized subsystem-only payloads into the aggregated snapshot.
Caller must hold self._lock.
- Parameters:
normalized_by_source – Mapping from subsystem source FQDN to normalized forwarded healthInfo payloads. This input contains subsystem data only (no local device entry).
Component Snapshot Store
- class ska_csp_lmc_common.supervision.component_snapshot_store.ComponentSnapshotStore
Bases: object
Thread-safe store of component snapshots with revision tracking.
Revisions increase only when the effective snapshot value changes.
- set(component: ObservingComponent, *, state: tango.DevState | None = None, health: ska_control_model.HealthState | None = None, admin_mode: ska_control_model.AdminMode | None = None, obs_state: ska_control_model.ObsState | None = None) int
Upsert a component snapshot with partial field updates.
Non-None arguments overwrite previous values, while None keeps the previously stored value for that field.
- Parameters:
component – Component object used as key and metadata source.
state – Updated operational state value.
health – Updated health state value.
admin_mode – Updated admin mode value.
obs_state – Updated observation state value.
- Returns:
Current store revision after the update.
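The partial-update and revision semantics can be illustrated with a reduced store. This is a sketch under assumptions: `Snapshot` and `SnapshotStore` are hypothetical, keys are plain FQDN strings rather than component objects, only two fields are modelled, and the first insertion of a key is assumed to count as a change.

```python
import threading
from dataclasses import dataclass, replace
from typing import Dict, Optional


@dataclass(frozen=True)
class Snapshot:
    state: Optional[str] = None
    health: Optional[str] = None


class SnapshotStore:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._items: Dict[str, Snapshot] = {}
        self._revision = 0

    def set(
        self,
        fqdn: str,
        *,
        state: Optional[str] = None,
        health: Optional[str] = None,
    ) -> int:
        with self._lock:
            current = self._items.get(fqdn, Snapshot())
            # None means "keep the stored value" for that field.
            changes = {
                name: value
                for name, value in (("state", state), ("health", health))
                if value is not None
            }
            updated = replace(current, **changes)
            # Bump the revision only when the effective value changes.
            if self._items.get(fqdn) != updated:
                self._items[fqdn] = updated
                self._revision += 1
            return self._revision
```

Repeating an identical update leaves the revision untouched, which is what allows a revision-based evaluation to be skipped.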
- snapshot() Dict[str, ComponentSnapshot]
Return a point-in-time map of fqdn to snapshot values.
- Returns:
Shallow copy of the current snapshots.
- snapshot_samples() Dict[str, Sample[ComponentSnapshot]]
Return raw samples including metadata from the underlying state store.
- Returns:
Mapping of fqdn to Sample[ComponentSnapshot].
- revision() int
Return the current monotonically increasing store revision.
- Returns:
Current store revision.
Health Store
- class ska_csp_lmc_common.supervision.health.health_store.HealthStateStore
Bases: ComponentSnapshotStore
Health-specialized view of ComponentSnapshotStore.
This class provides a typed and semantically clearer interface for storing and updating component snapshots in the context of health supervision. It does not alter the underlying storage logic, but constrains and documents usage around health-related state updates.
- set(component: ObservingComponent, *, health: ska_control_model.HealthState | None = None, state: tango.DevState | None = None, admin_mode: ska_control_model.AdminMode | None = None, obs_state: ska_control_model.ObsState | None = None) int
Update the stored snapshot for a component.
This method forwards the provided values to the underlying ComponentSnapshotStore implementation. Only parameters that are not None are applied to the stored snapshot.
- Parameters:
component – The observing component whose snapshot is to be updated.
health – Optional HealthState to store for the component.
state – Optional DevState to store for the component.
admin_mode – Optional AdminMode to store for the component.
obs_state – Optional ObsState to store for the component.
- Returns:
An integer representing the updated snapshot version, as returned by the base implementation.