Migrating to 1.7
ska-tango-base 1.7.0 introduces a new ska_tango_base.future subpackage that provides a simplified, opinionated API intended to replace the existing base class hierarchy in a future major release.
This guide describes how to migrate devices that currently inherit from SKABaseDevice, SKAObsDevice, SKASubarray, or SKAController to the corresponding future interfaces.
Both APIs can coexist in the same project during migration.
Class inheritance
The following table shows the direct class equivalents:
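| Old class | New class |
|---|---|
| SKABaseDevice | BaseInterface |
| SKAObsDevice | ObsInterface |
| SKASubarray | SubarrayInterface |
| SKAController | ControllerInterface |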
PoweredInterface is new. It sits between BaseInterface and ObsInterface and should be used whenever the device controls a component that has power states (ON/OFF/STANDBY).
Example class inheritance with the old API:
from ska_tango_base import SKABaseDevice


class MyDevice(SKABaseDevice["MyComponentManager"]):
    InitCommand = None
    ...
New class inheritance needed for the same device with the new API:
from ska_tango_base import future  # Needs to be imported explicitly


class MyDevice(
    future.ComponentManagerLRCMixin["MyComponentManager"],
    future.BaseInterface,
):
    ...
For devices that control a powered component:
from ska_tango_base import future  # Needs to be imported explicitly


class MyDevice(
    future.ComponentManagerLRCMixin["MyComponentManager"],
    future.PoweredInterface,
):
    ...
ComponentManagerLRCMixin is a convenience mixin that combines component-manager attachment with LRC support. Use ComponentManagerMixin instead if you do not need LRCs.
InitCommand removed
The InitCommand class object pattern has been removed. A device using ska-tango-base 1.4 or newer should already have acknowledged the deprecation by assigning InitCommand = None and overriding init_device() directly. There is no need to set InitCommand = None with the future classes, but it does no harm to leave it in.
Component manager
Using the BaseComponentManager with SKABaseDevice requires communication_state_callback and component_state_callback constructor arguments and uses CommunicationStatus to gate the Tango device operational state.
The new API removes this concept entirely. Component managers should instead inherit from the appropriate observer mixin and call the appropriate state update methods directly:
| Old base class | New base class |
|---|---|
| | |
| (powered variant) | |
To retain the task executor, also inherit from TaskExecutorComponentManager:
import ska_tango_base as stb
import ska_control_model as scm


# Old
class MyComponentManager(stb.base.BaseComponentManager):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def on(self, task_callback=None):
        return self._task_executor.submit(self._do_on, task_callback=task_callback)

    def _communication_state_changed(self, communication_state):
        # Old pattern: not needed in the new API
        ...


# New
class MyComponentManager(
    stb.future.PoweredOpStateEmitMixin,
    stb.executor.TaskExecutorComponentManager,
):
    @stb.executor.TaskExecutor.task
    def do_on(self, progress_callback, task_abort_event):
        self.component_on()
        return scm.ResultCode.OK, "On completed"
The key differences:
- No constructor arguments for state callbacks: the signal bus propagates state automatically once the component manager is attached to the device via ComponentManagerMixin.
- No CommunicationStatus. The device starts in INIT state and transitions to ON (or the appropriate power state) as soon as init_completed() is called.
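The first difference can be pictured with a small, self-contained sketch of a shared signal bus. It does not use ska-tango-base: SignalBus, SketchComponentManager, and SketchDevice are hypothetical stand-ins written for this illustration, and the real mixins may be implemented quite differently. The sketch only shows why the component manager no longer needs callback arguments in its constructor.

```python
from typing import Callable, Dict, List, Optional


class SignalBus:
    """Hypothetical stand-in for a shared bus; not a ska-tango-base class."""

    def __init__(self) -> None:
        self._observers: Dict[str, List[Callable[[str], None]]] = {}

    def subscribe(self, signal: str, observer: Callable[[str], None]) -> None:
        self._observers.setdefault(signal, []).append(observer)

    def emit(self, signal: str, value: str) -> None:
        for observer in self._observers.get(signal, []):
            observer(value)


class SketchComponentManager:
    """Takes no state callbacks; the bus is attached after construction."""

    def __init__(self) -> None:
        self._bus: Optional[SignalBus] = None

    def attach(self, bus: SignalBus) -> None:
        self._bus = bus

    def component_on(self) -> None:
        # Emitting on the bus replaces calling a component_state_callback.
        if self._bus is not None:
            self._bus.emit("op_state", "ON")


class SketchDevice:
    """Owns the bus and reacts to whatever the component manager emits."""

    def __init__(self) -> None:
        self.state = "INIT"
        self._bus = SignalBus()
        self._bus.subscribe("op_state", self._on_op_state)
        self.component_manager = SketchComponentManager()
        self.component_manager.attach(self._bus)

    def _on_op_state(self, value: str) -> None:
        self.state = value


device = SketchDevice()
device.component_manager.component_on()
print(device.state)
```

The component manager is constructed without any knowledge of the device; the device wires the two together afterwards, which is the role the text assigns to ComponentManagerMixin.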
State management
The state machine objects (op_state_model, obs_state_model) are removed, as decided in ADR-124. State is now driven entirely by calling the appropriate state update methods on either the device or the component manager, which will automatically update the Tango device state and send change and archive events as needed. The mapping from method arguments to resulting Tango device state is as follows:
| Interface | Method | Resulting Tango device state |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Similarly, for the observation state (ObsState), the appropriate state update methods should be called directly:
self.component_resourced() # → ObsState.IDLE
self.component_configured() # → ObsState.READY
self.component_scanning() # → ObsState.SCANNING
self.component_obsfault() # → ObsState.FAULT
Device initialisation
For BaseInterface (and all subclasses) the initialisation sequence is:
1. super().init_device(): sets the device to INIT state.
2. Set up any device-specific state.
3. self.init_completed(): transitions the device out of INIT.
If using a component manager via ComponentManagerMixin, the component manager is created automatically inside on_new_shared_bus (before init_device returns from the super() call). You do not need to call create_component_manager() yourself.
For PoweredInterface the device does not enter ON state automatically after init_completed(). Instead, explicitly set the power state by calling either component_on(), component_standby() or component_off() before calling init_completed():
def init_device(self) -> None:
    super().init_device()
    # Establish the power state and then call init_completed()
    self.component_off()
    self.init_completed()
Optional attributes: controlMode, simulationMode, testMode
The optional attributes controlMode, simulationMode, and testMode are no longer included by default. Add them explicitly using the factory functions, which return a (signal, attribute) pair to unpack at class scope:
import ska_tango_base as stb


class MyDevice(stb.future.BaseInterface):
    _control_mode, controlMode = stb.future.standard_control_mode()
    _simulation_mode, simulationMode = stb.future.standard_simulation_mode()
    _test_mode, testMode = stb.future.standard_test_mode()
The same factory functions, standard_control_mode(), standard_simulation_mode(), and standard_test_mode(), remain available for use with the old API (unchanged since 1.4).
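The "pair unpacked at class scope" idiom may be unfamiliar, so here is a minimal pure-Python sketch of the mechanism. It is only an analogy: SketchSignal and sketch_standard_mode are hypothetical names invented for this illustration, and the real factory functions return Tango-specific objects rather than these plain descriptors.

```python
from typing import Optional


class SketchSignal:
    """Hypothetical per-instance value holder, implemented as a descriptor."""

    def __init__(self, default: str) -> None:
        self._default = default
        self._key: Optional[str] = None

    def __set_name__(self, owner: type, name: str) -> None:
        # Called when the class body finishes; gives each signal its own slot.
        self._key = "_value" + name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return instance.__dict__.get(self._key, self._default)

    def __set__(self, instance, value: str) -> None:
        instance.__dict__[self._key] = value


def sketch_standard_mode(default: str):
    """Hypothetical factory returning a (signal, attribute) pair."""
    signal = SketchSignal(default)
    # Read-only public view backed by the signal.
    attribute = property(lambda self: signal.__get__(self, type(self)))
    return signal, attribute


class SketchDevice:
    # One call binds two class attributes: the private signal and the
    # public attribute that reads from it.
    _test_mode, testMode = sketch_standard_mode("NONE")


device = SketchDevice()
other = SketchDevice()
device._test_mode = "TEST"
print(device.testMode, other.testMode)
```

Because both halves of the pair come from the same call, the public attribute always reads from the matching private signal, and a device that omits the line simply does not have the attribute.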
Optional attribute: obsMode
The obsMode attribute is not included by default in ObsInterface. Add it with:
class MyObsDevice(stb.future.ObsInterface):
    _obs_mode, obsMode = stb.future.standard_obs_mode()
Long running commands
The LRC pattern is unchanged. The @submit_lrc_task and @long_running_command decorators work identically on the new interfaces. The execute_<Cmd> override pattern is the same:
import ska_tango_base as stb


class MyDevice(
    stb.future.ComponentManagerLRCMixin["MyComponentManager"],
    stb.future.PoweredInterface,
):
    @stb.long_running_commands.submit_lrc_task
    def execute_On(self) -> stb.type_hints.TaskFunctionType:
        return self.component_manager.do_on

    @stb.long_running_commands.submit_lrc_task
    def execute_Off(self) -> stb.type_hints.TaskFunctionType:
        return self.component_manager.do_off
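The task functions returned by execute_<Cmd> receive progress_callback and task_abort_event, as the component manager examples in this guide show. The following is a minimal sketch of how such a function might cooperate with an abort request. It assumes that task_abort_event behaves like a threading.Event and that progress_callback accepts a percentage; neither assumption is confirmed by this guide. The function name do_slow_work is hypothetical, and plain strings stand in for ResultCode so that the sketch runs without ska-control-model.

```python
import threading


def do_slow_work(progress_callback, task_abort_event):
    """Hypothetical task: five steps, checking for an abort before each."""
    for step in range(1, 6):
        if task_abort_event.is_set():
            # Stop early and report where the task was interrupted.
            return "ABORTED", f"Stopped before step {step}"
        progress_callback(step * 20)
    return "OK", "Completed"


# Normal run: the event is never set, so every step reports progress.
progress = []
completed = do_slow_work(progress.append, threading.Event())

# Aborted run: the event is already set when the task starts.
abort = threading.Event()
abort.set()
aborted = do_slow_work(lambda percent: None, abort)

print(completed, progress)
print(aborted)
```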
Subarray devices
Replace SKASubarray with SubarrayInterface. All subarray commands (AssignResources, ReleaseResources, Configure, Scan, EndScan, End, Abort, ObsReset, Restart, ReleaseAllResources) are already defined on the interface and enabled once the corresponding execute_<Cmd> method is overridden.
Unlike the old API, the obsState transitions during command execution are handled automatically by the interface via started_<Cmd> / completed_<Cmd> callbacks — you do not need to set _obs_state in your execute_<Cmd> methods. Override started_<Cmd> or completed_<Cmd> only if the default transition is wrong for your use case.
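The division of labour between started_<Cmd>, execute_<Cmd>, and completed_<Cmd> can be sketched in plain Python. SketchSubarrayInterface below is a hypothetical stand-in, not the real SubarrayInterface: it runs synchronously, and the CONFIGURING and READY transitions are used purely as an illustration of a default transition.

```python
class SketchSubarrayInterface:
    """Hypothetical stand-in showing where obsState transitions happen."""

    def __init__(self) -> None:
        self.obs_state = "IDLE"
        self.history = ["IDLE"]

    def _set_obs_state(self, value: str) -> None:
        self.obs_state = value
        self.history.append(value)

    # Default transitions supplied by the interface.
    def started_Configure(self) -> None:
        self._set_obs_state("CONFIGURING")

    def completed_Configure(self) -> None:
        self._set_obs_state("READY")

    def execute_Configure(self, argin: str) -> None:
        raise NotImplementedError

    def Configure(self, argin: str) -> None:
        # The interface brackets the user-supplied work with the callbacks.
        self.started_Configure()
        self.execute_Configure(argin)
        self.completed_Configure()


class MySketchSubarray(SketchSubarrayInterface):
    def execute_Configure(self, argin: str) -> None:
        # Only the device-specific work; no obsState handling here.
        self.configuration = argin


subarray = MySketchSubarray()
subarray.Configure('{"band": 1}')
print(subarray.history)
```

A subclass that needs a different transition would override started_Configure or completed_Configure and leave execute_Configure untouched.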
import functools
import json

import ska_tango_base as stb


class MySubarray(
    stb.future.ComponentManagerLRCMixin["MySubarrayComponentManager"],
    stb.future.SubarrayInterface,
):
    def create_component_manager(self):
        return MySubarrayComponentManager(self.logger)

    def read_assignedResources(self) -> list[str]:
        return self.component_manager.assigned_resources

    def read_configuredCapabilities(self) -> list[str]:
        return self.component_manager.configured_capabilities

    @stb.long_running_commands.submit_lrc_task
    def execute_AssignResources(self, argin: str):
        # Parse the JSON argument and bind it to the task function
        resources = set(json.loads(argin).get("resources", []))
        return functools.partial(self.component_manager.do_assign, resources)
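The functools.partial call in execute_AssignResources binds the command argument to the task function before the task is handed over for execution. A minimal stand-alone sketch of that binding follows. The free function do_assign, the resource names, and the way the remaining arguments are supplied are all assumptions made for illustration; plain strings stand in for ResultCode.

```python
import functools
import json
import threading


def do_assign(resources, progress_callback, task_abort_event):
    """Hypothetical task function with one extra leading argument."""
    progress_callback(100)
    return "OK", f"Assigned {sorted(resources)}"


argin = '{"resources": ["dish-2", "dish-1"]}'
resources = set(json.loads(argin).get("resources", []))

# Bind the parsed resources now; the other two arguments are filled in later.
task = functools.partial(do_assign, resources)

# Stand-in for what an executor would do when it runs the task.
progress = []
result = task(progress_callback=progress.append, task_abort_event=threading.Event())
print(result)
```

The returned partial object has the same two-argument shape as a task function that takes no command argument, so the same execute_<Cmd> pattern works for commands with and without input.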
The Abort command does not have a corresponding execute_Abort override. Override schedule_abort_task() instead, as in the old API.
Controller devices
Replace SKAController or the old ControllerInterface with the new ControllerInterface. The MaxCapabilities device property and the maxCapabilities, availableCapabilities, and IsCapabilityAchievable interface members are provided unchanged.
Full example
A minimal powered device with a component manager and LRCs:
import ska_control_model as scm
import ska_tango_base as stb
from ska_tango_base import future  # Needs to be imported explicitly


class MyComponentManager(
    future.PoweredOpStateEmitMixin,
    stb.executor.TaskExecutorComponentManager,
):
    def _on_unhandled_exception(self, exception: Exception) -> None:
        self.software_fault(str(exception))

    @stb.executor.TaskExecutor.task
    def do_on(self, progress_callback, task_abort_event):
        ...  # talk to hardware
        self.component_on()
        return scm.ResultCode.OK, "On completed"

    @stb.executor.TaskExecutor.task
    def do_off(self, progress_callback, task_abort_event):
        ...
        self.component_off()
        return scm.ResultCode.OK, "Off completed"


class MyDevice(
    future.ComponentManagerLRCMixin["MyComponentManager"],
    future.PoweredInterface,
):
    _control_mode, controlMode = future.standard_control_mode()
    _simulation_mode, simulationMode = future.standard_simulation_mode()
    _test_mode, testMode = future.standard_test_mode()

    def create_component_manager(self) -> MyComponentManager:
        return MyComponentManager(self.logger)

    def init_device(self) -> None:
        super().init_device()
        self.component_off()
        self.init_completed()

    @stb.long_running_commands.submit_lrc_task
    def execute_On(self) -> stb.type_hints.TaskFunctionType:
        return self.component_manager.do_on

    @stb.long_running_commands.submit_lrc_task
    def execute_Off(self) -> stb.type_hints.TaskFunctionType:
        return self.component_manager.do_off

    def report_health_from_monitoring(self) -> None:
        # Called from your monitoring loop
        issues = ...
        if issues:
            self.report_health(scm.HealthState.DEGRADED, issues)
        else:
            self.report_health(scm.HealthState.OK, [])