"""Helper functions for working with SKA Low subarrays."""
from __future__ import annotations
import json
import logging
from ska_control_model import ObsState
from tango import Database, DevFailed, DeviceProxy, DevState
from toolz import get_in
from ska_oso_scripting.engineering.low.utils.aiv_utils import (
get_cbf_devices,
get_device,
get_mccs_device,
get_sdp_devices,
get_tmc_devices,
station_label_from_id,
)
from ska_oso_scripting.engineering.low.utils.constants import (
ABORTABLE_STATES,
RESTARTABLE_STATES,
)
from ska_oso_scripting.engineering.low.utils.tango_utils import (
member,
restart_devices,
single_prop,
wait_for,
)
from ska_oso_scripting.engineering.low.workarounds.csp import (
ensure_cbf_processors_registered,
ensure_pst_routing,
)
from ska_oso_scripting.engineering.low.workarounds.mccs import ensure_sps_csp_endpoints
__all__ = [
"back_to_empty",
"pre_obs_checks",
]
LOG = logging.getLogger(__name__)
def build_device_tree_for_subarray(
    tm_subarray: DeviceProxy | int, assign_resources_block: dict | None = None
) -> dict:
    """
    Build the device hierarchy used by abort_and_restart_tree for a given TMC
    subarray.

    :param tm_subarray: proxy to the TMC subarray node, or the integer
        subarray ID used to look that node up among
        ``get_tmc_devices().tmc_subarray_nodes``.
    :param assign_resources_block: optional AssignResources payload. When
        given, the subarray beams listed under ``mccs.subarray_beams`` are
        added to the tree on top of the beams the MCCS subarray currently
        reports in ``assignedResources``.
    :return: nested dict mapping DeviceProxy -> subtree dict. The TMC
        subarray is the root; its children are the MCCS, CSP and SDP
        subarrays; MCCS holds its station beams and subarray beams, and CSP
        holds its CBF subarray.
    :raises ValueError: if an integer ID does not match exactly one TMC
        subarray node.
    """
    if isinstance(tm_subarray, int):
        matches = [
            node
            for node in get_tmc_devices().tmc_subarray_nodes
            if int(member(node)) == tm_subarray
        ]
        # Fail with a message that names the ID, rather than the bare
        # "not enough values to unpack" produced by a failed destructuring.
        if len(matches) != 1:
            raise ValueError(
                f"Expected exactly one TMC subarray node with ID {tm_subarray}, "
                f"found {len(matches)}"
            )
        [tm_subarray] = matches
    subarray_id = int(member(tm_subarray))

    # get the subarray devices corresponding to the supplied TMC subarray
    mccs_subarray = get_device(single_prop(tm_subarray, "MccsSubarrayFQDN"))
    csp_subarray = get_device(single_prop(tm_subarray, "CspSubarrayFQDN"))
    sdp_subarray = get_device(single_prop(tm_subarray, "SdpSubarrayFQDN"))

    # beams MCCS currently reports as assigned to this subarray
    mccs_resources = json.loads(mccs_subarray.assignedResources)
    mccs_subarray_beam_ids = set(mccs_resources["subarray_beam_ids"])
    if assign_resources_block:
        # also cover the beams the upcoming AssignResources asks for; IDs are
        # formatted as "<subarray id>-<beam id>", both zero-padded to 2 digits
        mccs_subarray_beam_ids |= {
            f"{subarray_id:02}-{beam['subarray_beam_id']:02}"
            for beam in get_in(["mccs", "subarray_beams"], assign_resources_block, [])
        }

    mccs_subarray_beams = [
        get_mccs_device(f"low-mccs/subarraybeam/{subarray_beam_id}")
        for subarray_beam_id in mccs_subarray_beam_ids
    ]
    mccs_station_beams = [
        get_mccs_device(f"low-mccs/beam/{station_beam_id}")
        for station_beam_id in mccs_resources["station_beam_ids"]
    ]

    device_tree = {
        tm_subarray: {
            mccs_subarray: {
                dev: {} for dev in mccs_station_beams + mccs_subarray_beams
            },
            csp_subarray: {
                get_device(csp_subarray.cbfSubarrayAddress): {},
            },
            sdp_subarray: {},
        },
    }
    return device_tree
