Subarray Component Manager

class ska_csp_lmc_common.subarray.subarray_component_manager.CSPSubarrayComponentManager(*args: Any, **kwargs: Any)

Bases: CSPBaseComponentManager

Component Manager for the Csp Subarray.

__init__(health_model: SubarrayHealthModel, op_state_model: SubarrayOpStateModel, obs_state_model: SubarrayObsStateModel, properties: ComponentManagerConfiguration, update_device_property_cbk: Callable, logger: Logger | None = None, communication_state_changed_callback: Callable[[ska_control_model.CommunicationStatus], None] | None = None, component_state_changed_callback: Callable[[dict[str, Any]], None] | None = None) None

Instantiate the Component Manager for the Csp Subarray.

Parameters:
  • health_model – The Health State Model to evaluate the CSP Subarray health state.

  • op_state_model – The Operational State Model to evaluate the CSP Subarray device operational state.

  • obs_state_model – The Observing State Model to evaluate the CSP Subarray device observing state.

  • properties – A class instance whose properties are the CSP Subarray TANGO Device Properties.

  • update_device_property_cbk – The CSP Subarray method invoked to update the properties. Defaults to None.

  • logger – The device or python logger if default is None.

property obs_modes: List[ska_control_model.ObsMode]

Return the obsmodes supported by the subarray.

Returns:

The list of obsModes supported by the subarray.

property obs_state: ska_control_model.ObsState

Return the subarray obsState

Returns:

The subarray obsState.

property sub_id: int

Return the subarray identification number.

Returns:

The identification number of the CSP Subarray handled by the manager.

property valid_scan_configuration: str

Return the programmed configuration.

property assigned_timing_beams_ids: List[int]

Return the list of assigned timing beams IDs.

property assigned_timing_beams_state: List[int]

Return the state of the assigned timing beams

property assigned_timing_beams_health_state: List[int]

Return the state of the assigned timing beams

property assigned_resources: List[str]

Return the FQDN od the resources assigned to the subsystems

property config_id: str

Return the ID of the received configuration.

Returns:

a string with the ID of the received configuration.

property obs_mode: List[str]

Return the list of observation modes for the subarray.

Returns:

a list of strings with the ObSMode.

property quality_assurance_metrics: str

Return a JSON formatted string with quality attributes.

_create_init_callbacks() BaseInitCallbacks

Create subarray-specific initialization callbacks.

_create_stop_callbacks() BaseStopCallbacks

Create subarray-specific stop callbacks.

_get_health_supervisor_update_callback() Callable[[str, Any], None] | None

Return callback used by the health supervisor for publications.

Subarray updates are routed through update_attribute(), which applies the component-manager attribute lock before invoking the device callback.

Returns:

callback accepting (attribute_name, attribute_value).

_observing_factory(logger: Logger) ObservingComponentFactory

Configure the Factory to create the CSP sub-systems components.

This class is specialized in Mid.CSP and Low.CSP.

Parameters:

logger – the logger for this instance.

Returns:

A factory object to create CSP Subarray sub-systems observing components.

_add_subsystems_components() None

Instantiate a Python class for each CSP sub-system subordinate device.

Each sub-system component works as an adapter and cache for the associated CSP sub-system and provides the methods or calls to operate on the associate sub-system TANGO device.

On failure, the state and health state of the CSP Subarray is forced to FAULT/FAILED.

_task_submitter(task_name: str, task_callback: Callable | None = None, argin: Any | None = None, **kwargs: Any) Tuple[ska_control_model.TaskStatus, str]

Common method to submit a command

Parameters:
  • task_name – the name of the task to be submitted

  • argin – input argument of the task

Returns:

a tuple with TaskStatus and an informative msg

_json_configuration_parser(argin) JsonConfigurationParser

Configure the class that handles the parsing of the input JSON files sent as argument of AssignResources, Configure and Scan.

This class is specialized in Mid.CSP and Low.CSP.

_get_dict_of_devices()

Returns the list of devices to which the subarray device connects during initialization.

Note: Until the connection with the PST capability is added, the list of devices matches the list of all subsystems, including PST beams, and the behavior remains the same as before. Once the connection to the PST capability is established, the subarray will connect only to the beams assigned to it.

_ensure_resource_manager_initialized() bool

Ensure that the Resource Manager proxy is initialized for this Subarray.

