Observation Supervisor
- class ska_csp_lmc_common.supervision.generic_supervisor.ObservationSupervisor(*, policy: 'ScanConsistencyPolicy' | None, update_attribute: Callable[[str, Any], None], logger: logging.Logger, debounce_s: float = 0.05, max_latency_s: float = 0.2)
Bases: object
Base class implementing debounce/max-latency supervision timing.
- stop() → None
Stop the background tick thread and wait for it to join.
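The stop-and-join behaviour described above can be sketched with a standard `threading.Event`. This is a minimal illustration, not the library's implementation: `TickWorker`, `interval_s`, and `ticks` are hypothetical names introduced here, and only `stop()` and `_tick_loop()` mirror names from this page.

```python
import threading


class TickWorker:
    """Hypothetical stand-in for a supervisor with a background tick thread."""

    def __init__(self, interval_s: float = 0.01) -> None:
        self._stop_event = threading.Event()
        self._interval_s = interval_s
        self.ticks = 0
        self._thread = threading.Thread(target=self._tick_loop, daemon=True)
        self._thread.start()

    def _tick_loop(self) -> None:
        # Event.wait() returns True as soon as the event is set, which ends
        # the loop without waiting for the remainder of the interval.
        while not self._stop_event.wait(self._interval_s):
            self.ticks += 1

    def stop(self) -> None:
        # Signal the loop to exit, then block until the thread has finished.
        self._stop_event.set()
        self._thread.join()
```

Waiting on the event instead of sleeping lets `stop()` return promptly, and calling `stop()` a second time is harmless because joining a finished thread returns immediately.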
- on_observation_update(immediate: bool = False) → None
Notify that an event requires a future evaluation.
Schedules an evaluation. The immediate flag will bypass the debounce delay.
- Parameters:
immediate – If True, bypass debounce delay.
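To make the interplay of `debounce_s`, `max_latency_s`, and `immediate` concrete, the following sketch computes when a pending evaluation would become due. It assumes the usual meaning of the two timing parameters (wait for a quiet period after the latest notification, but never longer than the maximum latency after the first one); the actual scheduling rule of this class is not shown on this page. `PendingEvaluation` and `due_time` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class PendingEvaluation:
    """Hypothetical bookkeeping for one pending evaluation."""

    first_request: float  # time of the first notification not yet served
    last_request: float   # time of the most recent notification
    immediate: bool = False


def due_time(
    pending: PendingEvaluation,
    debounce_s: float = 0.05,
    max_latency_s: float = 0.2,
) -> float:
    """Return the time at which the evaluation should run (assumed rule)."""
    if pending.immediate:
        # Bypass the debounce delay: due at the time of the latest request.
        return pending.last_request
    quiet_deadline = pending.last_request + debounce_s
    latency_deadline = pending.first_request + max_latency_s
    # Whichever deadline comes first wins, so a steady stream of updates
    # cannot postpone the evaluation indefinitely.
    return min(quiet_deadline, latency_deadline)
```

Under this assumed rule, a single notification at t = 0 is due after the debounce delay, while a burst that is still producing updates at t = 0.18 s is capped by the maximum latency measured from the first notification.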
- clear_consistency_fault() → None
Reset consistency error messages.
- _tick_loop() → None
Background loop managing timing logic for evaluations.
- _evaluate_and_publish(*, force: bool = False) → None
Evaluate the candidate state and publish results.
Must be implemented by subclasses to define specific business logic.
- Parameters:
candidate – The state candidate to be evaluated. Note that the signature above declares only the keyword-only force flag.
- Raises:
NotImplementedError – If not overridden by a subclass.
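The override contract can be illustrated with a self-contained sketch. `SupervisorBase` is a hypothetical stand-in for the documented base class with the timing logic omitted, and `ChangeOnlySupervisor`, `candidate`, and the attribute name `"exampleState"` are invented for illustration. Treating `force` as "publish even if nothing changed" is an assumption, since its semantics are not described on this page.

```python
from typing import Any, Callable, Optional


class SupervisorBase:
    """Hypothetical stand-in for the base class (timing logic omitted)."""

    def __init__(self, *, update_attribute: Callable[[str, Any], None]) -> None:
        self._update_attribute = update_attribute

    def _evaluate_and_publish(self, *, force: bool = False) -> None:
        raise NotImplementedError("subclasses must override this method")


class ChangeOnlySupervisor(SupervisorBase):
    """Publishes the candidate value only when it changes, unless forced."""

    def __init__(self, *, update_attribute: Callable[[str, Any], None]) -> None:
        super().__init__(update_attribute=update_attribute)
        self.candidate: str = "IDLE"
        self._last_published: Optional[str] = None

    def _evaluate_and_publish(self, *, force: bool = False) -> None:
        # Assumption: force=True republishes an unchanged value.
        if force or self.candidate != self._last_published:
            self._update_attribute("exampleState", self.candidate)
            self._last_published = self.candidate
```

A caller would pass a callback such as `lambda name, value: print(name, value)` as `update_attribute`; calling the method on the base class directly raises `NotImplementedError`, matching the entry above.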