Station subpackage

This subpackage implements station functionality for MCCS.

class MccsStation(*args: Any, **kwargs: Any)[source]

An implementation of a station beam Tango device for MCCS.

AcquireDataForCalibration(channel: int) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Instruct the SpsStation to start acquiring calibration data from the tiles.

Parameters:

channel – the frequency channel to calibrate for.

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

ApplyConfiguration(transaction_id: str) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Apply the aggregated channel table to this Station’s SpsStation.

Parameters:

transaction_id – transaction id for the configuration

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
>>> dp.command_inout("ApplyConfiguration")
ApplyPointingDelays(argin: str) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Set the pointing delay parameters of this Station’s Tiles.

Parameters:

argin – switch time, in ISO formatted time. Default: now

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
>>> time_string = switch time as ISO formatted time
>>> dp.command_inout("ApplyPointingDelays", time_string)
ConfigureChannels(argin: list[int]) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Set the beamformer table entries for a station beam.

Entries are defined as a flattened 2D array, for a maximum of 48 entries Each entry is 7 channels long and corresponds to 8 consecutive frequency channels.

Parameters:

argin – list of channel block description. Elements are: * channel block index: value in range 0:47 for the channel block to set * start_channel - (int) region starting channel, even in range 0 to 510 * beam_index - (int) beam used for this region with range 0 to 47 * subarray_id - (int) Subarray: 0 is reserved for unallocated entry * subarray_logical_channel - (int) logical channel # in the subarray * subarray_beam_id - (int) ID of the subarray beam * substation_id - (int) Substation * aperture_id: ID of the aperture (station*100+substation?)

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
>>> dp.command_inout("ConfigureChannels", block_table)
ConfigureSemiStatic(argin: str) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Configure semi static information like position of antennas.

Parameters:

argin – Semi static information

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
>>> config = json.dumps({
    "station": {
        "StationId": 1,
        "ref_latitude": 1.0,
        "ref_longitude": 1.0,
        "ref_height": 1.0,
    }
})
>>> dp.command_inout("ConfigureSemiStatic", config)
DeallocateSubarray(subarray_id: int) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Deallocates entries relative to a subarray in aggregate tables.

Parameters:

subarray_id – the ID of the subarray to deallocate

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

EndScan(subarray_id: int) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Stop the current scan associated with the station_beam.

Parameters:

subarray_id – the subarray for which the command applies

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

GetPointingDelays(argin: str) ndarray[source]

Get Pointing Coefficients.

Parameters:

argin – stringified dict of args

Returns:

The pointing delays as pairs of (delay, delay rate) in EEP order.

class GetPointingDelaysCommand(*args: Any, **kwargs: Any)[source]

Class for handling the GetPointingDelays() command.

__init__(component_manager: StationComponentManager, logger: Logger | None = None) None[source]

Initialise a new instance.

Parameters:
  • component_manager – the device to which this command belongs.

  • logger – a logger for this command to use.

do(*args: Any, **kwargs: Any) ndarray[source]

Implement MccsStation.GetPointingDelaysCommand() command.

Parameters:
  • args – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs – unspecified keyword arguments. This should be empty and is provided for type hinting only

Returns:

json encoded string containing list of dictionaries

class InitCommand(*args: Any, **kwargs: Any)[source]

A class for MccsStation’s Init command.

The do() method below is called upon MccsStation’s initialisation.

do(*args: Any, **kwargs: Any) tuple[ska_control_model.ResultCode, str][source]

Initialise the MccsStation.

Parameters:
  • args – positional args to the component manager method

  • kwargs – keyword args to the component manager method

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Initialise() tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Initialise this station’s tiles.

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

LoadPointingDelays(argin: ndarray) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Set the pointing delay parameters of this Station’s Tiles.

Parameters:

argin – an array containing a beam index followed by antenna delays in antenna EEP order. 1 + 256*2 = 513 elements.

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
>>> dp.command_inout("LoadPointingDelays", delay_list)
Scan(argin: str) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Start the scan associated with the station_beam.

Parameters:

argin – Configuration parameters encoded in a json string

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

StartAcquisition(argin: str) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Start data acquisition.

Parameters:

argin – json dictionary with optional keywords:

  • start_time - (ISO UTC time) start time

  • delay - (int) delay start if StartTime is not specified, default 2s

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

class StartAcquisitionCommand(*args: Any, **kwargs: Any)[source]