If the manager proxy is not yet set, the method attempts to retrieve it.

Returns:

True if the Resource Manager proxy is successfully initialized (either newly set or already available); False if the Resource Manager component could not be found.

_update_valid_scan_configuration(config_json: str) None

Update the attribute with the scan configuration.

This method is called upon successful completion of the subarray configuration.

Parameters:

config_json – Configure command input string.

_get_obs_modes()
_initialize_quality_metrics() None

Initialize the quality metrics dictionary to default values.

_execute_command(task_name: str, task_callback: Callable, argin: Any | None = None, resource_preparer: Callable | None = None, completion_kwargs: Dict[str, Any] | None = None, task_abort_event: Event | None = None) None

A centralized, reusable method to handle command execution flow.

This method encapsulates the common steps of:

  1. Preparing resources for a command.

  2. Creating a command completion handler.

  3. Composing a main task.

  4. Handling the “no-operation” case where no sub-tasks are generated.

  5. Submitting the task to the main executor.

  6. Handling exceptions and reporting failures.

The original task_callback provided by the caller is not invoked directly. Instead, it is wrapped by a CommandCompletionHandler, which:

  • Receives enriched completion metadata from the TaskTracker

    (e.g. task_name, source, devices, failed_devices, health_state).

  • Performs command-specific post-processing (event flushing,

    observation state updates, side effects).

  • Finally invokes the original task_callback with a stable,

    backward-compatible signature (status, result).

This design allows the internal task execution framework to propagate detailed execution context while preserving compatibility with existing command callbacks.

Parameters:
  • task_name – The name of the command task (e.g., “configure”).

  • task_callback – The callback to report task status and results.

  • argin – The input argument for the command.

  • resource_preparer – A function that prepares self.resources and returns any command-specific data. It may raise an exception on validation failure.

  • completion_kwargs – A dict of keyword arguments for the completion handler’s command_completed method.

  • task_abort_event – external abort event propagated by the TaskExecutor

  • no_op_msg – The message to return if no sub-tasks are composed.

_prepare_online_resources(_: Any | None = None) None

Prepare resources for commands that target all online components.

_prepare_assign_resources(argin: dict) None

Prepare resources for the AssignResources command.

_prepare_release_resources(argin: dict) None

Prepare resources for the ReleaseResources command.

_prepare_release_all_resources(_: Any | None = None) None

Prepare resources for the ReleaseAllResources command.

_prepare_configure_resources(argin: dict) dict

Prepare resources for the Configure command.

_prepare_scan_resources(argin: str) dict

Prepare resources for the Scan command.

_scan_preparation(json_input: dict) bool

Preliminary check for the scan command and store scan id. It is overwritten in Low.

Parameters:

json_input – scan input json (dict)

Returns:

True when parsing succeeds, False otherwise.

_store_scan_id(json_input: dict) None

Extract and store the Scan Id from the scan input json.

Parameters:

json_input – scan input json (dict)

_on(task_name: str, task_callback: Callable | None = None, task_abort_event: Event | None = None) None

Execute the On command using the central executor.

_off(task_name, task_callback: Callable | None = None, task_abort_event: Event | None = None) None

Execute the Off command using the central executor.

_assign(task_name: str, argin: dict, task_callback: Callable | None = None, task_abort_event: Event | None = None)

Execute the AssignResources command using the central executor.

_configure(task_name: str, argin: dict, task_callback: Callable = None, task_abort_event: Event | None = None) None

Execute the Configure command using the central executor.

_scan(task_name: str, argin: str, task_callback: Callable | None = None, task_abort_event: Event | None = None) None

Execute the Scan command using the central executor.

_end_scan(task_name: str, task_callback: Callable | None = None, task_abort_event: Event | None = None) None

Execute the EndScan command using the central executor.

_release(task_name: str, argin: dict, task_callback: Callable | None = None, task_abort_event: Event | None = None)

Execute the ReleaseResources command using the central executor.

_release_all(task_name: str, task_callback: Callable | None = None, task_abort_event: Event | None = None)

Execute the ReleaseAllResources command using the central executor.

_gotoidle(task_name: str, task_callback: Callable | None = None, task_abort_event: Event | None = None) None

Execute the GoToIdle command using the central executor.