[docs]
def back_to_empty(
    tm_subarray: DeviceProxy | int, assign_resources_block: dict | None = None
):
    """Take a TMC subarray and do our best to bring it back to EMPTY.

    The work is split into two steps - 1) build the device tree, then
    2) operate on that tree - so that integration tests can run step 2
    against a shallower device tree.
    """
    abort_and_restart_tree(
        build_device_tree_for_subarray(tm_subarray, assign_resources_block)
    )
[docs]
def pre_obs_checks(assign_resources_block: dict):
    """Make sure all devices relevant to an AssignResources are EMPTY.

    In order, this:

    1. ensures the SPS stations referenced by the requested subarray beams
       have their CSP endpoints configured;
    2. ensures CBF processors are registered with the CBF allocator;
    3. issues On() to any SDP controller/subarray found in STANDBY or OFF;
    4. brings the matching TMC subarray back to EMPTY via back_to_empty;
    5. turns on / aborts / obsresets PST beams that are in an unusable state;
    6. ensures CBF's PST routing is configured for the PST beams in use.

    :param assign_resources_block: AssignResources payload. Reads
        ``subarray_id``, ``mccs.subarray_beams[*].apertures[*].station_id``
        and, when present, ``csp.pst.pst_beam_ids``.
    :raises StopIteration: if no TMC subarray node matches ``subarray_id``.
    """
    # get DeviceProxys relevant to current obs
    cbf = get_cbf_devices()
    sdp = get_sdp_devices()

    # De-duplicate station labels across *all* beams (a station commonly
    # appears in several subarray beams) while keeping first-seen order, so
    # each SPS station is looked up and checked exactly once.
    station_names = dict.fromkeys(
        station_label_from_id(aperture["station_id"])
        for beam in assign_resources_block["mccs"]["subarray_beams"]
        for aperture in beam["apertures"]
    )
    sps_stations = [
        get_mccs_device(f"low-mccs/spsstation/{station_name}")
        for station_name in station_names
    ]

    tmc = get_tmc_devices()
    tmc_subarray = next(
        s
        for s in tmc.tmc_subarray_nodes
        if int(member(s)) == assign_resources_block["subarray_id"]
    )

    LOG.info(
        "Ensure stations' CSP endpoints are all configured "
        "according to their Tango properties"
    )
    ensure_sps_csp_endpoints(sps_stations)

    LOG.info("Ensure CBF processors are registered with the CBF allocator")
    ensure_cbf_processors_registered(cbf)

    # TODO remove when SKB-852 is resolved
    LOG.info("Ensure SDP devices are all ON")
    for dev in [sdp.controller, *sdp.subarrays]:
        state = dev.State()
        if state in {DevState.STANDBY, DevState.OFF}:
            LOG.warning(f"{dev.dev_name()} in state {state}, issuing On() - SKB-852")
            dev.On()

    LOG.info(f"Ensure {tmc_subarray.dev_name()} is EMPTY before using it")
    back_to_empty(tmc_subarray, assign_resources_block)

    LOG.info("Ensure that allocated PST beams are IDLE")
    # TODO this needs an SKB
    # Check that the PST beam is not in ABORTED. This is relevant to every
    # obs even if it's not a PST scan, because CSP chokes when it's not healthy
    pst_beams = [
        get_device(d) for d in Database().get_device_exported_for_class("PstBeam")
    ]
    # TODO likewise, PST beams we actually intend to use must not be READY
    pst_cfg = assign_resources_block["csp"].get("pst")
    used_pst_beam_ids = pst_cfg["pst_beam_ids"] if pst_cfg else []

    # A beam needs attention if it is ABORTED/FAULT, switched OFF, or is one
    # we intend to use and is not IDLE.
    faulty_pst_beams = {
        beam
        for beam in pst_beams
        if beam.obsState in {ObsState.ABORTED, ObsState.FAULT}
        or beam.State() == DevState.OFF
        or (beam.obsState != ObsState.IDLE and beam.deviceID in used_pst_beam_ids)
    }
    for beam in faulty_pst_beams:
        # The three checks are deliberately sequential (not elif): each one
        # re-reads the beam's state after the previous command was issued.
        if beam.State() == DevState.OFF:
            LOG.warning(
                f"Turning On() PST beam {beam.dev_name()} "
                f"in state {beam.obsState.name}"
            )
            beam.On()
        if beam.obsState == ObsState.READY:
            LOG.warning(
                f"Abort()ing PST beam {beam.dev_name()} "
                f"in state {beam.obsState.name}"
            )
            beam.Abort()
        if beam.obsState in {ObsState.ABORTED, ObsState.FAULT}:
            LOG.warning(
                f"obsreset()ting PST beam {beam.dev_name()} "
                f"in state {beam.obsState.name}"
            )
            beam.obsreset()

    LOG.info("Ensure that CBF's PST routing is correctly configured")
    # TODO should not have to do this after CBF X?
    ensure_pst_routing(cbf, used_pst_beam_ids)
