SAT LMC Prototype documentation
This project is developing the Local Monitoring and Control (LMC) prototype for the Square Kilometre Array.
API
Controller subpackage
This subpackage implements sat controller functionality for SAT.LMC.
- class ControllerComponentManager(maser_fqdns, logger, communication_status_changed_callback, maser_health_changed_callback)[source]
A component manager for an SAT LMC controller.
- __init__(maser_fqdns, logger, communication_status_changed_callback, maser_health_changed_callback)[source]
Initialise a new instance.
- Parameters
maser_fqdns (typing.Iterable[str]) – FQDNs of all maser devices
logger (logging.Logger) – the logger to be used by this object
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
maser_health_changed_callback (typing.Callable[[str, typing.Optional[ska_tango_base.control_model.HealthState]], None]) – callback to be called when the health of the maser changes
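The two callback parameters above could be satisfied by functions shaped like the following. This is a minimal sketch, not the project's code: both enums are local stand-ins (the CommunicationStatus member names are assumed), and RecordingCallbacks and the FQDN string are hypothetical names introduced for illustration.

```python
from enum import IntEnum
from typing import Dict, List, Optional


class CommunicationStatus(IntEnum):
    """Local stand-in for ska_sat_lmc's CommunicationStatus (member names assumed)."""

    DISABLED = 1
    NOT_ESTABLISHED = 2
    ESTABLISHED = 3


class HealthState(IntEnum):
    """Local stand-in mirroring ska_tango_base.control_model.HealthState."""

    OK = 0
    DEGRADED = 1
    FAILED = 2
    UNKNOWN = 3


class RecordingCallbacks:
    """Hypothetical helper whose bound methods match the two callback signatures."""

    def __init__(self) -> None:
        self.communication_statuses: List[CommunicationStatus] = []
        self.maser_healths: Dict[str, Optional[HealthState]] = {}

    def communication_status_changed(self, status: CommunicationStatus) -> None:
        # Matches Callable[[CommunicationStatus], None].
        self.communication_statuses.append(status)

    def maser_health_changed(self, fqdn: str, health: Optional[HealthState]) -> None:
        # Matches Callable[[str, Optional[HealthState]], None].
        self.maser_healths[fqdn] = health


callbacks = RecordingCallbacks()
# The bound methods would be passed as the last two constructor arguments;
# here they are invoked directly to show the argument shapes.
callbacks.communication_status_changed(CommunicationStatus.ESTABLISHED)
callbacks.maser_health_changed("sat/maser/1", HealthState.OK)  # hypothetical FQDN
```

Recording the calls in plain containers keeps such callbacks easy to inspect in a unit test.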
- class ControllerHealthModel(maser_fqdns, health_changed_callback)[source]
A health model for a controller.
- __init__(maser_fqdns, health_changed_callback)[source]
Initialise a new instance.
- Parameters
maser_fqdns (typing.Sequence[str]) – the FQDNs of the masers
health_changed_callback (typing.Callable[[ska_tango_base.control_model.HealthState], None]) – callback to be called whenever there is a change to this health model’s evaluated health state.
- evaluate_health()[source]
Compute overall health of the controller.
The overall health is based on the fault and communication status of the controller overall, together with the health of the subservient devices that it manages.
This implementation simply sets the health of the controller to the health of its least healthy component.
- Return type
- Returns
an overall health of the controller
- maser_health_changed(maser_fqdn, maser_health)[source]
Handle a change in maser health.
- Parameters
maser_fqdn (str) – the FQDN of the maser whose health has changed
maser_health (typing.Optional[ska_tango_base.control_model.HealthState]) – the health state of the specified maser, or None if the maser’s admin mode indicates that its health should not be rolled up.
- Return type
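The "least healthy component" rule described for evaluate_health() can be sketched as follows. This is an illustration under stated assumptions, not the actual ControllerHealthModel: HealthState is a local stand-in mirroring ska_tango_base, the severity ordering (FAILED worst, then UNKNOWN, DEGRADED, OK) is assumed, RollupHealthModel is a hypothetical name, and the controller's own fault and communication status are left out.

```python
from enum import IntEnum
from typing import Callable, Dict, Optional, Sequence


class HealthState(IntEnum):
    """Local stand-in mirroring ska_tango_base.control_model.HealthState."""

    OK = 0
    DEGRADED = 1
    FAILED = 2
    UNKNOWN = 3


# Assumed ordering from healthiest to least healthy.
_SEVERITY = {
    HealthState.OK: 0,
    HealthState.DEGRADED: 1,
    HealthState.UNKNOWN: 2,
    HealthState.FAILED: 3,
}


class RollupHealthModel:
    """Hypothetical health model that rolls up maser health only."""

    def __init__(
        self,
        maser_fqdns: Sequence[str],
        health_changed_callback: Callable[[HealthState], None],
    ) -> None:
        self._maser_healths: Dict[str, Optional[HealthState]] = {
            fqdn: HealthState.UNKNOWN for fqdn in maser_fqdns
        }
        self._callback = health_changed_callback
        self._health = self.evaluate_health()

    def evaluate_health(self) -> HealthState:
        # A health of None means "do not roll up", so it is skipped.
        rolled_up = [h for h in self._maser_healths.values() if h is not None]
        if not rolled_up:
            return HealthState.OK
        return max(rolled_up, key=lambda h: _SEVERITY[h])

    def maser_health_changed(
        self, maser_fqdn: str, maser_health: Optional[HealthState]
    ) -> None:
        self._maser_healths[maser_fqdn] = maser_health
        new_health = self.evaluate_health()
        if new_health != self._health:
            # Notify only when the evaluated health actually changes.
            self._health = new_health
            self._callback(new_health)
```

Calling the callback only on a change of the evaluated value avoids pushing redundant events when a maser reports the same health twice.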
- class SatController(*args, **kwargs)[source]
An implementation of a controller Tango device for SatLMC.
- class InitCommand(*args, **kwargs)[source]
A class for SatController’s Init command.
The do() method below is called during SatController’s initialisation.
- create_component_manager()[source]
Create and return a component manager for this device.
- Return type
ska_sat_lmc.controller.controller_component_manager.ControllerComponentManager
- Returns
a component manager for this device.
- health_changed(health)[source]
Call this method whenever the HealthModel’s health state changes.
Responsible for updating the Tango side of things, i.e. making sure the attribute is up to date and events are pushed.
- Parameters
health (ska_tango_base.control_model.HealthState) – the new health value
- Return type
Controller Component Manager
This module implements component management for the SAT.LMC controller.
- class ControllerComponentManager(maser_fqdns, logger, communication_status_changed_callback, maser_health_changed_callback)[source]
A component manager for an SAT LMC controller.
- __init__(maser_fqdns, logger, communication_status_changed_callback, maser_health_changed_callback)[source]
Initialise a new instance.
- Parameters
maser_fqdns (typing.Iterable[str]) – FQDNs of all maser devices
logger (logging.Logger) – the logger to be used by this object
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
maser_health_changed_callback (typing.Callable[[str, typing.Optional[ska_tango_base.control_model.HealthState]], None]) – callback to be called when the health of the maser changes
Controller Device
This module contains the ska_sat_lmc Controller device prototype.
- class SatController(*args, **kwargs)[source]
An implementation of a controller Tango device for SatLMC.
- class InitCommand(*args, **kwargs)[source]
A class for SatController’s Init command.
The do() method below is called during SatController’s initialisation.
- create_component_manager()[source]
Create and return a component manager for this device.
- Return type
ska_sat_lmc.controller.controller_component_manager.ControllerComponentManager
- Returns
a component manager for this device.
- health_changed(health)[source]
Call this method whenever the HealthModel’s health state changes.
Responsible for updating the Tango side of things, i.e. making sure the attribute is up to date and events are pushed.
- Parameters
health (ska_tango_base.control_model.HealthState) – the new health value
- Return type
Controller Health Model
An implementation of a health model for a controller.
- class ControllerHealthModel(maser_fqdns, health_changed_callback)[source]
A health model for a controller.
- __init__(maser_fqdns, health_changed_callback)[source]
Initialise a new instance.
- Parameters
maser_fqdns (typing.Sequence[str]) – the FQDNs of the masers
health_changed_callback (typing.Callable[[ska_tango_base.control_model.HealthState], None]) – callback to be called whenever there is a change to this health model’s evaluated health state.
- evaluate_health()[source]
Compute overall health of the controller.
The overall health is based on the fault and communication status of the controller overall, together with the health of the subservient devices that it manages.
This implementation simply sets the health of the controller to the health of its least healthy component.
- Return type
- Returns
an overall health of the controller
- maser_health_changed(maser_fqdn, maser_health)[source]
Handle a change in maser health.
- Parameters
maser_fqdn (str) – the FQDN of the maser whose health has changed
maser_health (typing.Optional[ska_tango_base.control_model.HealthState]) – the health state of the specified maser, or None if the maser’s admin mode indicates that its health should not be rolled up.
- Return type
Maser subpackage
This subpackage implements maser functionality for SAT.LMC.
- class MaserComponentManager(initial_simulation_mode, initial_test_mode, logger, maser_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
A component manager that handles a Maser simulator and driver.
- __init__(initial_simulation_mode, initial_test_mode, logger, maser_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Initialise a new instance.
- Parameters
initial_simulation_mode (ska_tango_base.control_model.SimulationMode) – the simulation mode that the component should start in
initial_test_mode (ska_tango_base.control_model.TestMode) – the test mode that the component should start in
logger (logging.Logger) – a logger for this object to use
maser_url (str) – the URL of the maser
simulator_url (str) – the URL of the simulated maser
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
component_fault_callback (typing.Callable[[bool], None]) – callback to be called when the component faults (or stops faulting)
- property simulation_mode: ska_tango_base.control_model.SimulationMode
Return the simulation mode.
- Return type
- Returns
the simulation mode
- property test_mode: ska_tango_base.control_model.TestMode
Return the test mode.
- Return type
- Returns
the test mode
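One way a component manager can handle both a simulator and a driver is to select the target URL from the current simulation mode. The sketch below assumes that design; it is not taken from MaserComponentManager. SimulationMode is a local stand-in mirroring ska_tango_base, and SwitchingManager, its setter, active_url and the example URLs are all hypothetical.

```python
from enum import IntEnum
from typing import Dict


class SimulationMode(IntEnum):
    """Local stand-in mirroring ska_tango_base.control_model.SimulationMode."""

    FALSE = 0
    TRUE = 1


class SwitchingManager:
    """Hypothetical manager that picks hardware or simulator by simulation mode."""

    def __init__(
        self,
        initial_simulation_mode: SimulationMode,
        maser_url: str,
        simulator_url: str,
    ) -> None:
        self._urls: Dict[SimulationMode, str] = {
            SimulationMode.FALSE: maser_url,
            SimulationMode.TRUE: simulator_url,
        }
        self._simulation_mode = SimulationMode(initial_simulation_mode)

    @property
    def simulation_mode(self) -> SimulationMode:
        """Return the simulation mode."""
        return self._simulation_mode

    @simulation_mode.setter
    def simulation_mode(self, mode: SimulationMode) -> None:
        # Hypothetical setter: switching mode changes which URL is active.
        self._simulation_mode = SimulationMode(mode)

    @property
    def active_url(self) -> str:
        """Return the URL of the currently selected component."""
        return self._urls[self._simulation_mode]


manager = SwitchingManager(
    SimulationMode.TRUE,
    maser_url="http://maser.example",  # hypothetical URL
    simulator_url="http://simulator.example",  # hypothetical URL
)
```

Keeping both URLs in a single mapping means a mode change needs no conditional logic at the call sites.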
- class MaserDriver(initial_simulation__mode, initial_test_mode, logger, maser_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Device Server for the T4Science i3000 Maser for SAT.LMC.
- __init__(initial_simulation__mode, initial_test_mode, logger, maser_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Initialise the attributes and properties of the SatMaser.
- Parameters
initial_simulation__mode (ska_tango_base.control_model.SimulationMode) – the simulation mode that the component should start in
initial_test_mode (ska_tango_base.control_model.TestMode) – the test mode that the component should start in
logger (logging.Logger) – a logger for this object to use
maser_url (str) – the URL of the hardware maser
simulator_url (str) – the URL of the simulated maser
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
component_fault_callback (typing.Callable[[bool], None]) – callback to be called when the component faults (or stops faulting)
- property ambient_temperature: float
Report the ambient temperature.
- Return type
- Returns
ambient temperature
- property amplitude_405khz_voltage: float
Report the 405 kHz Amplitude.
- Return type
- Returns
the 405 kHz Amplitude
- property battery_current_a: float
Report the current of battery A.
- Return type
- Returns
battery current A
- property battery_current_b: float
Report the current of battery B.
- Return type
- Returns
battery current B
- property battery_voltage_a: float
Report the voltage of battery A.
- Return type
- Returns
battery voltage A
- property battery_voltage_b: float
Report the voltage of battery B.
- Return type
- Returns
battery voltage B
- property boxes_temperature: float
Report the boxes temperature.
- Return type
- Returns
boxes temperature
- property cfield_voltage: float
Report the C-field voltage.
- Return type
- Returns
C-field voltage
- property dissociator_current: float
Report the dissociator current.
- Return type
- Returns
dissociator current
- property dissociator_light: float
Report the dissociator light.
- Return type
- Returns
dissociator light
- property external_bottom_heater_voltage: float
Report the external bottom heater voltage.
- Return type
- Returns
external bottom heater voltage
- property external_high_current_value: float
Report the external high current value.
- Return type
- Returns
external high current value
- property external_high_voltage_value: float
Report the external high voltage value.
- Return type
- Returns
external high voltage value
- property external_side_heater_voltage: float
Report the external side heater voltage.
- Return type
- Returns
external side heater voltage
- property hydrogen_pressure_measured: float
Report the measured hydrogen pressure.
- Return type
- Returns
hydrogen pressure measurement
- property hydrogen_pressure_setting: float
Report the hydrogen pressure setting.
- Return type
- Returns
hydrogen pressure setting
- property hydrogen_storage_heater_voltage: float
Report the hydrogen storage heater voltage.
- Return type
- Returns
hydrogen storage heater voltage
- property hydrogen_storage_pressure: float
Report the hydrogen storage pressure.
- Return type
- Returns
hydrogen storage pressure
- property internal_bottom_heater_voltage: float
Report the internal bottom heater voltage.
- Return type
- Returns
internal bottom heater voltage
- property internal_high_current_value: float
Report the internal high current value.
- Return type
- Returns
internal high current value
- property internal_high_voltage_value: float
Report the internal high voltage value.
- Return type
- Returns
internal high voltage value
- property internal_side_heater_voltage: float
Report the internal side heater voltage.
- Return type
- Returns
internal side heater voltage
- property internal_top_heater_voltage: float
Report the internal top heater voltage.
- Return type
- Returns
internal top heater voltage
- property isolator_heater_voltage: float
Report the isolator heater voltage.
- Return type
- Returns
isolator heater voltage
- property lock100mhz: float
Report the lock 100MHz status.
- Return type
- Returns
the lock 100MHz status
- property negative15vdc: float
Report the -15 V supply voltage.
- Return type
- Returns
-15 V supply voltage
- property negative5vdc: float
Report the -5 V supply voltage.
- Return type
- Returns
-5 V supply voltage
- property oscillator_100mhz_voltage: float
Report the 100 MHz oscillator voltage.
- Return type
- Returns
the 100 MHz oscillator voltage
- property oscillator_voltage: float
Report the OCXO varicap voltage.
- Return type
- Returns
the OCXO varicap voltage
- property phase_lock_loop_lockstatus: bool
Report the main PLL lock status.
- Return type
- Returns
main PLL lock status
- property pirani_heater_voltage: float
Report the pirani heater voltage.
- Return type
- Returns
pirani heater voltage
- property positive15vdc: float
Report the +15 V supply voltage.
- Return type
- Returns
+15 V supply voltage
- property positive18vdc: float
Report the +18 V supply voltage.
- Return type
- Returns
+18 V supply voltage
- property positive24vdc: float
Report the +24 V supply voltage.
- Return type
- Returns
+24 V supply voltage
- property positive5vdc: float
Report the +5 V supply voltage.
- Return type
- Returns
+5 V supply voltage
- property positive8vdc: float
Report the +8 V supply voltage.
- Return type
- Returns
+8 V supply voltage
- property purifier_current: float
Report the purifier current.
- Return type
- Returns
purifier current
- property thermal_control_unit_heater_voltage: float
Report the thermal control unit heater voltage.
- Return type
- Returns
thermal control unit heater voltage
- property tube_heater_voltage: float
Report the tube heater voltage.
- Return type
- Returns
tube heater voltage
- property varactor_diode_voltage: float
Report the varactor voltage.
- Return type
- Returns
varactor voltage
- class MaserHealthModel(health_changed_callback)[source]
A health model for a maser.
At present this uses the base health model; this is a placeholder for a future, better implementation.
- class SatMaser(*args, **kwargs)[source]
An implementation of a Maser Tango device for SatLmc.
- GetVersionInfo()[source]
Get the version of the device.
- Return type
- Returns
Version details of the device.
- class InitCommand(*args, **kwargs)[source]
Implement the device initialisation for the Maser device.
- Off()[source]
Put the device into off mode.
The Maser is always on, so this does nothing.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- On()[source]
Put the device into on mode.
The Maser is always on, so this does nothing.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- Reset()[source]
Reset the device.
The Maser is always on, so this does nothing.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- Standby()[source]
Put the device into standby mode.
The Maser is always on, so this does nothing.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
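The return shape shared by Off(), On(), Reset() and Standby() can be illustrated with a no-op command. This is a sketch only: ResultCode is a local stand-in listing a subset of ska_tango_base.commands.ResultCode, noop_command is a hypothetical helper, and both the choice of ResultCode.OK and the message wording are assumptions rather than what SatMaser returns.

```python
from enum import IntEnum
from typing import List, Optional, Tuple


class ResultCode(IntEnum):
    """Local stand-in for a subset of ska_tango_base.commands.ResultCode."""

    OK = 0
    STARTED = 1
    QUEUED = 2
    FAILED = 3
    UNKNOWN = 4


def noop_command(name: str) -> Tuple[List[ResultCode], List[Optional[str]]]:
    """Return the ([code], [message]) pair for a command that does nothing."""
    # Assumption: a command with nothing to do reports success immediately.
    message = f"{name} command ignored: the maser is always on"
    return ([ResultCode.OK], [message])


codes, messages = noop_command("Standby")
```

Both elements are single-item lists, which matches the Tuple[List[ResultCode], List[Optional[str]]] return type documented for these commands.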
- ambient_temperature()[source]
Report the ambient temperature.
- Return type
- Returns
ambient temperature
- amplitude_405khz_voltage()[source]
Report the 405 kHz Amplitude.
- Return type
- Returns
the 405 kHz Amplitude
- create_component_manager()[source]
Create and return a component manager for this device.
- Return type
- Returns
a component manager for this device.
- dissociator_current()[source]
Report the dissociator current.
- Return type
- Returns
dissociator current
- external_bottom_heater_voltage()[source]
Report the external bottom heater voltage.
- Return type
- Returns
external bottom heater voltage
- external_high_current_value()[source]
Report the external high current value.
- Return type
- Returns
external high current value
- external_high_voltage_value()[source]
Report the external high voltage value.
- Return type
- Returns
external high voltage value
- external_side_heater_voltage()[source]
Report the external side heater voltage.
- Return type
- Returns
external side heater voltage
- health_changed(health)[source]
Handle change in this device’s health state.
This is a callback hook, called whenever the HealthModel’s evaluated health state changes. It is responsible for updating the Tango side of things, i.e. making sure the attribute is up to date and events are pushed.
- Parameters
health (ska_tango_base.control_model.HealthState) – the new health value
- Return type
- hydrogen_pressure_measured()[source]
Report the measured hydrogen pressure.
- Return type
- Returns
hydrogen pressure measurement
- hydrogen_pressure_setting()[source]
Report the hydrogen pressure setting.
- Return type
- Returns
hydrogen pressure setting
- hydrogen_storage_heater_voltage()[source]
Report the hydrogen storage heater voltage.
- Return type
- Returns
hydrogen storage heater voltage
- hydrogen_storage_pressure()[source]
Report the hydrogen storage pressure.
- Return type
- Returns
hydrogen storage pressure
- init_command_objects()[source]
Initialise the command handlers for commands supported by this device.
- Return type
- init_device()[source]
Initialise the device.
This is overridden here to change the Tango serialisation model.
- Return type
- internal_bottom_heater_voltage()[source]
Report the internal bottom heater voltage.
- Return type
- Returns
internal bottom heater voltage
- internal_high_current_value()[source]
Report the internal high current value.
- Return type
- Returns
internal high current value
- internal_high_voltage_value()[source]
Report the internal high voltage value.
- Return type
- Returns
internal high voltage value
- internal_side_heater_voltage()[source]
Report the internal side heater voltage.
- Return type
- Returns
internal side heater voltage
- internal_top_heater_voltage()[source]
Report the internal top heater voltage.
- Return type
- Returns
internal top heater voltage
- is_attribute_allowed(attr_req_type)[source]
Protect attribute access before the attribute has been updated; otherwise it reports an alarm.
- Parameters
attr_req_type (tango.AttReqType) – Tango attribute request type (READ/WRITE)
- Return type
- Returns
True if the attribute can be read else False
- isolator_heater_voltage()[source]
Report the isolator heater voltage.
- Return type
- Returns
isolator heater voltage
- oscillator_100mhz_voltage()[source]
Report the 100 MHz oscillator voltage.
- Return type
- Returns
the 100 MHz oscillator voltage
- oscillator_voltage()[source]
Report the OCXO varicap voltage.
- Return type
- Returns
the OCXO varicap voltage
- phase_lock_loop_lockstatus()[source]
Report the main PLL lock status.
- Return type
- Returns
main PLL lock status
- pirani_heater_voltage()[source]
Report the pirani heater voltage.
- Return type
- Returns
pirani heater voltage
- simulationMode(value)[source]
Set the simulation mode.
- Parameters
value (ska_tango_base.control_model.SimulationMode) – The simulation mode, as a SimulationMode value
- Return type
- testMode(value)[source]
Set the test mode.
- Parameters
value (ska_tango_base.control_model.TestMode) – The test mode, as a TestMode value
- Return type
- thermal_control_unit_heater_voltage()[source]
Report the thermal control unit heater voltage.
- Return type
- Returns
thermal control unit heater voltage
Maser Component Manager
This module implements the maser component manager.
- class MaserComponentManager(initial_simulation_mode, initial_test_mode, logger, maser_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
A component manager that handles a Maser simulator and driver.
- __init__(initial_simulation_mode, initial_test_mode, logger, maser_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Initialise a new instance.
- Parameters
initial_simulation_mode (ska_tango_base.control_model.SimulationMode) – the simulation mode that the component should start in
initial_test_mode (ska_tango_base.control_model.TestMode) – the test mode that the component should start in
logger (logging.Logger) – a logger for this object to use
maser_url (str) – the URL of the maser
simulator_url (str) – the URL of the simulated maser
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
component_fault_callback (typing.Callable[[bool], None]) – callback to be called when the component faults (or stops faulting)
- property simulation_mode: ska_tango_base.control_model.SimulationMode
Return the simulation mode.
- Return type
- Returns
the simulation mode
- property test_mode: ska_tango_base.control_model.TestMode
Return the test mode.
- Return type
- Returns
the test mode
Maser Device
This module implements the SatMaser device.
- class SatMaser(*args, **kwargs)[source]
An implementation of a Maser Tango device for SatLmc.
- GetVersionInfo()[source]
Get the version of the device.
- Return type
- Returns
Version details of the device.
- class InitCommand(*args, **kwargs)[source]
Implement the device initialisation for the Maser device.
- Off()[source]
Put the device into off mode.
The Maser is always on, so this does nothing.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- On()[source]
Put the device into on mode.
The Maser is always on, so this does nothing.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- Reset()[source]
Reset the device.
The Maser is always on, so this does nothing.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- Standby()[source]
Put the device into standby mode.
The Maser is always on, so this does nothing.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- ambient_temperature()[source]
Report the ambient temperature.
- Return type
- Returns
ambient temperature
- amplitude_405khz_voltage()[source]
Report the 405 kHz Amplitude.
- Return type
- Returns
the 405 kHz Amplitude
- create_component_manager()[source]
Create and return a component manager for this device.
- Return type
- Returns
a component manager for this device.
- dissociator_current()[source]
Report the dissociator current.
- Return type
- Returns
dissociator current
- external_bottom_heater_voltage()[source]
Report the external bottom heater voltage.
- Return type
- Returns
external bottom heater voltage
- external_high_current_value()[source]
Report the external high current value.
- Return type
- Returns
external high current value
- external_high_voltage_value()[source]
Report the external high voltage value.
- Return type
- Returns
external high voltage value
- external_side_heater_voltage()[source]
Report the external side heater voltage.
- Return type
- Returns
external side heater voltage
- health_changed(health)[source]
Handle change in this device’s health state.
This is a callback hook, called whenever the HealthModel’s evaluated health state changes. It is responsible for updating the Tango side of things, i.e. making sure the attribute is up to date and events are pushed.
- Parameters
health (ska_tango_base.control_model.HealthState) – the new health value
- Return type
- hydrogen_pressure_measured()[source]
Report the measured hydrogen pressure.
- Return type
- Returns
hydrogen pressure measurement
- hydrogen_pressure_setting()[source]
Report the hydrogen pressure setting.
- Return type
- Returns
hydrogen pressure setting
- hydrogen_storage_heater_voltage()[source]
Report the hydrogen storage heater voltage.
- Return type
- Returns
hydrogen storage heater voltage
- hydrogen_storage_pressure()[source]
Report the hydrogen storage pressure.
- Return type
- Returns
hydrogen storage pressure
- init_command_objects()[source]
Initialise the command handlers for commands supported by this device.
- Return type
- init_device()[source]
Initialise the device.
This is overridden here to change the Tango serialisation model.
- Return type
- internal_bottom_heater_voltage()[source]
Report the internal bottom heater voltage.
- Return type
- Returns
internal bottom heater voltage
- internal_high_current_value()[source]
Report the internal high current value.
- Return type
- Returns
internal high current value
- internal_high_voltage_value()[source]
Report the internal high voltage value.
- Return type
- Returns
internal high voltage value
- internal_side_heater_voltage()[source]
Report the internal side heater voltage.
- Return type
- Returns
internal side heater voltage
- internal_top_heater_voltage()[source]
Report the internal top heater voltage.
- Return type
- Returns
internal top heater voltage
- is_attribute_allowed(attr_req_type)[source]
Protect attribute access before the attribute has been updated; otherwise it reports an alarm.
- Parameters
attr_req_type (tango.AttReqType) – Tango attribute request type (READ/WRITE)
- Return type
- Returns
True if the attribute can be read else False
- isolator_heater_voltage()[source]
Report the isolator heater voltage.
- Return type
- Returns
isolator heater voltage
- oscillator_100mhz_voltage()[source]
Report the 100 MHz oscillator voltage.
- Return type
- Returns
the 100 MHz oscillator voltage
- oscillator_voltage()[source]
Report the OCXO varicap voltage.
- Return type
- Returns
the OCXO varicap voltage
- phase_lock_loop_lockstatus()[source]
Report the main PLL lock status.
- Return type
- Returns
main PLL lock status
- pirani_heater_voltage()[source]
Report the pirani heater voltage.
- Return type
- Returns
pirani heater voltage
- simulationMode(value)[source]
Set the simulation mode.
- Parameters
value (ska_tango_base.control_model.SimulationMode) – The simulation mode, as a SimulationMode value
- Return type
- testMode(value)[source]
Set the test mode.
- Parameters
value (ska_tango_base.control_model.TestMode) – The test mode, as a TestMode value
- Return type
- thermal_control_unit_heater_voltage()[source]
Report the thermal control unit heater voltage.
- Return type
- Returns
thermal control unit heater voltage
Phase Micro Stepper subpackage
This module contains the phase microstepper functionality.
- class PhaseMicroStepperComponentManager(initial_simulation_mode, initial_test_mode, logger, phase_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
A component manager that handles a PhaseMicroStepper simulator and driver.
- __init__(initial_simulation_mode, initial_test_mode, logger, phase_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Initialise a new instance.
- Parameters
initial_simulation_mode (ska_tango_base.control_model.SimulationMode) – the simulation mode that the component should start in
initial_test_mode (ska_tango_base.control_model.TestMode) – the test mode that the component should start in
logger (logging.Logger) – a logger for this object to use
phase_url (str) – the URL of the PhaseMicroStepper
simulator_url (str) – the URL of the simulated PhaseMicroStepper
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
component_fault_callback (typing.Callable[[bool], None]) – callback to be called when the component faults (or stops faulting)
- property simulation_mode: ska_tango_base.control_model.SimulationMode
Return the simulation mode.
- Return type
- Returns
the simulation mode
- property test_mode: ska_tango_base.control_model.TestMode
Return the test mode.
- Return type
- Returns
the test mode
- class PhaseMicroStepperDriver(initial_simulation__mode, initial_test_mode, logger, phase_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Device Server for the SAT.LMC PhaseMicrostepper (100MHz).
- __init__(initial_simulation__mode, initial_test_mode, logger, phase_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Initialise the attributes and properties of the PhaseMicroStepper.
- Parameters
initial_simulation__mode (ska_tango_base.control_model.SimulationMode) – the simulation mode that the component should start in
initial_test_mode (ska_tango_base.control_model.TestMode) – the test mode that the component should start in
logger (logging.Logger) – a logger for this object to use
phase_url (str) – the URL of the PhaseMicroStepper
simulator_url (str) – the URL of the simulated PhaseMicroStepper
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
component_fault_callback (typing.Callable[[bool], None]) – callback to be called when the component faults (or stops faulting)
- advance_10megahertz(advance)[source]
Advance the output phase of 10MHz OUT signals.
Value in units of 10ns. Acceptable range is from 0 to +9.
- advance_phase(advance)[source]
Advance the output phase of all OUT signals, in units of 1e-15s.
The resolution is about 1e-13. Acceptable range is from -5000000 to +5000000.
- advance_pps(advance)[source]
Advance the phase of PPS OUT signals in units of 10ns.
The acceptable range is limited from -99000000 to 30000000.
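The acceptable ranges quoted for the three advance commands lend themselves to a check before a value is sent to the device. The following is a sketch of such a check, assuming the bounds are inclusive; ADVANCE_RANGES, UNIT_SECONDS, check_advance and advance_in_seconds are hypothetical names and are not part of PhaseMicroStepperDriver.

```python
from typing import Dict, Tuple

# (minimum, maximum) in device units, taken from the command descriptions.
ADVANCE_RANGES: Dict[str, Tuple[int, int]] = {
    "advance_10megahertz": (0, 9),
    "advance_phase": (-5_000_000, 5_000_000),
    "advance_pps": (-99_000_000, 30_000_000),
}

# Size of one device unit in seconds for each command.
UNIT_SECONDS: Dict[str, float] = {
    "advance_10megahertz": 10e-9,  # units of 10 ns
    "advance_phase": 1e-15,  # units of 1e-15 s
    "advance_pps": 10e-9,  # units of 10 ns
}


def check_advance(command: str, advance: int) -> int:
    """Return advance unchanged if it is within range, else raise ValueError."""
    low, high = ADVANCE_RANGES[command]
    if not low <= advance <= high:
        raise ValueError(
            f"{command}: {advance} is outside the range {low} to {high}"
        )
    return advance


def advance_in_seconds(command: str, advance: int) -> float:
    """Convert a validated advance from device units to seconds."""
    return check_advance(command, advance) * UNIT_SECONDS[command]
```

Under these figures, the largest advance_phase step corresponds to about 5 ns and the largest positive advance_pps step to about 0.3 s.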
- property first_internal_oscillator: float
Return the status of internal oscillator 1.
Value 1 is a fixed-point number with 6 decimal digits.
- Return type
- Returns
the status of internal oscillator 1
- get_offset_frequency()[source]
Return the offset frequency set in the last setF command.
- Return type
- Returns
the command and offset frequency in units of 1e-20
- property internal_temperature: float
Return the internal temperature in degrees Celsius.
Value 3 is a fixed-point number with 2 decimal digits.
- Return type
- Returns
the internal temperature
- phase_thread_running()[source]
Return the state of the PhaseMicroStepper hardware thread.
- Return type
- Returns
True if the PhaseMicroStepper thread is running
- property second_internal_oscillator: float
Return the status of internal oscillator 2.
Value 2 is a fixed-point number with 6 decimal digits.
- Return type
- Returns
the status of internal oscillator 2
- property serial_number: int
Return the serial number of the EOG.
- Return type
- Returns
the serial number
- set_offset_frequency(offset)[source]
Set the offset frequency.
Sets the offset frequency of the OUT signals with respect to the frequency of the 100MHz IN signal, in units of 1e-20. Resolution is better than 3e-20. The acceptable range is from -1000000000000 to 1000000000000.
- set_udp_address(udp_address)[source]
Set the UDP destination address.
If the network address is 255.255.255.255, the UDP message is broadcast to all addresses.
- simulator_thread_running()[source]
Return the state of the PhaseMicroStepper simulator thread.
- Return type
- Returns
True if the simulator thread is running
- sync_pps()[source]
Shift the rising edge of 1PPS OUT signals in order to align it with 1PPS IN.
The value in the second string is 0 if the operation is OK, 1 in case of error. In most cases, the error is due to the absence of a valid signal on the 1PPS IN connector.
- Returns
the command string and second string as “syncPPS done, Error = 0”
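The reply format described for sync_pps() can be checked with a small parser. This is a minimal sketch, not the driver's own code: the helper name parse_sync_pps_reply and the regular expression are assumptions, and only the example reply string comes from the description above.

```python
import re
from typing import Tuple

# Hypothetical pattern for replies shaped like "syncPPS done, Error = 0".
_SYNC_REPLY = re.compile(r"^\s*(?P<command>.+?),\s*Error\s*=\s*(?P<error>[01])\s*$")


def parse_sync_pps_reply(reply: str) -> Tuple[str, bool]:
    """Return (command_text, succeeded) for a syncPPS reply string."""
    match = _SYNC_REPLY.match(reply)
    if match is None:
        raise ValueError(f"unrecognised syncPPS reply: {reply!r}")
    # Error = 0 means the operation was OK; 1 signals an error.
    return match.group("command"), match.group("error") == "0"
```

A caller could log the command text and, on failure, check for a valid signal on the 1PPS IN connector first.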
- class PhaseMicroStepperHealthModel(health_changed_callback)[source]
A health model for a phase microstepper.
At present this uses the base health model; this is a placeholder for a future, better implementation.
- class SatPhaseMicroStepper(*args, **kwargs)[source]
An implementation of a PhaseMicroStepper Tango device for SatLmc.
- Advance10M(argin)[source]
Advance the output phase of 10MHz OUT signals, in units of 10ns.
Acceptable range is from 0 to +9.
- Parameters
argin (int) – the phase advance in units of 10ns
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
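The documented return type pairs a list of result codes with a list of optional messages. The sketch below shows one way a caller might unpack such a value. The ResultCode enum here is a local stand-in with only two members, and both first_result and the message text are hypothetical; a real client would import ska_tango_base.commands.ResultCode instead.

```python
import enum
from typing import List, Optional, Tuple


class ResultCode(enum.IntEnum):
    """Local stand-in for ska_tango_base.commands.ResultCode (subset only)."""

    OK = 0
    FAILED = 3


CommandReply = Tuple[List[ResultCode], List[Optional[str]]]


def first_result(reply: CommandReply) -> Tuple[ResultCode, Optional[str]]:
    """Return the first (code, message) pair from a command reply."""
    codes, messages = reply
    message = messages[0] if messages else None
    return codes[0], message


# Hypothetical reply; the message is informational only.
code, message = first_result(([ResultCode.OK], ["Advance10M command completed"]))
```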
- class Advance10MCommand(*args, **kwargs)[source]
Class for handling the Advance10M command.
- AdvanceAllPhase(argin)[source]
Advance the output phase of all OUT signals, in units of 1e-15s.
The resolution is about 1e-13. Acceptable range is from -5000000 to +5000000.
- Parameters
argin (int) – the advance in units of 1e-15s
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- class AdvanceAllPhaseCommand(*args, **kwargs)[source]
Class for handling the AdvanceAllPhase command.
- AdvancePPS(argin)[source]
Advance the phase of PPS OUT signals in units of 10ns.
The acceptable range is limited from -99000000 to 30000000.
- Parameters
argin (int) – the advance in units of 10ns
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- class AdvancePPSCommand(*args, **kwargs)[source]
Class for handling the AdvancePPS command.
- GetOffsetFrequency()[source]
Get the offset frequency, in units of 1e-20.
- Return type
- Returns
the offset frequency sent in the last SetOffsetFrequency command.
- class GetOffsetFrequencyCommand(*args, **kwargs)[source]
Class for handling the GetOffsetFrequency() command.
- class InitCommand(*args, **kwargs)[source]
Implement the device initialisation for the PhaseMicroStepper device.
- class SetIPAddressCommand(*args, **kwargs)[source]
Class for handling the SetIPAddress command.
- SetIPaddress(argin)[source]
Set the IP address of the EOG and close the TCP connection.
- Parameters
argin (str) – the new IP address
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- SetOffsetFrequency(offset)[source]
Set the offset frequency.
Set the offset frequency of the OUT signals with respect to the frequency of the 100MHz IN signal, in units of 1e-20. Resolution is better than 3e-20. The acceptable range is from -1000000000000 to 1000000000000.
- Parameters
offset (int) – the offset frequency
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
the command and the converted offset frequency
- class SetOffsetFrequencyCommand(*args, **kwargs)[source]
Class for handling the SetOffsetFrequency(argin) command.
- class SetUDPAddressCommand(*args, **kwargs)[source]
Class for handling the SetUDPAddress command.
- SetUDPaddress(argin)[source]
Set the UDP destination address.
- Parameters
argin (str) – the UDP destination address
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- SyncPPS()[source]
External sync of the 1PPS outputs.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- class SyncPPSCommand(*args, **kwargs)[source]
Class for handling the SyncPPS() command.
- create_component_manager()[source]
Create and return a component manager for this device.
- Return type
ska_sat_lmc.phasemicrostepper.phasemicrostepper_driver.PhaseMicroStepperDriver
- Returns
a component manager for this device.
- first_internal_oscillator()[source]
Report the status of the first internal oscillator.
- Return type
- Returns
oscillator 1 value (fixed point 6 decimal digits)
- health_changed(health)[source]
Handle change in this device’s health state.
This is a callback hook, called whenever the HealthModel’s evaluated health state changes. It is responsible for updating the tango side of things i.e. making sure the attribute is up to date, and events are pushed.
- Parameters
health (ska_tango_base.control_model.HealthState) – the new health value
- Return type
- init_command_objects()[source]
Initialise the command handlers for commands supported by this device.
- Return type
- init_device()[source]
Initialise the device.
This is overridden here to change the Tango serialisation model.
- Return type
- internal_temperature()[source]
Report the internal temperature of the device.
- Return type
- Returns
temperature in Celsius (fixed point 2 decimal digits)
- is_Advance10M_allowed()[source]
Check if command Advance10M is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_AdvanceAllPhase_allowed()[source]
Check if command AdvanceAllPhase is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_AdvancePPS_allowed()[source]
Check if command AdvancePPS is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_SetIPaddress_allowed()[source]
Check if command SetIPaddress is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_SetUDPaddress_allowed()[source]
Check if command SetUDPaddress is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_SyncPPS_allowed()[source]
Check if command SyncPPS is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_attribute_allowed(attr_req_type)[source]
Protect attribute access until the attribute has been updated; otherwise it reports an alarm.
- Parameters
attr_req_type (tango.AttReqType) – the Tango attribute request type (READ/WRITE)
- Return type
- Returns
True if the attribute can be read else False
- second_internal_oscillator()[source]
Report the status of the second internal oscillator.
- Return type
- Returns
oscillator 2 value (fixed point 6 decimal digits)
- serial_number()[source]
Report the serial number of the device.
- Return type
- Returns
the serial number
- simulationMode(value)[source]
Set the simulation mode.
- Parameters
value (ska_tango_base.control_model.SimulationMode) – The simulation mode, as a SimulationMode value
- Return type
- testMode(value)[source]
Set the test mode.
- Parameters
value (ska_tango_base.control_model.TestMode) – The test mode, as a TestMode value
- Return type
Phase Micro Stepper Component Manager
This module implements phasemicrostepper component management.
- class PhaseMicroStepperComponentManager(initial_simulation_mode, initial_test_mode, logger, phase_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
A component manager that handles a PhaseMicroStepper simulator and driver.
- __init__(initial_simulation_mode, initial_test_mode, logger, phase_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Initialise a new instance.
- Parameters
initial_simulation_mode (ska_tango_base.control_model.SimulationMode) – the simulation mode that the component should start in
initial_test_mode (ska_tango_base.control_model.TestMode) – the test mode that the component should start in
logger (logging.Logger) – a logger for this object to use
phase_url (str) – the URL of the PhaseMicroStepper
simulator_url (str) – the URL of the simulated PhaseMicroStepper
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
component_fault_callback (typing.Callable[[bool], None]) – callback to be called when the component faults (or stops faulting)
- property simulation_mode: ska_tango_base.control_model.SimulationMode
Return the simulation mode.
- Return type
- Returns
the simulation mode
- property test_mode: ska_tango_base.control_model.TestMode
Return the test mode.
- Return type
- Returns
the test mode
Phase Micro Stepper Device
This module implements the SATPhaseMicroStepper device.
- class SatPhaseMicroStepper(*args, **kwargs)[source]
An implementation of a PhaseMicroStepper Tango device for SatLmc.
- Advance10M(argin)[source]
Advance the output phase of 10MHz OUT signals, in units of 10ns.
Acceptable range is from 0 to +9.
- Parameters
argin (int) – the phase advance in units of 10ns
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- class Advance10MCommand(*args, **kwargs)[source]
Class for handling the Advance10M command.
- AdvanceAllPhase(argin)[source]
Advance the output phase of all OUT signals, in units of 1e-15s.
The resolution is about 1e-13. Acceptable range is from -5000000 to +5000000.
- Parameters
argin (int) – the advance in units of 1e-15s
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- class AdvanceAllPhaseCommand(*args, **kwargs)[source]
Class for handling the AdvanceAllPhase command.
- AdvancePPS(argin)[source]
Advance the phase of PPS OUT signals in units of 10ns.
The acceptable range is limited from -99000000 to 30000000.
- Parameters
argin (int) – the advance in units of 10ns
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- class AdvancePPSCommand(*args, **kwargs)[source]
Class for handling the AdvancePPS command.
- GetOffsetFrequency()[source]
Get the offset frequency, in units of 1e-20.
- Return type
- Returns
the offset frequency sent in the last SetOffsetFrequency command.
- class GetOffsetFrequencyCommand(*args, **kwargs)[source]
Class for handling the GetOffsetFrequency() command.
- class InitCommand(*args, **kwargs)[source]
Implement the device initialisation for the PhaseMicroStepper device.
- class SetIPAddressCommand(*args, **kwargs)[source]
Class for handling the SetIPAddress command.
- SetIPaddress(argin)[source]
Set the IP address of the EOG and close the TCP connection.
- Parameters
argin (str) – the new IP address
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- SetOffsetFrequency(offset)[source]
Set the offset frequency.
Set the offset frequency of the OUT signals with respect to the frequency of the 100MHz IN signal, in units of 1e-20. Resolution is better than 3e-20. The acceptable range is from -1000000000000 to 1000000000000.
- Parameters
offset (int) – the offset frequency
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
the command and the converted offset frequency
- class SetOffsetFrequencyCommand(*args, **kwargs)[source]
Class for handling the SetOffsetFrequency(argin) command.
- class SetUDPAddressCommand(*args, **kwargs)[source]
Class for handling the SetUDPAddress command.
- SetUDPaddress(argin)[source]
Set the UDP destination address.
- Parameters
argin (str) – the UDP destination address
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- SyncPPS()[source]
External sync of the 1PPS outputs.
- Return type
typing.Tuple[typing.List[ska_tango_base.commands.ResultCode], typing.List[typing.Optional[str]]]
- Returns
A tuple containing a return code and a string message indicating status. The message is for information purposes only.
- class SyncPPSCommand(*args, **kwargs)[source]
Class for handling the SyncPPS() command.
- create_component_manager()[source]
Create and return a component manager for this device.
- Return type
ska_sat_lmc.phasemicrostepper.phasemicrostepper_driver.PhaseMicroStepperDriver
- Returns
a component manager for this device.
- first_internal_oscillator()[source]
Report the status of the first internal oscillator.
- Return type
- Returns
oscillator 1 value (fixed point 6 decimal digits)
- health_changed(health)[source]
Handle change in this device’s health state.
This is a callback hook, called whenever the HealthModel’s evaluated health state changes. It is responsible for updating the tango side of things i.e. making sure the attribute is up to date, and events are pushed.
- Parameters
health (ska_tango_base.control_model.HealthState) – the new health value
- Return type
- init_command_objects()[source]
Initialise the command handlers for commands supported by this device.
- Return type
- init_device()[source]
Initialise the device.
This is overridden here to change the Tango serialisation model.
- Return type
- internal_temperature()[source]
Report the internal temperature of the device.
- Return type
- Returns
temperature in Celsius (fixed point 2 decimal digits)
- is_Advance10M_allowed()[source]
Check if command Advance10M is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_AdvanceAllPhase_allowed()[source]
Check if command AdvanceAllPhase is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_AdvancePPS_allowed()[source]
Check if command AdvancePPS is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_SetIPaddress_allowed()[source]
Check if command SetIPaddress is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_SetUDPaddress_allowed()[source]
Check if command SetUDPaddress is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_SyncPPS_allowed()[source]
Check if command SyncPPS is allowed in the current device state.
- Return type
- Returns
True if the command is allowed
- is_attribute_allowed(attr_req_type)[source]
Protect attribute access until the attribute has been updated; otherwise it reports an alarm.
- Parameters
attr_req_type (tango.AttReqType) – the Tango attribute request type (READ/WRITE)
- Return type
- Returns
True if the attribute can be read else False
- second_internal_oscillator()[source]
Report the status of the second internal oscillator.
- Return type
- Returns
oscillator 2 value (fixed point 6 decimal digits)
- serial_number()[source]
Report the serial number of the device.
- Return type
- Returns
the serial number
- simulationMode(value)[source]
Set the simulation mode.
- Parameters
value (ska_tango_base.control_model.SimulationMode) – The simulation mode, as a SimulationMode value
- Return type
- testMode(value)[source]
Set the test mode.
- Parameters
value (ska_tango_base.control_model.TestMode) – The test mode, as a TestMode value
- Return type
Phase Micro Stepper Driver
This module implements the PhaseMicrostepper driver.
- class PhaseMicroStepperDriver(initial_simulation__mode, initial_test_mode, logger, phase_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Device Server for the SAT.LMC PhaseMicrostepper (100MHz).
- __init__(initial_simulation__mode, initial_test_mode, logger, phase_url, simulator_url, communication_status_changed_callback, component_fault_callback)[source]
Initialise the attributes and properties of the PhaseMicroStepper.
- Parameters
initial_simulation__mode (ska_tango_base.control_model.SimulationMode) – the simulation mode that the component should start in
initial_test_mode (ska_tango_base.control_model.TestMode) – the test mode that the component should start in
logger (logging.Logger) – a logger for this object to use
phase_url (str) – the URL of the PhaseMicroStepper
simulator_url (str) – the URL of the simulated PhaseMicroStepper
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
component_fault_callback (typing.Callable[[bool], None]) – callback to be called when the component faults (or stops faulting)
- advance_10megahertz(advance)[source]
Advance the output phase of 10MHz OUT signals.
Value in units of 10ns. Acceptable range is from 0 to +9.
- advance_phase(advance)[source]
Advance the output phase of all OUT signals, in units of 1e-15s.
The resolution is about 1e-13. Acceptable range is from -5000000 to +5000000.
- advance_pps(advance)[source]
Advance the phase of PPS OUT signals in units of 10ns.
The acceptable range is limited from -99000000 to 30000000.
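The three advance methods above each state an acceptable range. A caller may want to validate a value before sending it to the hardware. The following is a minimal sketch under stated assumptions: the table ADVANCE_RANGES and the function check_advance are hypothetical, the bounds are copied from the descriptions above, and treating both bounds as inclusive is an assumption.

```python
from typing import Dict, Tuple

# Ranges copied from the method descriptions; inclusive bounds are assumed.
ADVANCE_RANGES: Dict[str, Tuple[int, int]] = {
    "advance_10megahertz": (0, 9),              # units of 10 ns
    "advance_phase": (-5_000_000, 5_000_000),   # units of 1e-15 s
    "advance_pps": (-99_000_000, 30_000_000),   # units of 10 ns
}


def check_advance(method: str, advance: int) -> int:
    """Return advance unchanged if it lies within the documented range."""
    low, high = ADVANCE_RANGES[method]
    if not low <= advance <= high:
        raise ValueError(
            f"{method}: {advance} is outside the range {low} to {high}"
        )
    return advance
```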
- property first_internal_oscillator: float
Return the status of internal oscillator 1.
Value 1 is a fixed-point number with 6 decimal digits.
- Return type
- Returns
the status of internal oscillator 1
- get_offset_frequency()[source]
Return the offset frequency set in the last setF command.
- Return type
- Returns
the command and offset frequency in units of 1e-20
- property internal_temperature: float
Return the internal temperature in degrees Celsius.
Value 3 is a fixed-point number with 2 decimal digits.
- Return type
- Returns
the internal temperature
- phase_thread_running()[source]
Return the state of the PhaseMicroStepper hardware thread.
- Return type
- Returns
True if the PhaseMicroStepper thread is running
- property second_internal_oscillator: float
Return the status of internal oscillator 2.
Value 2 is a fixed-point number with 6 decimal digits.
- Return type
- Returns
the status of internal oscillator 2
- property serial_number: int
Return the serial number of the EOG.
- Return type
- Returns
the serial number
- set_offset_frequency(offset)[source]
Set the offset frequency.
Sets the offset frequency of the OUT signals with respect to the frequency of the 100MHz IN signal, in units of 1e-20. Resolution is better than 3e-20. The acceptable range is from -1000000000000 to 1000000000000.
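As a worked example of the units, the sketch below converts an offset value into hertz. It assumes that "units of 1e-20" means a fractional offset relative to the 100MHz IN signal; the source does not state this explicitly, so treat the interpretation and the helper name offset_in_hertz as assumptions. Exact rational arithmetic is used to avoid floating-point rounding.

```python
from fractions import Fraction

INPUT_FREQUENCY_HZ = 100_000_000  # the 100MHz IN signal
UNIT = Fraction(1, 10**20)        # one offset unit, assumed to be fractional


def offset_in_hertz(offset: int) -> Fraction:
    """Convert an offset in units of 1e-20 to an absolute offset in hertz."""
    return offset * UNIT * INPUT_FREQUENCY_HZ


# Under this assumption, the largest documented offset corresponds to 1 Hz.
largest = offset_in_hertz(1_000_000_000_000)
```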
- set_udp_address(udp_address)[source]
Set the UDP destination address.
If the network address is 255.255.255.255, the UDP message is broadcast to all addresses.
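Before calling set_udp_address(), a client could check whether an address is the broadcast address mentioned above. This sketch uses only the standard ipaddress module; the helper name is_broadcast_destination is hypothetical and is not part of the driver.

```python
import ipaddress

BROADCAST = ipaddress.IPv4Address("255.255.255.255")


def is_broadcast_destination(udp_address: str) -> bool:
    """Return True if udp_address is the IPv4 limited broadcast address."""
    # IPv4Address raises a ValueError subclass for malformed input.
    return ipaddress.IPv4Address(udp_address) == BROADCAST
```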
- simulator_thread_running()[source]
Return the state of the PhaseMicroStepper simulator thread.
- Return type
- Returns
True if the simulator thread is running
- sync_pps()[source]
Shift the rising edge of 1PPS OUT signals in order to align it with 1PPS IN.
The value in the second string is 0 if the operation is OK, 1 in case of error. In most cases, the error is due to the absence of a valid signal on the 1PPS IN connector.
- Returns
the command string and second string as “syncPPS done, Error = 0”
Phase Micro Stepper Health Model
An implementation of a health model for a phase microstepper.
Component subpackage
This module implements infrastructure for component management in SAT.LMC.
- class CommunicationStatus(value)[source]
The status of a component manager’s communication with its component.
- DISABLED = 1
The component manager is not trying to establish/maintain a channel of communication with its component. For example:
if communication with the component is connection-oriented, then there is no connection, and the component manager is not trying to establish a connection.
if communication with the component is by event subscription, then the component manager is unsubscribed from events.
if communication with the component is by periodic connectionless polling, then the component manager is not performing that polling.
- ESTABLISHED = 3
The component manager has established a channel of communication with its component. For example:
if communication with the component is connection-oriented, then the component manager has connected to its component.
- NOT_ESTABLISHED = 2
The component manager is trying to establish/maintain a channel of communication with its component, but that channel is not currently established. For example:
if communication with the component is connection-oriented, then the component manager has failed to establish/maintain the connection.
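The three states can be illustrated with a stand-in enum. The member names and values below are taken from the listing above, but the class is a local copy rather than an import from ska_sat_lmc, and the helper is_established is a hypothetical illustration of how a boolean view might be derived from the status.

```python
import enum


class CommunicationStatus(enum.Enum):
    """Local stand-in mirroring the documented members and values."""

    DISABLED = 1
    NOT_ESTABLISHED = 2
    ESTABLISHED = 3


def is_established(status: CommunicationStatus) -> bool:
    """Assumed rule: only ESTABLISHED counts as communicating."""
    return status is CommunicationStatus.ESTABLISHED
```

Under this rule both DISABLED (not trying) and NOT_ESTABLISHED (trying but failing) map to False, which is why the three-valued status is more expressive than a single boolean.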
- class DeviceComponentManager(fqdn, logger, communication_status_changed_callback, component_fault_callback, health_changed_callback=None)[source]
An abstract component manager for a Tango device component.
- __init__(fqdn, logger, communication_status_changed_callback, component_fault_callback, health_changed_callback=None)[source]
Initialise a new instance.
- Parameters
fqdn (str) – the FQDN of the device
logger (logging.Logger) – the logger to be used by this object.
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
component_fault_callback (typing.Optional[typing.Callable[[bool], None]]) – callback to be called when the component faults (or stops faulting)
health_changed_callback (typing.Optional[typing.Callable[[typing.Optional[ska_tango_base.control_model.HealthState]], None]]) – callback to be called when the health state of the device changes. The value it is called with will normally be a HealthState, but may be None if the admin mode of the device indicates that the device’s health should not be included in upstream health rollup.
- property health: Optional[ska_tango_base.control_model.HealthState]
Return the evaluated health state of the device.
This will be either the health state that the device reports, or None if the device is in an admin mode that indicates that its health should not be rolled up.
- Return type
- Returns
the evaluated health state of the device.
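Because a device's health may be None when it should not be rolled up, an upstream consumer has to skip those entries. The sketch below shows one possible rollup. It is not the project's health model: HealthState is a local stand-in for ska_tango_base.control_model.HealthState, the severity ordering in _SEVERITY is an assumption, and roll_up and the device names are hypothetical.

```python
import enum
from typing import Dict, Optional


class HealthState(enum.IntEnum):
    """Local stand-in for ska_tango_base.control_model.HealthState."""

    OK = 0
    DEGRADED = 1
    FAILED = 2
    UNKNOWN = 3


# Assumed ordering from healthiest to least healthy (not from the source).
_SEVERITY = [
    HealthState.OK,
    HealthState.UNKNOWN,
    HealthState.DEGRADED,
    HealthState.FAILED,
]


def roll_up(
    device_healths: Dict[str, Optional[HealthState]]
) -> Optional[HealthState]:
    """Return the least healthy reported state, ignoring None entries."""
    included = [h for h in device_healths.values() if h is not None]
    if not included:
        return None  # no device contributes to the rollup
    return max(included, key=_SEVERITY.index)
```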
- class SatComponentManager(logger, communication_status_changed_callback, component_fault_callback, *args, **kwargs)[source]
A base component manager for Sat LMC.
This class exists to modify the interface of the ska_tango_base.base.component_manager.BaseComponentManager. The BaseComponentManager accepts an op_state_model argument, and is expected to interact directly with it. This is not a very good design decision. It is better to leave the op_state_model behind in the device, and drive it indirectly through callbacks.
Therefore this class accepts two callback arguments: one for when communication with the component changes and one for when the component fault status changes. In the latter case, callback hooks are provided so that the component can indicate the change to this component manager.
- __init__(logger, communication_status_changed_callback, component_fault_callback, *args, **kwargs)[source]
Initialise a new instance.
- Parameters
logger (logging.Logger) – a logger for this instance to use
communication_status_changed_callback (typing.Optional[typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]]) – callback to be called when the status of communications between the component manager and its component changes.
component_fault_callback (typing.Optional[typing.Callable[[bool], None]]) – callback to be called when the fault status of the component changes.
args (typing.Any) – other positional args
kwargs (typing.Any) – other keyword args
- property communication_status: ska_sat_lmc.component.component_manager.CommunicationStatus
Return the communication status of this component manager.
This is implemented as a replacement for the is_communicating property, which should be deprecated.
- Return type
- Returns
status of the communication channel with the component.
- component_fault_changed(faulty)[source]
Handle notification that the component’s fault status has changed.
This is a callback hook, to be passed to the managed component.
- property faulty: Optional[bool]
Return whether this component manager is currently experiencing a fault.
- Return type
- Returns
whether this component manager is currently experiencing a fault.
- property is_communicating: bool
Return whether communication with the component is established.
SatLmc uses the more expressive communication_status for this, but this is still needed as a base class hook.
- Return type
- Returns
whether communication with the component is established.
- update_communication_status(communication_status)[source]
Handle a change in communication status.
This is a helper method for use by subclasses.
- Parameters
communication_status (ska_sat_lmc.component.component_manager.CommunicationStatus) – the new communication status of the component manager.
- Return type
- update_component_fault(faulty)[source]
Update the component fault status, calling callbacks as required.
This is a helper method for use by subclasses.
- Parameters
faulty (typing.Optional[bool]) – whether the component has faulted. If False, then this is a notification that the component has recovered from a fault.
- Return type
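The callback-driven design described for SatComponentManager can be sketched without any Tango dependencies. This is an illustration only, not the class's actual implementation: SketchComponentManager is hypothetical, CommunicationStatus is a local stand-in, and both the initial DISABLED state and the choice to invoke callbacks only when a value changes are assumptions.

```python
import enum
from typing import Callable, Optional


class CommunicationStatus(enum.Enum):
    """Local stand-in mirroring the documented values."""

    DISABLED = 1
    NOT_ESTABLISHED = 2
    ESTABLISHED = 3


class SketchComponentManager:
    """Hypothetical manager that reports changes through callbacks."""

    def __init__(
        self,
        communication_status_changed_callback: Optional[
            Callable[[CommunicationStatus], None]
        ],
        component_fault_callback: Optional[Callable[[bool], None]],
    ) -> None:
        self._status = CommunicationStatus.DISABLED  # assumed initial state
        self._faulty: Optional[bool] = None
        self._status_callback = communication_status_changed_callback
        self._fault_callback = component_fault_callback

    def update_communication_status(self, status: CommunicationStatus) -> None:
        # Assumption: the callback fires only when the status changes.
        if status != self._status:
            self._status = status
            if self._status_callback is not None:
                self._status_callback(status)

    def update_component_fault(self, faulty: Optional[bool]) -> None:
        # Assumption: the callback fires only when the fault state changes.
        if faulty != self._faulty:
            self._faulty = faulty
            if self._fault_callback is not None and faulty is not None:
                self._fault_callback(faulty)
```

In this arrangement the device keeps its own state model and passes in callbacks that drive it, so the manager never touches the state model directly.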
- check_communicating(func)[source]
Return a function that checks component communication before calling a function.
The component manager needs to have established communications with the component, in order for the function to be called.
This function is intended to be used as a decorator:
@check_communicating
def scan(self):
    ...
- Parameters
func (typing.TypeVar(Wrapped, bound=typing.Callable[..., typing.Any])) – the wrapped function
- Return type
typing.TypeVar(Wrapped, bound=typing.Callable[..., typing.Any])
- Returns
the wrapped function
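A decorator of this kind can be sketched as follows. This is not the project's implementation: the name check_communicating_sketch, the use of the is_communicating property as the test, and raising ConnectionError on failure are all assumptions made for illustration, as is the DemoManager class.

```python
import functools
from typing import Any, Callable


def check_communicating_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a method so it runs only when communication is established."""

    @functools.wraps(func)
    def wrapper(component_manager: Any, *args: Any, **kwargs: Any) -> Any:
        # Assumed check and exception type; the real ones may differ.
        if not component_manager.is_communicating:
            raise ConnectionError(
                f"Cannot call {func.__name__}: communication is not established"
            )
        return func(component_manager, *args, **kwargs)

    return wrapper


class DemoManager:
    """Hypothetical manager used only to exercise the decorator."""

    def __init__(self, communicating: bool) -> None:
        self.is_communicating = communicating

    @check_communicating_sketch
    def scan(self) -> str:
        return "scanning"
```

functools.wraps preserves the wrapped method's name and docstring, so generated documentation and log messages still refer to scan rather than wrapper.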
Component Manager
This module implements functionality for component managers in SAT.LMC.
- class CommunicationStatus(value)[source]
The status of a component manager’s communication with its component.
- DISABLED = 1
The component manager is not trying to establish/maintain a channel of communication with its component. For example:
if communication with the component is connection-oriented, then there is no connection, and the component manager is not trying to establish a connection.
if communication with the component is by event subscription, then the component manager is unsubscribed from events.
if communication with the component is by periodic connectionless polling, then the component manager is not performing that polling.
- ESTABLISHED = 3
The component manager has established a channel of communication with its component. For example:
if communication with the component is connection-oriented, then the component manager has connected to its component.
- NOT_ESTABLISHED = 2
The component manager is trying to establish/maintain a channel of communication with its component, but that channel is not currently established. For example:
if communication with the component is connection-oriented, then the component manager has failed to establish/maintain the connection.
- class SatComponentManager(logger, communication_status_changed_callback, component_fault_callback, *args, **kwargs)[source]
A base component manager for Sat LMC.
This class exists to modify the interface of the ska_tango_base.base.component_manager.BaseComponentManager. The BaseComponentManager accepts an op_state_model argument, and is expected to interact directly with it. This is not a very good design decision. It is better to leave the op_state_model behind in the device, and drive it indirectly through callbacks.
Therefore this class accepts two callback arguments: one for when communication with the component changes and one for when the component fault status changes. In the latter case, callback hooks are provided so that the component can indicate the change to this component manager.
- __init__(logger, communication_status_changed_callback, component_fault_callback, *args, **kwargs)[source]
Initialise a new instance.
- Parameters
logger (logging.Logger) – a logger for this instance to use
communication_status_changed_callback (typing.Optional[typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]]) – callback to be called when the status of communications between the component manager and its component changes.
component_fault_callback (typing.Optional[typing.Callable[[bool], None]]) – callback to be called when the fault status of the component changes.
args (typing.Any) – other positional args
kwargs (typing.Any) – other keyword args
- property communication_status: ska_sat_lmc.component.component_manager.CommunicationStatus
Return the communication status of this component manager.
This is implemented as a replacement for the is_communicating property, which should be deprecated.
- Return type
- Returns
status of the communication channel with the component.
- component_fault_changed(faulty)[source]
Handle notification that the component’s fault status has changed.
This is a callback hook, to be passed to the managed component.
- property faulty: Optional[bool]
Return whether this component manager is currently experiencing a fault.
- Return type
- Returns
whether this component manager is currently experiencing a fault.
- property is_communicating: bool
Return whether communication with the component is established.
SatLmc uses the more expressive communication_status for this, but this is still needed as a base class hook.
- Return type
- Returns
whether communication with the component is established.
- update_communication_status(communication_status)[source]
Handle a change in communication status.
This is a helper method for use by subclasses.
- Parameters
communication_status (ska_sat_lmc.component.component_manager.CommunicationStatus) – the new communication status of the component manager.
- Return type
- update_component_fault(faulty)[source]
Update the component fault status, calling callbacks as required.
This is a helper method for use by subclasses.
- Parameters
faulty (typing.Optional[bool]) – whether the component has faulted. If False, then this is a notification that the component has recovered from a fault.
- Return type
Device Component Manager
This module implements an abstract component manager for simple object components.
- class DeviceComponentManager(fqdn, logger, communication_status_changed_callback, component_fault_callback, health_changed_callback=None)[source]
An abstract component manager for a Tango device component.
- __init__(fqdn, logger, communication_status_changed_callback, component_fault_callback, health_changed_callback=None)[source]
Initialise a new instance.
- Parameters
fqdn (str) – the FQDN of the device
logger (logging.Logger) – the logger to be used by this object.
communication_status_changed_callback (typing.Callable[[ska_sat_lmc.component.component_manager.CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes
component_fault_callback (typing.Optional[typing.Callable[[bool], None]]) – callback to be called when the component faults (or stops faulting)
health_changed_callback (typing.Optional[typing.Callable[[typing.Optional[ska_tango_base.control_model.HealthState]], None]]) – callback to be called when the health state of the device changes. The value it is called with will normally be a HealthState, but may be None if the admin mode of the device indicates that the device’s health should not be included in upstream health rollup.
- property health: Optional[ska_tango_base.control_model.HealthState]
Return the evaluated health state of the device.
This will be either the health state that the device reports, or None if the device is in an admin mode that indicates that its health should not be rolled up.
- Return type
- Returns
the evaluated health state of the device.
Util
This module implements utils for component managers in SAT.LMC.
- check_communicating(func)[source]
Return a function that checks component communication before calling a function.
The component manager needs to have established communications with the component, in order for the function to be called.
This function is intended to be used as a decorator:
    @check_communicating
    def scan(self):
        ...
- Parameters
func (typing.TypeVar(Wrapped, bound=typing.Callable[..., typing.Any])) – the wrapped function
- Return type
typing.TypeVar(Wrapped, bound=typing.Callable[..., typing.Any])
- Returns
the wrapped function
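The decorator described above can be sketched as follows. This is a minimal illustration, not the project's implementation: the stand-in `CommunicationStatus` enum, the use of `ConnectionError`, and the `DemoComponentManager` class are all assumptions made for the example.

```python
import functools
from enum import Enum
from typing import Any, Callable, TypeVar, cast

Wrapped = TypeVar("Wrapped", bound=Callable[..., Any])


class CommunicationStatus(Enum):
    """Stand-in for the project's CommunicationStatus enum (assumed values)."""

    NOT_ESTABLISHED = 1
    ESTABLISHED = 2


def check_communicating(func: Wrapped) -> Wrapped:
    """Return a wrapper that only calls func when communication is established."""

    @functools.wraps(func)
    def _wrapper(component_manager: Any, *args: Any, **kwargs: Any) -> Any:
        # Assumption: the manager exposes a `communication_status` attribute.
        if component_manager.communication_status != CommunicationStatus.ESTABLISHED:
            # Assumption: the real decorator may raise a different exception type.
            raise ConnectionError(
                f"Cannot call '{func.__name__}': communication is not established."
            )
        return func(component_manager, *args, **kwargs)

    return cast(Wrapped, _wrapper)


class DemoComponentManager:
    """Hypothetical component manager, used only to exercise the decorator."""

    def __init__(self) -> None:
        self.communication_status = CommunicationStatus.NOT_ESTABLISHED

    @check_communicating
    def scan(self) -> str:
        return "scanning"
```

With this sketch, calling `scan()` before communication is established raises, and the same call succeeds once the status becomes `ESTABLISHED`.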
Testing subpackage
This subpackage contains modules for helper classes in the SKA SAT LMC tests.
Tango harness
This module implements a SatLMC test harness for Tango devices.
- class BaseTangoHarness(device_info, logger, *args, **kwargs)[source]
A basic test harness for Tango devices.
This harness doesn’t stand up any device; it assumes that devices are already running. It is thus useful for testing against deployed devices.
- __init__(device_info, logger, *args, **kwargs)[source]
Initialise a new instance.
- Parameters
device_info (typing.Optional[ska_sat_lmc.testing.tango_harness.SatDeviceInfo]) – object that makes device info available
logger (logging.Logger) – a logger for the harness
args (typing.Any) – additional positional arguments
kwargs (typing.Any) – additional keyword arguments
- property connection_factory: Callable[[str], tango.DeviceProxy]
Establish connections to devices with this factory.
This class uses tango.DeviceProxy as its connection factory.
- Return type
- Returns
a DeviceProxy for use in establishing connections.
- property fqdns: list[str]
Return the FQDNs of devices in this harness.
- Returns
a list of FQDNs of devices in this harness.
- class ClientProxyTangoHarness(device_info, logger, *args, **kwargs)[source]
A test harness for Tango devices that can return tailored client proxies.
- __init__(device_info, logger, *args, **kwargs)[source]
Initialise a new instance.
- Parameters
device_info (typing.Optional[ska_sat_lmc.testing.tango_harness.SatDeviceInfo]) – object that makes device info available
logger (logging.Logger) – a logger for the harness
args (typing.Any) – additional positional arguments
kwargs (typing.Any) – additional keyword arguments
- class MockingTangoHarness(harness, mock_factory, initial_mocks, *args, **kwargs)[source]
A Tango test harness that mocks out devices not under test.
This harness wraps another harness, but only uses that harness for a specified set of devices under test, and mocks out all others.
- __init__(harness, mock_factory, initial_mocks, *args, **kwargs)[source]
Initialise a new instance.
- Parameters
harness – the wrapped harness
mock_factory – the factory to be used to build mocks
initial_mocks – a pre-built dictionary of mocks to be used for particular devices
args – additional positional arguments
kwargs – additional keyword arguments
- property connection_factory: Callable[[str], tango.DeviceProxy]
Establish connections to devices with this factory.
This is where we check whether the requested device is on our list. Devices on the list are passed to the connection factory of the wrapped harness. Devices not on the list are intercepted and given a mock factory instead.
- Return type
- Returns
a factory that putatively provides device connections, but might actually provide mocks.
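The routing described for this connection factory can be sketched roughly as below. The function name `make_mocking_factory`, its parameters, and the FQDNs in the usage note are hypothetical; the real harness takes its list of devices under test from the wrapped harness.

```python
from typing import Any, Callable, Dict, Iterable, Optional
from unittest.mock import Mock


def make_mocking_factory(
    real_factory: Callable[[str], Any],
    fqdns_under_test: Iterable[str],
    mock_factory: Callable[[], Any] = Mock,
    initial_mocks: Optional[Dict[str, Any]] = None,
) -> Callable[[str], Any]:
    """Build a connection factory that mocks every device not under test."""
    under_test = set(fqdns_under_test)
    # Copy the pre-built mocks so the caller's dictionary is not mutated.
    mocks: Dict[str, Any] = dict(initial_mocks or {})

    def _connect(fqdn: str) -> Any:
        if fqdn in under_test:
            # Devices under test go to the wrapped harness's factory.
            return real_factory(fqdn)
        if fqdn not in mocks:
            # Unknown devices get a mock, created once and then reused.
            mocks[fqdn] = mock_factory()
        return mocks[fqdn]

    return _connect
```

For example, `make_mocking_factory(real, ["sat/maser/1"])("sat/other/1")` would return a `Mock`, while the same factory called with `"sat/maser/1"` would delegate to `real`.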
- class SatDeviceInfo(path, package, devices=None)[source]
Data structure class that loads and holds information about devices.
It can provide that information in the format required by tango.test_context.MultiDeviceTestContext.
- __init__(path, package, devices=None)[source]
Create a new instance.
- Parameters
path – the path to the configuration file that contains information about all available devices.
package – name of the package from which to draw classes
devices – optional specification of devices. If not provided, then devices can be added via the include_device() method.
- as_mdtc_device_info()[source]
Return this device info in a format required by MultiDeviceTestContext.
- Returns
device info in a format required by tango.test_context.MultiDeviceTestContext.
- property fqdn_map: dict[str, str]
Return a dictionary that maps device names onto FQDNs.
- Returns
a mapping from device names to FQDNs
- property fqdns: Iterable[str]
Return a list of device FQDNs.
- Return type
- Returns
a list of device FQDNs
- include_device(name, proxy, patch=None)[source]
Include a device in this specification.
- Parameters
name – the name of the device to be included. The source data must contain configuration information for a device listed under this name
proxy – the proxy class to use to access the device.
patch – an optional device class with which to patch the named device
- Raises
ValueError – if the named device does not exist in the source configuration data
- property proxy_map: dict[str, type[SatDeviceProxy]]
Return a map from FQDN to proxy type.
- Returns
a map from FQDN to proxy type
- class StartingStateTangoHarness(harness, bypass_cache=True, check_ready=True, set_test_mode=True, *args, **kwargs)[source]
A test harness for testing Tango devices.
It provides for certain actions and checks that ensure that devices are in a desired initial state prior to testing.
Specifically, it can:
Tell devices to bypass their attribute cache, so that written values can be read back immediately
Check that devices have completed initialisation and transitioned out of the INIT state
Set device testMode to TestMode.TEST
- __init__(harness, bypass_cache=True, check_ready=True, set_test_mode=True, *args, **kwargs)[source]
Initialise a new instance.
- Parameters
harness (ska_sat_lmc.testing.tango_harness.TangoHarness) – the wrapped harness
bypass_cache (bool) – whether to tell each device to bypass its attribute cache so that written attribute values can be read back again immediately
check_ready (bool) – whether to check whether each device has completed initialisation and transitioned out of INIT state before allowing tests to be run.
set_test_mode (bool) – whether to set the device into test mode before allowing tests to be run.
args (typing.Any) – additional positional arguments
kwargs (typing.Any) – additional keyword arguments
- class TangoHarness(*args, **kwargs)[source]
Abstract base class for Tango test harnesses.
This does very little, because it needs to support both harnesses that directly interact with Tango, and wrapper harnesses that add functionality to another harness.
The one really important thing it does do is ensure that ska_sat_lmc.device_proxy.SatDeviceProxy uses this harness’s connection factory to make connections.
- __init__(*args, **kwargs)[source]
Initialise a new instance.
- Parameters
args (typing.Any) – additional positional arguments
kwargs (typing.Any) – additional keyword arguments
- property connection_factory: Callable[[str], tango.DeviceProxy]
Establish connections to devices with this factory.
- Raises
NotImplementedError – because this method is abstract
- Return type
- property fqdns
Return FQDNs of devices in this harness.
- Raises
NotImplementedError – because this method is abstract
- get_device(fqdn)[source]
Create and return a proxy to the device at the given FQDN.
- Parameters
fqdn (str) – FQDN of the device for which a proxy is required
- Raises
NotImplementedError – because this method is abstract
- Return type
- class TestContextTangoHarness(device_info, logger, process=False, *args, **kwargs)[source]
A test harness for testing SatLMC Tango devices in a lightweight test context.
It stands up a tango.test_context.MultiDeviceTestContext with the specified devices.
- __init__(device_info, logger, process=False, *args, **kwargs)[source]
Initialise a new instance.
- Parameters
device_info (typing.Optional[ska_sat_lmc.testing.tango_harness.SatDeviceInfo]) – object that makes device info available
logger (logging.Logger) – a logger for the harness
process (bool) – whether to run the test context in a separate process or not
args (typing.Any) – additional positional arguments
kwargs (typing.Any) – additional keyword arguments
- property connection_factory: Callable[[str], tango.DeviceProxy]
Establish connections to devices with this factory.
This class uses tango.DeviceProxy but patches it to use the long-form FQDN, as a workaround to an issue with tango.test_context.MultiDeviceTestContext.
- Return type
- Returns
a DeviceProxy for use in establishing connections.
Testing subpackage
This subpackage contains modules for test mocking in the SKA SatLMC tests.
Mock callable
This module implements infrastructure for mocking callbacks and other callables.
- class MockCallable(return_value=None, called_timeout=5.0, not_called_timeout=1.0)[source]
This class implements a mock callable.
It is useful for when you want to assert that a callable is called, but the callback is called asynchronously, so that you might have to wait a short time for the call to occur.
If you use a regular mock for the callback, your tests will end up littered with sleeps:
    antenna_apiu_proxy.start_communicating()
    communication_status_changed_callback.assert_called_once_with(
        CommunicationStatus.NOT_ESTABLISHED
    )
    time.sleep(0.1)
    communication_status_changed_callback.assert_called_once_with(
        CommunicationStatus.ESTABLISHED
    )
These sleeps waste time, slow down the tests, and they are difficult to tune: maybe you only need to sleep 0.1 seconds on your development machine, but what if the CI pipeline deploys the tests to an environment that needs 0.2 seconds for this?
This class solves that by putting each call to the callback onto a queue. Then, each time we assert that a callback was called, we get a call from the queue, waiting if necessary for the call to arrive, but with a timeout:
    antenna_apiu_proxy.start_communicating()
    communication_status_changed_callback.assert_next_call(
        CommunicationStatus.NOT_ESTABLISHED
    )
    communication_status_changed_callback.assert_next_call(
        CommunicationStatus.ESTABLISHED
    )
- __init__(return_value=None, called_timeout=5.0, not_called_timeout=1.0)[source]
Initialise a new instance.
- Parameters
return_value (typing.Optional[typing.Any]) – what to return when called
called_timeout (float) – how long to wait for a call to occur when we are expecting one. It makes sense to wait a long time for the expected call, as it will generally arrive much sooner anyhow, and failure for the call to arrive in time will cause the assertion to fail. The default is 5 seconds.
not_called_timeout (float) – how long to wait for a callback when we are not expecting one. Since we need to wait the full timeout period in order to determine that a callback has not arrived, asserting that a call has not been made can severely slow down your tests. By keeping this timeout quite short, we can speed up our tests, at the risk of prematurely passing an assertion. The default is 1.0 seconds.
- assert_last_call(*args, **kwargs)[source]
Assert the arguments of the last call to this mock callback.
The “last” call is the last call before an attempt to get the next event times out.
This is useful for situations where we know a device may call a callback several times, and we don’t care too much about the exact order of calls, but we do know what the final call should be.
- Parameters
args (typing.Any) – positional args that the call is asserted to have
kwargs (typing.Any) – keyword args that the call is asserted to have
- Raises
AssertionError – if the callback has not been called.
- Return type
- assert_next_call(*args, **kwargs)[source]
Assert the arguments of the next call to this mock callback.
If the call has not been made, this method will wait up to the specified timeout for a call to arrive.
- Parameters
args (typing.Any) – positional args that the call is asserted to have
kwargs (typing.Any) – keyword args that the call is asserted to have
- Raises
AssertionError – if the callback has not been called.
- Return type
- assert_not_called(timeout=None)[source]
Assert that the callback still has not been called after the timeout period.
This is a slow method because it has to wait the full timeout period in order to determine that the call is not coming. An optional timeout parameter is provided for the situation where you are happy for the assertion to pass after a shorter wait time.
- Parameters
timeout (typing.Optional[float]) – optional timeout for the check. If not provided, the default is the class setting
- Return type
- get_next_call()[source]
Return the arguments of the next call to this mock callback.
This is useful for situations where you do not know exactly what the arguments of the next call will be, so you cannot use the assert_next_call() method. Instead you want to assert some specific properties on the arguments:
    (args, kwargs) = mock_callback.get_next_call()
    event_data = args[0].attr_value
    assert event_data.name == "healthState"
    assert event_data.value == HealthState.UNKNOWN
    assert event_data.quality == tango.AttrQuality.ATTR_VALID
If the call has not been made, this method will wait up to the specified timeout for a call to arrive.
- Raises
AssertionError – if the callback has not been called
- Return type
typing.Tuple[typing.Sequence[typing.Any], typing.Sequence[typing.Any]]
- Returns
an (args, kwargs) tuple
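The queue-based approach described for MockCallable can be illustrated with the sketch below. It is a simplified stand-in, not the class documented here: the name `QueueingMockCallable` is hypothetical and only three of the documented methods are shown.

```python
import queue
import threading
from typing import Any, Dict, Optional, Tuple

Call = Tuple[Tuple[Any, ...], Dict[str, Any]]


class QueueingMockCallable:
    """Hypothetical sketch: record each call on a queue so tests can wait for it."""

    def __init__(
        self,
        return_value: Any = None,
        called_timeout: float = 5.0,
        not_called_timeout: float = 1.0,
    ) -> None:
        self._return_value = return_value
        self._called_timeout = called_timeout
        self._not_called_timeout = not_called_timeout
        self._queue: "queue.Queue[Call]" = queue.Queue()

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Every call is queued, so calls made from other threads are not lost.
        self._queue.put((args, kwargs))
        return self._return_value

    def get_next_call(self) -> Call:
        try:
            # Block until a call arrives, but give up after the timeout.
            return self._queue.get(timeout=self._called_timeout)
        except queue.Empty:
            raise AssertionError("Callback has not been called.") from None

    def assert_next_call(self, *args: Any, **kwargs: Any) -> None:
        called_args, called_kwargs = self.get_next_call()
        assert called_args == args, f"Expected {args}, got {called_args}"
        assert called_kwargs == kwargs, f"Expected {kwargs}, got {called_kwargs}"

    def assert_not_called(self, timeout: Optional[float] = None) -> None:
        wait = self._not_called_timeout if timeout is None else timeout
        try:
            call = self._queue.get(timeout=wait)
        except queue.Empty:
            return  # nothing arrived within the timeout: the assertion holds
        raise AssertionError(f"Callback was unexpectedly called with {call}.")


callback = QueueingMockCallable(called_timeout=2.0, not_called_timeout=0.1)
# Simulate an asynchronous caller that fires shortly after the test starts waiting.
threading.Timer(0.05, callback, args=("ESTABLISHED",)).start()
callback.assert_next_call("ESTABLISHED")  # waits for the call; no sleep needed
callback.assert_not_called()  # no further calls arrive within 0.1 seconds
```

The test code never sleeps for a fixed period: a passing assertion returns as soon as the call arrives, and only a failing one waits the full timeout.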
- class MockChangeEventCallback(event_name, called_timeout=5.0, not_called_timeout=0.5)[source]
This class implements a mock change event callback.
It is a special case of a MockCallable where the callable expects to be called with event_name, event_value and event_quality arguments (which is how DeviceProxy calls its change event callbacks).
- __init__(event_name, called_timeout=5.0, not_called_timeout=0.5)[source]
Initialise a new instance.
- Parameters
event_name (str) – the name of the event for which this callable is a callback
called_timeout (float) – how long to wait for a call to occur when we are expecting one. It makes sense to wait a long time for the expected call, as it will generally arrive much sooner anyhow, and failure for the call to arrive in time will cause the assertion to fail. The default is 5 seconds.
not_called_timeout (float) – how long to wait for a callback when we are not expecting one. Since we need to wait the full timeout period in order to determine that a callback has not arrived, asserting that a call has not been made can severely slow down your tests. By keeping this timeout quite short, we can speed up our tests, at the risk of prematurely passing an assertion. The default is 0.5 seconds.
- assert_last_change_event(value, quality=tango.AttrQuality.ATTR_VALID)[source]
Assert the arguments of the last call to this mock callback.
The “last” call is the last call before an attempt to get the next event times out.
This is useful for situations where we know a device may fire several events, and we don’t know or care about the exact order of events, but we do know what the final event should be. For example, when we tell a Controller to turn on, it has to turn many devices on, which have to turn many devices on, etc. With so m
- Parameters
value (typing.Any) – the asserted value of the change event
quality (tango.AttrQuality) – the asserted quality of the change event. This is optional, with a default of ATTR_VALID.
- Raises
AssertionError – if the callback has not been called.
- Return type
- assert_next_change_event(value, quality=tango.AttrQuality.ATTR_VALID)[source]
Assert the arguments of the next call to this mock callback.
If the call has not been made, this method will wait up to the specified timeout for a call to arrive.
- Parameters
value (typing.Any) – the asserted value of the change event
quality (tango.AttrQuality) – the asserted quality of the change event. This is optional, with a default of ATTR_VALID.
- Raises
AssertionError – if the callback has not been called.
- Return type
Mock device
This module implements infrastructure for mocking tango devices.
- class MockDeviceBuilder(from_factory=<class 'unittest.mock.Mock'>)[source]
This module implements a mock builder for tango devices.
- __init__(from_factory=<class 'unittest.mock.Mock'>)[source]
Create a new instance.
- Parameters
from_factory – an optional factory from which to draw the original mock
- add_attribute(name, value)[source]
Tell this builder to build mocks with a given attribute.
TODO: distinguish between read-only and read-write attributes
- Parameters
name (str) – name of the attribute
value (typing.Any) – the value of the attribute
- Return type
- add_command(name, return_value)[source]
Tell this builder to build mocks with a specified command.
And that the command returns the provided value.
- Parameters
name (str) – name of the command
return_value (typing.Any) – what the command should return
- Return type
- add_result_command(name, result_code, status='Mock information-only message')[source]
Tell this builder to build mocks with a specified command.
And that the command returns (ResultCode, [message, message_uid]) or (ResultCode, message) tuples as required.
- Parameters
name (str) – the name of the command
result_code (ska_tango_base.commands.ResultCode) – the ska_tango_base.commands.ResultCode that the command should return
status (str) – an information-only message for the command to return
- Return type
- set_state(state)[source]
Tell this builder to build mocks with the state set as specified.
- Parameters
state (tango.DevState) – the state of the mock
- Return type
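As a rough illustration of the builder pattern described above, the sketch below collects attributes and commands and then produces a configured mock. The class name `SketchMockDeviceBuilder` and its `build()` method are hypothetical; this section does not show how the real builder hands back its mock.

```python
from typing import Any, Callable, Dict
from unittest.mock import Mock


class SketchMockDeviceBuilder:
    """Hypothetical builder that configures mocks to look like a device."""

    def __init__(self, from_factory: Callable[[], Any] = Mock) -> None:
        self._from_factory = from_factory
        self._attributes: Dict[str, Any] = {}
        self._commands: Dict[str, Any] = {}

    def add_attribute(self, name: str, value: Any) -> None:
        self._attributes[name] = value

    def add_command(self, name: str, return_value: Any) -> None:
        self._commands[name] = return_value

    def build(self) -> Any:
        # Assumption: a fresh mock is drawn from the factory for each build.
        device = self._from_factory()
        for name, value in self._attributes.items():
            setattr(device, name, value)
        for name, return_value in self._commands.items():
            # Each command is itself a mock, so calls to it can be asserted on.
            setattr(device, name, Mock(return_value=return_value))
        return device


builder = SketchMockDeviceBuilder()
builder.add_attribute("healthState", "OK")
builder.add_command("On", 0)
mock_device = builder.build()
```

Because each command is a `Mock`, a test can both call `mock_device.On()` and later check that the call was made.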
Comms
This module implements the Socket communication.
- class TcpSocket(host=None, port=None, eol=b'\\n', timeout=30.0, logger=None)[source]
Socket communication class to access raw socket layer using TCP protocol.
- Example
    from comms import TcpSocket
    sock = TcpSocket("tcp://127.0.0.1:45678")
- class UdpSocket(host=None, port=None, eol=b'\\n', timeout=30.0, logger=None)[source]
Socket communication class to access raw socket layer using UDP protocol.
- Example
    from comms import UdpSocket
    sock = UdpSocket("udp://127.0.0.1:45678")
- parse_url(url)[source]
Parse the url string into component host & port.
- Parameters
url – an encoded string containing host, port and socket type
- Raises
ValueError – invalid URL specified
- Returns
the decoded host & port
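A URL of the form used in the socket examples could be decoded along the following lines. This is a sketch built on the standard library, not the module's own `parse_url`: the name `parse_socket_url`, the restriction to `tcp` and `udp` schemes, and the choice to return the scheme alongside host and port are assumptions.

```python
from typing import Tuple
from urllib.parse import urlsplit


def parse_socket_url(url: str) -> Tuple[str, str, int]:
    """Split a URL such as 'tcp://127.0.0.1:45678' into (scheme, host, port)."""
    parts = urlsplit(url)
    try:
        # Accessing .port raises ValueError for a non-numeric or out-of-range port.
        port = parts.port
    except ValueError as error:
        raise ValueError(f"Invalid URL specified: {url!r}") from error
    if parts.scheme not in ("tcp", "udp") or not parts.hostname or port is None:
        raise ValueError(f"Invalid URL specified: {url!r}")
    return parts.scheme, parts.hostname, port
```

A URL with no scheme, an unsupported scheme, or a missing port is rejected with `ValueError`, matching the documented error behaviour.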
Device Proxy
This module implements a base device proxy for Sat LMC devices.
- class SatDeviceProxy(fqdn, logger, connect=True, connection_factory=None, pass_through=True)[source]
This class implements a base device proxy for Sat LMC devices.
At present it supports:
deferred connection: we can create the proxy without immediately trying to connect to the proxied device.
a connect() method, for establishing that connection later
a check_initialised() method, for checking that / waiting until the proxied device has transitioned out of INIT state.
ability to subscribe to change events via the add_change_event_callback() method.
- __init__(fqdn, logger, connect=True, connection_factory=None, pass_through=True)[source]
Create a new instance.
- Parameters
fqdn (str) – FQDN of the device to be proxied
logger (logging.Logger) – a logger for this proxy to use
connection_factory (typing.Optional[typing.Callable[[str], tango.DeviceProxy]]) – how we obtain a connection to the device we are proxying. By default this is tango.DeviceProxy, but occasionally this needs to be changed. For example, when testing against a tango.test_context.MultiDeviceTestContext, we obtain connections to the devices under test via test_context.get_device(fqdn).
connect (bool) – whether to connect immediately to the device. If False, then the device may be connected later by calling the connect() method.
pass_through (bool) – whether to pass unrecognised attribute accesses through to the underlying connection. Defaults to True, but this will likely change in future once our proxies are more mature.
- add_change_event_callback(attribute_name, callback, stateless=True)[source]
Register a callback for change events being pushed by the device.
- Parameters
attribute_name (str) – the name of the attribute for which change events are subscribed.
callback (typing.Callable[[str, typing.Any, tango.AttrQuality], None]) – the function to be called when a change event arrives.
stateless (bool) – whether to use Tango’s stateless subscription feature
- Return type
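To illustrate the callback signature documented above (attribute name, value, quality), here is a minimal sketch of a callback registry. It is an assumption about how such callbacks might be stored and invoked, not the proxy's actual event handling: ChangeEventRegistrySketch and push_change_event are hypothetical, and AttrQuality is a local stand-in for tango.AttrQuality.

```python
import enum
from typing import Any, Callable, Dict, List


class AttrQuality(enum.Enum):
    """Local stand-in for tango.AttrQuality (only two members shown)."""

    ATTR_VALID = 0
    ATTR_INVALID = 1


Callback = Callable[[str, Any, AttrQuality], None]


class ChangeEventRegistrySketch:
    """Hypothetical registry; NOT the real SatDeviceProxy event machinery."""

    def __init__(self) -> None:
        self._callbacks: Dict[str, List[Callback]] = {}

    def add_change_event_callback(
        self, attribute_name: str, callback: Callback
    ) -> None:
        # Tango attribute names are case-insensitive, so key on lower case.
        self._callbacks.setdefault(attribute_name.lower(), []).append(callback)

    def push_change_event(
        self, attribute_name: str, value: Any, quality: AttrQuality
    ) -> None:
        """Simulate the arrival of a change event (hypothetical helper)."""
        for callback in self._callbacks.get(attribute_name.lower(), []):
            callback(attribute_name, value, quality)


received = []
registry = ChangeEventRegistrySketch()
registry.add_change_event_callback(
    "healthState", lambda name, value, quality: received.append((name, value, quality))
)
registry.push_change_event("healthState", 0, AttrQuality.ATTR_VALID)
```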
- check_initialised(max_time=120.0)[source]
Check that the device has completed initialisation.
That is, check that the device is no longer in state INIT.
- Parameters
max_time (float) – the (optional) maximum time, in seconds, to wait for the device to complete initialisation. The default is 120.0, i.e. two minutes. If set to 0 or None, the device is checked once and the call returns immediately.
- Return type
- Returns
whether the device is initialised yet
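The max_time semantics described above can be sketched as a simple polling loop. This is an illustration under stated assumptions rather than the actual implementation: check_initialised_sketch, the is_in_init callable and the poll_interval parameter are hypothetical.

```python
import time
from typing import Callable, Optional


def check_initialised_sketch(
    is_in_init: Callable[[], bool],
    max_time: Optional[float] = 120.0,
    poll_interval: float = 0.01,
) -> bool:
    """Return True once is_in_init() reports False; False on timeout."""
    if not max_time:
        # 0 or None: check once and return immediately.
        return not is_in_init()
    deadline = time.monotonic() + max_time
    while time.monotonic() < deadline:
        if not is_in_init():
            return True
        time.sleep(poll_interval)
    return not is_in_init()  # one final check at the deadline


# A fake device that leaves INIT on the third poll.
polls = iter([True, True, False])
became_ready = check_initialised_sketch(lambda: next(polls), max_time=1.0)

# A device stuck in INIT, checked once without waiting.
checked_once = check_initialised_sketch(lambda: True, max_time=0)
```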
- classmethod set_default_connection_factory(connection_factory)[source]
Set the default connection factory for this class.
This is particularly useful for unit testing: we can mock out tango.DeviceProxy altogether, by simply setting this class’s default connection factory to a mock factory.
- Parameters
connection_factory (typing.Callable[[str], tango.DeviceProxy]) – default factory to use to establish a connection to the device
- Return type
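As a sketch of the testing pattern described above, the following shows a class-level default factory replaced by a unittest.mock.Mock. FactoryProxySketch is a hypothetical stand-in, not the real SatDeviceProxy, and the FQDN is invented for the example.

```python
from typing import Any, Callable, Optional
from unittest import mock


class FactoryProxySketch:
    """Illustrative stand-in; NOT the real SatDeviceProxy."""

    # Assumption: the real class defaults to tango.DeviceProxy here.
    _default_connection_factory: Optional[Callable[[str], Any]] = None

    @classmethod
    def set_default_connection_factory(
        cls, connection_factory: Callable[[str], Any]
    ) -> None:
        cls._default_connection_factory = connection_factory

    def __init__(
        self, fqdn: str, connection_factory: Optional[Callable[[str], Any]] = None
    ) -> None:
        factory = connection_factory
        if factory is None:
            # Look up on the class so that a plain function is not bound.
            factory = type(self)._default_connection_factory
        if factory is None:
            raise RuntimeError("no connection factory configured")
        self.device = factory(fqdn)


# In a unit test: swap in a mock factory, then create proxies as usual.
mock_factory = mock.Mock(name="connection_factory")
FactoryProxySketch.set_default_connection_factory(mock_factory)
proxy = FactoryProxySketch("sat/maser/001")
mock_factory.assert_called_once_with("sat/maser/001")
```

Because the factory is set on the class, code under test that creates its own proxies picks up the mock without needing a connection_factory argument threaded through to it.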
Health
This module implements infrastructure for health management.
- class HealthModel(health_changed_callback)[source]
A simple health model that supports:
HealthState.UNKNOWN – when communication with the component is not established.
HealthState.FAILED – when the component has faulted
HealthState.OK – when neither of the above conditions holds.
This health model does not support HealthState.DEGRADED. It is up to subclasses to implement support for DEGRADED if required.
- __init__(health_changed_callback)[source]
Initialise a new instance.
- Parameters
health_changed_callback (typing.Callable[[ska_tango_base.control_model.HealthState], None]) – callback to be called whenever there is a change to this health model’s evaluated health state.
- component_fault(faulty)[source]
Handle a component experiencing or recovering from a fault.
This is a callback hook that is called when the component goes into or out of FAULT state.
- evaluate_health()[source]
Re-evaluate the health state.
This method contains the logic for evaluating the health. It is this method that should be extended by subclasses in order to define how health is evaluated by their particular device.
- Return type
- Returns
the new health state.
- property health_state: ska_tango_base.control_model.HealthState
Return the health state.
- Return type
- Returns
the health state.
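The following sketch illustrates how a subclass might extend evaluate_health() to add DEGRADED support, as suggested above. It is not the actual HealthModel code: HealthState here is a local stand-in for ska_tango_base.control_model.HealthState, and is_communicating, update_health and component_warning are names assumed for the example.

```python
import enum
from typing import Callable, List


class HealthState(enum.IntEnum):
    """Local stand-in for ska_tango_base.control_model.HealthState."""

    OK = 0
    DEGRADED = 1
    FAILED = 2
    UNKNOWN = 3


class HealthModelSketch:
    """Illustrative base model: UNKNOWN, FAILED or OK only."""

    def __init__(self, health_changed_callback: Callable[[HealthState], None]) -> None:
        self._communicating = False
        self._faulty = False
        self._callback = health_changed_callback
        self._health_state = self.evaluate_health()

    @property
    def health_state(self) -> HealthState:
        return self._health_state

    def is_communicating(self, communicating: bool) -> None:  # assumed hook
        self._communicating = communicating
        self.update_health()

    def component_fault(self, faulty: bool) -> None:
        self._faulty = faulty
        self.update_health()

    def update_health(self) -> None:  # assumed helper
        health = self.evaluate_health()
        if health != self._health_state:
            self._health_state = health
            self._callback(health)  # only called on an actual change

    def evaluate_health(self) -> HealthState:
        if not self._communicating:
            return HealthState.UNKNOWN
        if self._faulty:
            return HealthState.FAILED
        return HealthState.OK


class DegradableHealthModel(HealthModelSketch):
    """Hypothetical subclass that adds DEGRADED on a warning condition."""

    def __init__(self, health_changed_callback: Callable[[HealthState], None]) -> None:
        self._warning = False  # set before the base class evaluates health
        super().__init__(health_changed_callback)

    def component_warning(self, warning: bool) -> None:  # assumed hook
        self._warning = warning
        self.update_health()

    def evaluate_health(self) -> HealthState:
        health = super().evaluate_health()
        if health == HealthState.OK and self._warning:
            return HealthState.DEGRADED
        return health


history: List[HealthState] = []
model = DegradableHealthModel(history.append)
model.is_communicating(True)
model.component_warning(True)
model.component_fault(True)
```

The subclass defers to the base evaluation first, so UNKNOWN and FAILED still take precedence over DEGRADED.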
Release
Release information for SKA SAT LMC Python Package.
Utils
Module for utils.
- class ThreadsafeCheckingMeta(name: str, bases: tuple[type], attrs: dict)[source]
Metaclass that checks for methods being run by multiple concurrent threads.
- call_with_json(func, **kwargs)[source]
Call a command with a json string.
Allows the calling of a command that accepts a JSON string as input, with the actual unserialised parameters.
For example, suppose you need to use the Allocate(resources) command to tell a controller device to allocate certain stations and tiles to a subarray. Allocate accepts a single JSON string argument. Instead of writing
Example:
    parameters = {"id": id, "stations": stations, "tiles": tiles}
    json_string = json.dumps(parameters)
    controller.Allocate(json_string)
you can simply write
Example:
    call_with_json(controller.Allocate, id=id, stations=stations, tiles=tiles)
- Parameters
func (typing.Callable) – the function handle to call
kwargs (typing.Any) – parameters to be jsonified and passed to func
- Return type
- Returns
the return value of func
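A helper with the behaviour described above can be sketched in a few lines. This is a plausible minimal version, not necessarily the actual implementation; call_with_json_sketch and fake_allocate are hypothetical names.

```python
import json
from typing import Any, Callable


def call_with_json_sketch(func: Callable[[str], Any], **kwargs: Any) -> Any:
    """Serialise kwargs to a JSON string and pass that string to func."""
    return func(json.dumps(kwargs))


def fake_allocate(json_string: str) -> dict:
    """Hypothetical stand-in for a command that accepts a JSON string."""
    return json.loads(json_string)


result = call_with_json_sketch(
    fake_allocate, id=1, stations=[1, 2], tiles=[10, 11]
)
```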
- class json_input(schema_path=None)[source]
Parse and validate json string input.
Method decorator that parses and validates JSON input into a python dictionary, which is then passed to the method as kwargs. The wrapped method is thus called with a JSON string, but can be implemented as if it had been passed a sequence of named arguments.
If the string cannot be parsed as JSON, an exception is raised.
For example, conceptually, Controller.Allocate() takes as arguments a subarray id, an array of stations, and an array of tiles. In practice, however, these arguments are encoded into a JSON string. Implement the function with its conceptual parameters, then wrap it in this decorator:
Example:
    @json_input()
    def Allocate(self, id, stations, tiles):
        ...
The decorator will provide the JSON interface and handle the decoding for you.
- __init__(schema_path=None)[source]
Initialise a callable json_input object.
This allows the object to function as a device method generator.
- Parameters
schema_path (typing.Optional[str]) – an optional path to a schema against which the JSON should be validated. Schema validation is not working at the moment, so leave this as None.
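The decorator behaviour described above (a callable object that parses a JSON string into keyword arguments) can be sketched as follows. This is an illustration, not the actual json_input code: json_input_sketch and FakeController are hypothetical, and schema validation is omitted entirely.

```python
import functools
import json
from typing import Any, Callable, Optional


class json_input_sketch:
    """Illustrative stand-in; NOT the real json_input."""

    def __init__(self, schema_path: Optional[str] = None) -> None:
        self.schema_path = schema_path  # stored but unused in this sketch

    def __call__(self, func: Callable[..., Any]) -> Callable[[Any, str], Any]:
        @functools.wraps(func)
        def wrapped(obj: Any, json_string: str) -> Any:
            # json.loads raises json.JSONDecodeError (a ValueError subclass)
            # if the string cannot be parsed as JSON.
            kwargs = json.loads(json_string)
            return func(obj, **kwargs)

        return wrapped


class FakeController:
    """Hypothetical device class, for illustration only."""

    @json_input_sketch()
    def Allocate(self, id, stations, tiles):
        return (id, stations, tiles)


allocated = FakeController().Allocate(
    '{"id": 1, "stations": [1, 2], "tiles": [10]}'
)

try:
    FakeController().Allocate("not json")
    raised = False
except ValueError:
    raised = True
```

Note that the decorator is instantiated (with parentheses) before being applied, since the schema path is a constructor argument.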
- threadsafe(func)[source]
Use this method as a decorator for marking a method as threadsafe.
This tells the ThreadsafeCheckingMeta metaclass that it is okay for the decorated method to have more than one thread in it at a time. The metaclass will still raise an exception if the same thread enters the method multiple times, because re-entry is a common cause of deadlock.
- Parameters
func (typing.Callable) – the method to be marked as threadsafe
- Return type
- Returns
the method, marked as threadsafe
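The interaction between the metaclass and the threadsafe marker can be sketched as below. This is a simplified illustration, not the real ThreadsafeCheckingMeta: all names are hypothetical, the exception type (RuntimeError) is an assumption, and threads are tracked per method rather than per instance.

```python
import functools
import threading
import types
from typing import Any, Callable


def threadsafe_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    """Mark a method as safe for concurrent use by several threads."""
    func._threadsafe = True
    return func


def _checked(func: Callable[..., Any]) -> Callable[..., Any]:
    allow_concurrency = getattr(func, "_threadsafe", False)
    active = set()  # idents of threads currently inside func
    lock = threading.Lock()

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        ident = threading.get_ident()
        with lock:
            if ident in active:
                # Re-entry is rejected even for threadsafe methods.
                raise RuntimeError(f"{func.__name__}: re-entered by same thread")
            if active and not allow_concurrency:
                raise RuntimeError(f"{func.__name__}: concurrent threads")
            active.add(ident)
        try:
            return func(*args, **kwargs)
        finally:
            with lock:
                active.discard(ident)

    return wrapper


class ThreadsafeCheckingMetaSketch(type):
    """Wrap every plain, non-dunder method with the concurrency check."""

    def __new__(mcs, name: str, bases: tuple, attrs: dict):
        for attr_name, value in list(attrs.items()):
            if isinstance(value, types.FunctionType) and not attr_name.startswith("__"):
                attrs[attr_name] = _checked(value)
        return super().__new__(mcs, name, bases, attrs)


class Counter(metaclass=ThreadsafeCheckingMetaSketch):
    def __init__(self) -> None:
        self.value = 0

    def recurse(self, depth: int) -> None:
        if depth > 0:
            self.recurse(depth - 1)  # same thread re-enters the method

    @threadsafe_sketch
    def read(self) -> int:
        return self.value


counter = Counter()
try:
    counter.recurse(1)
    reentry_detected = False
except RuntimeError:
    reentry_detected = True
```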