Class for handling the StartAcquisition() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/Mccs_StartAcquisition.json",
    "title": "StartAcquisition schema",
    "description": "Schema for StartAcquisition command",
    "type": "object",
    "properties": {
        "start_time": {
            "description": "",
            "type": "string",
            "format": "time"
        },
        "delay": {
            "description": "An acquisition start delay",
            "type": "integer"
        }
    }
}
__init__(command_tracker: ska_tango_base.base.CommandTracker, component_manager: StationComponentManager, callback: Callable | None = None, logger: Logger | None = None) None[source]

Initialise a new instance.

Parameters:
  • command_tracker – the device’s command tracker

  • component_manager – the device’s component manager

  • callback – an optional callback to be called when this command starts and finishes.

  • logger – a logger for this command to log with.

StopTracking(track_id: int) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Stop tracking an object.

Parameters:

track_id – The ID of the thread you wish to stop tracking.

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

StopTrackingAll() tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Stop all tracking.

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

TrackObject(argin: str) tuple[list[ska_control_model.ResultCode], list[Optional[str]]][source]

Track an object through the sky.

Parameters:

argin – Configuration parameters encoded in a json string

Returns:

The id of the tracking thread.

__init__(*args: Any, **kwargs: Any) None[source]

Initialise this device object.

Parameters:
  • args – positional args to the init

  • kwargs – keyword args to the init

beamTrls() list[str][source]

Return the TRLs of station beams associated with this station.

Returns:

the TRLs of station beams associated with this station

beamformerTable() list[list[int]][source]

Return the ids of the channels configured for this beam.

Returns:

channel table

calibrationCoefficients() list[float][source]

Return the calibration coefficients for the station.

Todo:

How big should this array be? 4 complex values (Jones matrix) per channel. This station can have up to 16 tiles of up to 16 antennas, so that is 8 x 16 x 16 = 2048 coefficients per channel. But how many channels? 384 channels, 786432 elements per station (402M for SKA Low)

Returns:

the calibration coefficients

calibrationJobId() int[source]

Return the calibration job id.

Returns:

the calibration job id

create_component_manager() StationComponentManager[source]

Create and return a component manager for this device.

Returns:

a component manager for this device.

daqJobId() int[source]

Return the DAQ job id.

Returns:

the DAQ job id

dataDirectory() str[source]

Return the data directory.

(the parent directory for all files generated by this station)

Returns:

the data directory

dataReceivedResult() tuple[str, str] | None[source]

Read the result of the receiving of data.

Returns:

A tuple containing the data mode of transmission and a json string with any additional data about the data such as the file name.

delayCentre(value: list[float]) None[source]

Set the delay centre of the station.

Parameters:

value – WGS84 position

healthModelParams(argin: str) None[source]

Set the params for health transition rules.

Parameters:

argin – JSON-string of dictionary of health states

healthReport() str[source]

Get the health report.

Returns:

the health report.

health_changed(health: ska_control_model.HealthState) None[source]

Handle change in this device’s health state.

This is a callback hook, called whenever the HealthModel’s evaluated health state changes. It is responsible for updating the tango side of things i.e. making sure the attribute is up to date, and events are pushed.

Parameters:

health – the new health value

init_command_objects() None[source]

Set up the handler objects for Commands.

init_device() None[source]

Initialise the device.

This is overridden here to change the Tango serialisation model.

isCalibrated() bool[source]

Return a flag indicating whether this station is currently calibrated or not.

Returns:

a flag indicating whether this station is currently calibrated or not.

isConfigured() bool[source]

Return a flag indicating whether this station is currently configured or not.

Returns:

a flag indicating whether this station is currently configured or not.

isInitialised() bool[source]

Return true if all tiles in the SpsStation are initialised.

Returns:

true if all tiles in the SpsStation are initialised.

isSynchronised() bool[source]

Return true if all tiles in the SpsStation are synchronised.

Returns:

true if all tiles in the SpsStation are synchronised.

is_On_allowed() bool[source]

Check if command Off is allowed in the current device state.

Returns:

True if the command is allowed

lastPointingDelays() list[float][source]

Return last pointing delays applied to the tiles.

Values are initialised to 0.0 if they haven’t been set. These values are in antenna EEP order.

Returns:

last pointing delays applied to the tiles.

numberOfChannels() int[source]