def abort_and_restart_tree(dev_tree: dict, abort=True, restart=True, timeout=5):
    """
    Ensure the devices in `dev_tree` supporting ObsState are in EMPTY.

    First attempts to Abort() and Restart(), and if that doesn't succeed,
    restart the devices via the device server interfaces.

    The tree is walked top-down twice: the whole tree is aborted first
    (parents before children), then the whole tree is restarted.

    :param dev_tree: nested dict mapping each device to the dict of its
        child devices (``{}`` for a leaf).
    :param abort: run the Abort() phase on this level and its subtrees.
    :param restart: run the Restart() phase on this level and its subtrees,
        followed by the EMPTY check and device-server restart fallback.
    :param timeout: timeout handed to ``wait_for`` for every wait at every
        level of the tree.
    """
    if not dev_tree:
        # Leaf reached (or nothing supplied): no devices to command or wait on.
        return

    if abort:
        abortable_devs = {d for d in dev_tree if d.obsState in ABORTABLE_STATES}
        for dev in abortable_devs:
            LOG.warning(f"Abort()ing {dev.dev_name()} in obsState {dev.obsState!r}")
            try:
                dev.Abort()
            except DevFailed as ex:
                # best effort: keep going, the fallback below deals with it
                LOG.warning(f"{dev}.Abort() failed: {ex.args[0].desc}")

        # also wait on devices that were already part-way through an abort
        abortable_devs |= {d for d in dev_tree if d.obsState == ObsState.ABORTING}
        try:
            wait_for(abortable_devs, "obsState", RESTARTABLE_STATES, timeout=timeout)
        except ValueError as ex:
            # wait_for is expected to attach `failed_devices` to the error
            LOG.warning(f"Devices {ex.failed_devices} failed to Abort()")

        # abort-only pass over the children; forward the caller's timeout
        for subtree in dev_tree.values():
            abort_and_restart_tree(subtree, restart=False, timeout=timeout)

    if restart:
        restartable_devs = {d for d in dev_tree if d.obsState in RESTARTABLE_STATES}
        for dev in restartable_devs:
            LOG.warning(f"Restart()ing {dev.dev_name()} in obsState {dev.obsState!r}")
            try:
                dev.Restart()
            except DevFailed as ex:
                LOG.warning(f"{dev}.Restart() failed: {ex.args[0].desc}")

        # also wait on devices that were already part-way through a restart
        restartable_devs |= {d for d in dev_tree if d.obsState == ObsState.RESTARTING}
        try:
            wait_for(restartable_devs, "obsState", ObsState.EMPTY, timeout=timeout)
        except ValueError as ex:
            LOG.warning(f"Devices {ex.failed_devices} failed to Restart()")

        # restart-only pass over the children; forward the caller's timeout
        for subtree in dev_tree.values():
            abort_and_restart_tree(subtree, abort=False, timeout=timeout)

        # The EMPTY check belongs to the restart phase only: an abort-only
        # pass is not expected to leave devices in EMPTY, so checking there
        # would escalate straight to a device-server restart.
        try:
            wait_for(list(dev_tree), "obsState", ObsState.EMPTY, timeout=timeout)
        except ValueError as ex:
            LOG.warning(f"Devices {ex.failed_devices} failed to reach EMPTY")
            # last resort: restart via the device server interfaces, then
            # wait using wait_for's own default timeout
            restart_devices(ex.failed_devices)
            wait_for(ex.failed_devices, "obsState", ObsState.EMPTY)