_abort(task_callback: Callable | None = None)
Create a thread pool to perform the following action:
  • create abort command for the CSP subarrays

  • send abort command to any running stoppable CSP subarray task

  • shutdown the executors (CSPexecutor, subarray_executor, clean priority queue)

  • re-init the executors (CSPexecutor, subarray_executor, priority queue)

Parameters:

task_callback – method called to update the task result related attributes

Returns:

a tuple with ResultCode and an informative message string.

_obsreset(task_name: str, task_callback: Callable | None = None, task_abort_event: Event | None = None) None

Method submitted into thread executor for obsreset command

Currently the command is rejected because not used by the sub-systems.

Parameters:
  • task_name – the name of the task

  • task_callback – method called to update the task result related attributes

  • task_abort_event – flag to signal an abort request pending

_restart(task_name: str, task_callback: Callable | None = None, task_abort_event: Event | None = None) None

Execute the Restart command.

Parameters:
  • task_name – Name of the command task.

  • task_callback – Callback used to report task completion.

  • task_abort_event – Optional abort event propagated by the executor.

init()

Initialize the Subarray ComponentManager. Instantiate:

  • the command factory

  • the json parser for the configuration files

  • the event manager

  • an ObservingComponent object for each CSP sub-system defined in the CSP Subarray Device Properties.

_build_planned_device_aliases() dict[str, str]

Build a lookup from readable device aliases to canonical FQDNs.

start_communicating()

Method to start the monitoring of the CSP subordinate devices. It establishes the connection with the CSP subordinate sub-systems observing devices. On success, the main sub-systems attributes are subscribed for events.

It is invoked when the adminMode of the device is OFFLINE or ENGINEERING.

stop_communicating() None

Method invoked when the CSP Subarray TANGO Device is set OFFLINE.

In this case the CspSubarrayComponentManager stop monitoring the subordinate devices.

The admin mode is forwarded to all the online subordinate components. If the request fails, only the component offline are disconnected.

connect_to_pst() Tuple[ska_control_model.TaskStatus, Tuple[ska_control_model.ResultCode, str]]

Connect to (and subscribe) PST beams assigned to this subarray during resource assignment.

Returns:

(status, (result_code, message))

disconnect_from_all_pst()

Method invoked to disconnect from all Pst beams allocated to the subarray. This method is just a wrapper of the method used to disconnect a single pst beam, but the self.resources attributed updated to disconnect from all of them

disconnect_from_pst()

Method invoked to disconnect from Pst Beam when they are released. This method is invoked to:

  • update the Pst beams allocated to the subarray

  • unsubscribe the released Pst Beams attributes

  • update the list of allocated resources on all subsystems: cbf, pst, pss

on(task_callback: Callable[[], None] | None = None) Tuple[ska_control_model.TaskStatus, str]

Enable the CSP Subarray to perform observing tasks. The command is forwarded to all the CSP Sub-systems observing devices (subarrays or _create_abort_cmd beams) that are currently online.

Parameters:

task_callback – method called to update the task result related attributes

:return a tuple with TaskStatus and an informative message.

off(task_callback: Callable[[], None] | None = None) Tuple[ska_control_model.TaskStatus, str]

Disable the CSP Subarray to perform observing tasks. The command is forwarded to all the CSP Sub-systems observing devices (subarrays or beams) that are currently online.

The command can be invoked from any observing state with the result that any ongoing process, is aborted and all the assigned resources released.

Parameters:

task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

configure(task_callback: Callable[[], None] | None = None, **argin: dict) Tuple[ska_control_model.TaskStatus, str]

Configure the CSP Subarray to perform observing tasks. The command is forwarded to all the CSP Sub-systems observing devices (subarrays or beams) that are currently online.

Parameters:
  • argin – The CSP Subarray configuration data

  • task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

abort(task_callback: TaskCallbackType | None = None) Tuple[TaskStatus, str]

Abort any running stoppable CSP subarray task. The command is forwarded to all the CSP Sub-systems observing devices (subarrays or beams) that are currently online.

Parameters:

task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

obsreset(task_callback: Callable[[], None] | None = None) Tuple[ska_control_model.TaskStatus, str]

Reset a FAULT or ABORTED condition of the CSP Subarray. The command is forwarded to all the CSP Sub-systems observing devices (subarrays or beams) that are currently online and in FAULT or ABORTED observing state.

Parameters:

task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

restart(task_callback: Callable[[], None] | None = None) Tuple[ska_control_model.TaskStatus, str]