Return the total number of channels in the beamformer.

Returns:

the total number of channels

outsideTemperature() float | None[source]

Return the OutsideTemperature.

Returns:

the OutsideTemperature.

refHeight() float[source]

Return the refHeight attribute.

Returns:

the ellipsoidal height of the station reference position

refLatitude() float[source]

Return the refLatitude attribute.

Returns:

the WGS84 Latitude of the station reference position

refLongitude() float[source]

Return the refLongitude attribute.

Returns:

the WGS84 Longitude of the station reference position

transientBufferTrl() str[source]

Return the TRL of the TANGO device that managers the transient buffer.

Returns:

the TRL of the TANGO device that managers the transient buffer

class StationComponentManager(*args: Any, **kwargs: Any)[source]

A component manager for a station.

__init__(station_id: int, ref_latitude: float, ref_longitude: float, ref_height: float, field_station_trl: str, antenna_trls: Sequence[str], antenna_station_locations: ndarray, antenna_element_ids: list[int], station_calibrator_trl: str, sps_station_trl: str, logger: Logger, communication_state_callback: Callable[[ska_control_model.CommunicationStatus], None], component_state_callback: Callable[[...], None]) None[source]

Initialise a new instance.

Parameters:
  • station_id – the id of this station

  • ref_latitude – reference latitude of the station.

  • ref_longitude – reference longitude of the station.

  • ref_height – reference ellipsoidal height of the station.

  • field_station_trl – TRL of the Tango device that manages this station’s FieldStation

  • antenna_trls – TRLs of the Tango devices and manage this station’s antennas

  • antenna_station_locations – array of the x, y, z positions of the antennas

  • antenna_element_ids – list of the element IDs of the antennas

  • station_calibrator_trl – TRL of the Tango devices and manage this station’s station calibrator

  • sps_station_trl – TRL of the Tango devices and manage this station’s Spshw station

  • logger – the logger to be used by this object.

  • communication_state_callback – callback to be called when the status of the communications channel between the component manager and its component changes

  • component_state_callback – callback to be called when the component state changes

acquire_data_for_calibration(channel: int, task_callback: Callable | None = None, task_abort_event: Event | None = None) tuple[ska_control_model.TaskStatus, str][source]

Submit the AcquireDataForCalibration slow task.

This method returns immediately after it is submitted for execution.

Parameters:
  • channel – The channel to acquire data for

  • task_callback – Update task state, defaults to None

  • task_abort_event – Check for abort, defaults to None

Returns:

Task status and response message

apply_configuration(transaction_id: str, task_callback: Callable | None = None) tuple[ska_control_model.TaskStatus, str][source]

Apply the beamformer and calibration configuration to SpsStation.

Parameters:
  • transaction_id – the transaction id for the configuration

  • task_callback – Update task state, defaults to None

Returns:

a result code and response string

apply_pointing_delays(load_time: str, task_callback: Callable | None = None) tuple[ska_control_model.TaskStatus, str][source]

Submit the apply_pointing method.

This method returns immediately after it submitted self._apply_pointing for execution.

Parameters:
  • load_time – time at which to load the pointing delay

  • task_callback – Update task state, defaults to None

Returns:

a task status and response message

property beamformer_table: list[list[int]]

Return the channel table reformatted as would be needed by ConfigureChannels.

Returns:

reformatted channel table

configure_channels(channel_blocks: list[int]) ska_control_model.ResultCode[source]

Configure channels for a station beam in the channel table.

Parameters:

channel_blocks – List of channel table entries

Returns:

a result code and response string

configure_semi_static(task_callback: Callable | None = None, *, interface: str | None = None, station_config: dict, field_station_config: dict | None, antenna_config: dict | None) tuple[ska_control_model.TaskStatus, str][source]

Submit the configure method.

TODO Check if this is required anymore This method returns immediately after it submitted self._configure_semi_static for execution.

Parameters:
  • interface – the schema version this is running against.

  • station_config – Configuration specification for the station device.

  • field_station_config – Configuration specification for the field station device.

  • antenna_config – Configuration specification for the antenna deviced.

  • task_callback – Update task state, defaults to None

Returns:

a result code and response string

deallocate_subarray(subarray_id: int) ska_control_model.ResultCode[source]

Clear channels for a station beam in the channel table.

Parameters:

subarray_id – subarray_id to clear

Returns:

a result code and response string

