Observation Supervisor

class ska_csp_lmc_common.supervision.generic_supervisor.ObservationSupervisor(*, policy: 'ScanConsistencyPolicy' | None, update_attribute: Callable[[str, Any], None], logger: logging.Logger, debounce_s: float = 0.05, max_latency_s: float = 0.2)

Bases: object

Base class implementing debounce/max-latency supervision timing.
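The class documentation does not spell out the timing rule, so the following is only a sketch of one common reading of debounce with a maximum-latency cap. It assumes an evaluation becomes due either when no new request has arrived for debounce_s, or when max_latency_s has elapsed since the first pending request. The function name evaluation_due and its arguments are hypothetical and are not part of ska_csp_lmc_common.

```python
from typing import Optional


def evaluation_due(
    now: float,
    first_request: Optional[float],
    last_request: Optional[float],
    debounce_s: float = 0.05,
    max_latency_s: float = 0.2,
) -> bool:
    """Return True if a pending evaluation should run at time `now`.

    Hypothetical helper: the rule below is an assumption, not the
    library's documented behaviour.
    """
    # Nothing has been requested, so nothing is due.
    if first_request is None or last_request is None:
        return False
    # Debounce: the stream of requests has been quiet for long enough.
    quiet_long_enough = (now - last_request) >= debounce_s
    # Max latency: the oldest pending request has waited too long,
    # even though newer requests keep arriving.
    waited_too_long = (now - first_request) >= max_latency_s
    return quiet_long_enough or waited_too_long
```

Under this reading, a burst of updates collapses into one evaluation, while max_latency_s bounds how long a continuous stream of updates can postpone it.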

stop() → None

Stop the background tick thread and wait for it to join.

on_observation_update(immediate: bool = False) → None

Notify the supervisor that an event requires a future evaluation.

Schedules an evaluation. If immediate is True, the debounce delay is bypassed.

Parameters:

immediate – If True, bypass debounce delay.

clear_consistency_fault() → None

Reset consistency error messages.

_tick_loop() → None

Background loop managing timing logic for evaluations.
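As an illustration of how a background tick thread can combine update notifications, timing, and a clean shutdown, here is a minimal sketch. TickLoopSketch, on_update, tick_s, and the evaluate callback are all hypothetical names, and the timing rule is an assumption. This is not the ObservationSupervisor implementation.

```python
import threading
import time
from typing import Callable, Optional


class TickLoopSketch:
    """Hypothetical illustration of a debounced tick thread."""

    def __init__(
        self,
        evaluate: Callable[[], None],
        debounce_s: float = 0.05,
        max_latency_s: float = 0.2,
        tick_s: float = 0.01,  # assumed polling period
    ) -> None:
        self._evaluate = evaluate
        self._debounce_s = debounce_s
        self._max_latency_s = max_latency_s
        self._tick_s = tick_s
        self._lock = threading.Lock()
        self._first: Optional[float] = None  # time of oldest pending request
        self._last: Optional[float] = None   # time of newest pending request
        self._immediate = False
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._tick_loop, daemon=True)
        self._thread.start()

    def on_update(self, immediate: bool = False) -> None:
        """Record that an evaluation is needed."""
        now = time.monotonic()
        with self._lock:
            if self._first is None:
                self._first = now
            self._last = now
            self._immediate = self._immediate or immediate

    def stop(self) -> None:
        """Signal the loop to exit and wait for the thread to join."""
        self._stop_event.set()
        self._thread.join()

    def _tick_loop(self) -> None:
        # Event.wait returns True once stop() sets the event, ending the loop.
        while not self._stop_event.wait(self._tick_s):
            now = time.monotonic()
            with self._lock:
                if self._first is None:
                    continue  # nothing pending
                due = (
                    self._immediate
                    or now - self._last >= self._debounce_s
                    or now - self._first >= self._max_latency_s
                )
                if due:
                    self._first = self._last = None
                    self._immediate = False
            if due:
                # Run the callback outside the lock so it cannot block on_update.
                self._evaluate()
```

In this sketch an immediate request is served on the next tick rather than synchronously, and the callback runs on the tick thread, so it should be short and thread-safe.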

_evaluate_and_publish(*, force: bool = False) → None

Evaluate the candidate state and publish results.

Must be implemented by subclasses to define specific business logic.

Parameters:

force – Keyword-only flag, False by default. Its effect is defined by the subclass implementation.

Raises:

NotImplementedError – If not overridden by a subclass.
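The override contract can be sketched as follows. To keep the example self-contained it uses a hypothetical stand-in base class, SupervisorStandIn, rather than the real ObservationSupervisor. The read_state callback, the attribute name exampleAttribute, and the interpretation of force as "publish even if unchanged" are all assumptions made for illustration.

```python
from typing import Any, Callable

_UNSET = object()  # sentinel: nothing has been published yet


class SupervisorStandIn:
    """Hypothetical stand-in for the base class; not the real one."""

    def __init__(self, *, update_attribute: Callable[[str, Any], None]) -> None:
        self._update_attribute = update_attribute

    def _evaluate_and_publish(self, *, force: bool = False) -> None:
        raise NotImplementedError("subclasses must override this method")


class ChangeOnlySupervisor(SupervisorStandIn):
    """Publishes a value only when it changes, unless forced (assumed meaning)."""

    def __init__(
        self,
        *,
        update_attribute: Callable[[str, Any], None],
        read_state: Callable[[], Any],  # hypothetical source of the candidate
    ) -> None:
        super().__init__(update_attribute=update_attribute)
        self._read_state = read_state
        self._last_published: Any = _UNSET

    def _evaluate_and_publish(self, *, force: bool = False) -> None:
        candidate = self._read_state()
        # Assumed semantics: force=True republishes an unchanged value.
        if force or candidate != self._last_published:
            self._update_attribute("exampleAttribute", candidate)
            self._last_published = candidate
```

A subclass written this way keeps the timing concerns in the base class and confines the decision about what to publish to a single method.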