Restart a CSP Subarray in FAULT or ABORTED observing state.

The command is forwarded to all the CSP Sub-systems observing devices (subarrays or beams) that are currently online.

Parameters:

task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

assign(task_callback: Callable[[], None] | None = None, **argin: dict) Tuple[ska_control_model.TaskStatus, str]

Load the configuration from file specifying the resources for subsystems. Forward the command to all the subsystems.

Parameters:
  • argin – the information with the resources to allocate to the CSP Subarray

  • task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

update_assigned_resources(resource_type: str, resource_value: List[str]) None

Update the list of the FQDN resources assigned to the subarray.

Parameters:
  • resource_type – the type of the resources handled by the capabilities

  • resource_value – the list of assigned resources’ FQDNs.

release(task_callback: Callable[[], None] | None, **argin: dict) Tuple[ska_control_model.TaskStatus, str]

Forward the command to release assigned resounces to all the subsystems.

Parameters:
  • argin – the information with the resources to remove from the CSP Subarray

  • task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

release_all(task_callback: Callable[[], None] | None = None) Tuple[ska_control_model.TaskStatus, str]

Forward the command to release all assigned resources to all the subsystems.

Parameters:

task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

scan(task_callback: Callable[[], None] | None = None, **argin: dict) Tuple[ska_control_model.TaskStatus, str]

Start a Scan. The command is forwarded to all the CSP Sub-systems observing devices (subarrays or beams) that are currently online.

Parameters:
  • task_callback – method called to update the task result related attributes

  • argin – the scan information.

Returns:

a tuple with TaskStatus and an informative message.

end_scan(task_callback: Callable[[], None] | None = None) Tuple[ska_control_model.TaskStatus, str]

Stop gracefully the execution of a scan.

The command is forwarded to all the CSP Sub-systems observing devices (subarrays or beams) that in scanning.

Parameters:

task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

update_attribute(property_name, property_value)

Update a device attribute via the registered callback, thread-safely.

Parameters:
  • property_name – Name of the device attribute to update.

  • property_value – Value to assign to the device attribute.

gotoidle(task_callback: Callable[[], None] | None = None) Tuple[ska_control_model.TaskStatus, str]

Gotoidle the CSP Subarray to perform observing tasks. The command is forwarded to all the CSP Sub-systems observing devices (subarrays or beams) that are currently online.

Parameters:

task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

deconfigure(task_callback: Callable[[], None] | None = None) Tuple[ska_control_model.TaskStatus, str]

Gotoidle the CSP Subarray to perform observing tasks. The command is forwarded to all the CSP Sub-systems observing devices (subarrays or beams) that are currently online.

Parameters:

task_callback – method called to update the task result related attributes

Returns:

a tuple with TaskStatus and an informative message.

reset(task_callback: Callable[[], None] | None = None, **kwargs: Any) Tuple[ska_control_model.TaskStatus, str]
force_health_state(faulty_flag: bool, reason: str) None

Method to set/unset the subarray healthState to FAILED.

Parameters:
  • faulty_flag – whether to force the healthstate to FAILED or not.

  • reason – information related to the reason of the HealthState modification.

update_json()

Internal function TBD

Update the configuration script adding the PST Beam(s) addresses.

Parameters:
  • original_dict – the original input configuration.

  • updated_info – the updated configuration.

update_obs_mode(resources_to_send: Dict)

Method to update obsmode based on the obsstate and configuration

reset_obs_mode()

Reset the obsmode to IDLE

update_quality_metrics(subsystem_fqdn: str, attr_name: str, attr_value: Any, valid: bool = True) None

Callback to update the QualityAssuranceMetrics attribute for a subsystem.

Parameters:
  • subsystem_fqdn – FQDN of the subsystem that published the metric update.

  • attr_name – Name of the quality metric to update.

  • attr_value – Value received for the quality metric.

  • valid – Whether the received metric value is valid.

_init_evt_handler() None

Instantiate and wire the CSP handler for this manager.

_instantiate_evt_handler(supervisor_health_event_cb, supervisor_health_info_cb) None

Instantiate the event handler for the subarray. This method is extended in the Mid subarray.

Parameters:
  • supervisor_health_event_cb – Callback to handle health events from the supervisor.

  • supervisor_health_info_cb – Callback to handle detailed health information from the supervisor.