end_scan(subarray_id: int, task_callback: Callable | None = None, task_abort_event: Event | None = None) tuple[ska_control_model.TaskStatus, str][source]

Submit the EndScan slow task.

This method returns immediately after it is submitted for execution.

Parameters:
  • subarray_id – The subarray for which the command applies

  • task_callback – Update task state, defaults to None

  • task_abort_event – Check for abort, defaults to None

Returns:

Task status and response message

get_pointing_delays(task_callback: Callable | None = None, *, interface: str | None = None, pointing_type: str, values: dict, time_step: float = 10.0, reference_time: str | None = None) ndarray[source]

Get the pointing delays for this station.

Parameters:
  • interface – the schema version this is running against.

  • pointing_type – the type of pointing requested

  • values – the pointing values, either in alt_az or ra_dec

  • reference_time – time in which coordinates are equal, in ISO8601 formatted astropy.Time time

  • time_step – How long between each time step in seconds

  • task_callback – callback to signal end of command

Returns:

list of pointing delays

initialise(task_callback: Callable | None = None) tuple[ska_control_model.TaskStatus, str][source]

Submit the initialise method.

This method returns immediately after it submitted self._initialise for execution.

Parameters:

task_callback – Update task state, defaults to None

Returns:

a task staus and response message

property is_configured: bool

Return whether this station component manager is configured.

Returns:

whether this station component manager is configured.

load_pointing_delays(delays: ndarray, task_callback: Callable | None = None) tuple[ska_control_model.TaskStatus, str][source]

Load the pointing delays for this station.

Parameters:
  • delays – list of delays

  • task_callback – Update task state, defaults to None

Returns:

a result code and list of pointing delays

property number_of_channels: int

Return the total number of channels in the beamformer.

Returns:

the total numebr of channels

off(task_callback: Callable | None = None) tuple[ska_control_model.TaskStatus, str][source]

Submit the _off method.

This method returns immediately after it submitted self._off for execution.

Parameters:

task_callback – Update task state, defaults to None

Returns:

a result code and response message

on(task_callback: Callable | None = None) tuple[ska_control_model.TaskStatus, str][source]

Submit the _on method.

This method returns immediately after it submitted self._on for execution.

Parameters:

task_callback – Update task state, defaults to None

Returns:

a task staus and response message

property power_state: ska_control_model.PowerState | None

Return my power state.

Returns:

my power state

property power_state_lock: RLock

Return the power state lock of this component manager.

Returns:

the power state lock of this component manager.

property ref_height: float

Return whether this stations height.

Returns:

this stations height.

property ref_latitude: float

Return whether this stations latitude.

Returns:

this stations latitude.

property ref_longitude: float

Return whether this stations longitude.

Returns:

this stations longitude.

scan(task_callback: Callable | None = None, *, interface: str | None = None, subarray_id: int, scan_id: int, start_time: str | None = None, duration: float | None = 0.0) tuple[ska_control_model.TaskStatus, str][source]

Submit the Scan slow task.

This method returns immediately after it is submitted for execution.

Parameters:
  • interface – the schema version this is running against.

  • subarray_id – The subarray for whic the command applies

  • scan_id – The ID for this scan

  • start_time – UTC time for begin of scan, None for immediate start

  • duration – Scan duration in seconds. 0.0 or omitted means forever

  • task_callback – Update task state, defaults to None

Returns:

Task status and response message

property scan_ids: list[int]

Return the current scan IDs for each subarray.

Returns:

list of scan IDs starting from subarray 1, 0 = subarray not scanning

setup_pointing_helper() None[source]

Set up the pointing helper.

start_acquisition(task_callback: Callable | None = None, *, start_time: str | None = None, delay: int | None = 2) tuple[ska_control_model.TaskStatus, str][source]

Submit the start acquisition method.

This method returns immediately after it submitted self._start_acquisition for execution.

Parameters:
  • start_time – the time at which to start data acquisition, defaults to None

  • delay – delay start, defaults to 2

  • task_callback – Update task state, defaults to None

Returns:

a task staus and response message

start_communicating() None[source]

Establish communication with the station components.

stop_communicating() None[source]

Break off communication with the station components.

stop_tracking(track_id: int, task_callback: Callable | None = None) tuple[ska_control_model.TaskStatus, str][source]

Stop a tracking thread.

Parameters:
  • track_id – the id of the tracking you wish to stop.

  • task_callback – Update task state, defaults to None

Returns:

a result code and list of pointing delays

stop_tracking_all(task_callback: Callable | None = None) tuple[ska_control_model.TaskStatus, str][source]

Stop all tracking threads.

Parameters:

task_callback – Update task state, defaults to None

Returns:

a result code and list of pointing delays

property tileprogrammingstate: tuple[str]

Return the tileprogrammingstate of the SpsStation.

Returns:

the tileprogrammingstate of the SpsStation.

track_object(task_callback: Callable | None = None, task_abort_event: Event | None = None, *, interface: str | None = None, pointing_type: str, values: dict, scan_time: float, reference_time: str | None = None, station_beam_number: int | None = 0, time_step: float | None = 1.0) tuple[ska_control_model.TaskStatus, str][source]

Submit the track_object slow task.

This method returns immediately after it is submitted for execution.

Parameters:
  • task_callback – Update task state, defaults to None

  • task_abort_event – Check for abort, defaults to None

  • interface – the schema version this is running against.

  • pointing_type – the type of pointing requested

  • values – Coordinates for object to be tracked

  • scan_time – Time to scan object in seconds

  • reference_time – time in which coordinates are equal, in ISO8601 formatted astropy.Time time

  • station_beam_number – The station beam number to be used

  • time_step – How long between each time step in seconds

Returns:

A return code and a unique command ID.

update_communication_state(communication_state: ska_control_model.CommunicationStatus) None[source]

Update the status of communication with the component.

Overridden here to fire the “is configured” callback whenever communication is freshly established

Parameters:

communication_state – the status of communication with the component

class StationHealthModel(*args: Any, **kwargs: Any)[source]

A health model for a station.

__init__(field_station_trl: str, sps_station_trl: str, antenna_trls: Sequence[str], health_changed_callback: ska_low_mccs_common.health.HealthChangedCallbackProtocol, thresholds: dict[str, float] | None = None) None[source]

Initialise a new instance.

Parameters:
  • field_station_trl – the TRL of this station’s FieldStation

  • sps_station_trl – the TRL of this MccsStation’s SpsStation.

  • antenna_trls – the TRLs of this station’s antennas

  • health_changed_callback – callback to be called whenever there is a change to this this health model’s evaluated health state.

  • thresholds – the threshold parameters for the health rules

antenna_health_changed(antenna_trl: str, antenna_health: ska_control_model.HealthState | None) None[source]

Handle a change in antenna health.

Parameters:
  • antenna_trl – the TRL of the antenna whose health has changed

  • antenna_health – the health state of the specified antenna, or None if the antenna’s admin mode indicates that its health should not be rolled up.

evaluate_health() tuple[ska_control_model.HealthState, str][source]

Compute overall health of the station.

The overall health is based on the fault and communication status of the station overall, together with the health of the FieldStation, antennas and SpsStation that it manages.

This implementation simply sets the health of the station to the health of its least healthy component.

Returns:

an overall health of the station

field_station_health_changed(field_station_trl: str | None = None, field_station_health: ska_control_model.HealthState | None = None) None[source]

Handle a change in FieldStation health.

Parameters:
  • field_station_health – the health state of the FieldStation, or None if the FieldStation’s admin mode indicates that its health should not be rolled up.

  • field_station_trl – the TRL of the FieldStation.

sps_station_health_changed(sps_station_trl: str, sps_station_health: ska_control_model.HealthState | None) None[source]

Handle a change in SpsStation health.

Parameters:
  • sps_station_trl – the TRL of the SpsStation

  • sps_station_health – the health state of the specified SpsStation, or None if the SpsStation’s admin mode indicates that its health should not be rolled up.

class StationObsStateModel(logger: Logger, obs_state_changed_callback: Callable[[ska_control_model.ObsState], None])[source]

An observation state model for a station.

__init__(logger: Logger, obs_state_changed_callback: Callable[[ska_control_model.ObsState], None]) None[source]

Initialise a new instance.

Parameters:
  • logger – a logger for this model to use

  • obs_state_changed_callback – callback to be called when there is a change to this model’s evaluated observation state.

is_configured_changed(is_configured: bool) None[source]

Handle a change in whether the station is configured.

Parameters:

is_configured – whether the station is configured

update_obs_state() None[source]

Update the observation state, ensuring that the callback is called.