Source code for ska_oso_scripting.engineering.low.workarounds.subarray

"""Helper functions for working with SKA Low subarrays."""

from __future__ import annotations

import json
import logging

from ska_control_model import ObsState
from tango import Database, DevFailed, DeviceProxy, DevState
from toolz import get_in

from ska_oso_scripting.engineering.low.utils.aiv_utils import (
    get_cbf_devices,
    get_device,
    get_mccs_device,
    get_sdp_devices,
    get_tmc_devices,
    station_label_from_id,
)
from ska_oso_scripting.engineering.low.utils.constants import (
    ABORTABLE_STATES,
    RESTARTABLE_STATES,
)
from ska_oso_scripting.engineering.low.utils.tango_utils import (
    member,
    restart_devices,
    single_prop,
    wait_for,
)
from ska_oso_scripting.engineering.low.workarounds.csp import (
    ensure_cbf_processors_registered,
    ensure_pst_routing,
)
from ska_oso_scripting.engineering.low.workarounds.mccs import ensure_sps_csp_endpoints

# Names exported by `from <this module> import *`. Only the two entry points
# are listed; the other functions defined in this module stay importable by
# name but are not part of the star-import surface.
__all__ = [
    "back_to_empty",
    "pre_obs_checks",
]

# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)


def build_device_tree_for_subarray(
    tm_subarray: DeviceProxy | int, assign_resources_block: dict | None = None
) -> dict:
    """
    Assemble the nested device hierarchy rooted at one TMC subarray.

    The result is the structure consumed by abort_and_restart_tree: every key
    is a device proxy and every value is the (possibly empty) dict of devices
    underneath it.

    :param tm_subarray: the TMC subarray node proxy, or its integer subarray
        ID. An ID is resolved against the TMC subarray nodes and must match
        exactly one of them, otherwise the unpacking raises ValueError.
    :param assign_resources_block: optional AssignResources payload. When
        given, the subarray beams listed under ``mccs.subarray_beams`` are
        added to the ones MCCS already reports as assigned.
    :return: ``{tmc_subarray: {mccs_subarray: {beam: {}, ...},
        csp_subarray: {cbf_subarray: {}}, sdp_subarray: {}}}``
    """
    if isinstance(tm_subarray, int):
        wanted_id = tm_subarray
        tmc = get_tmc_devices()
        # single-target unpacking insists on exactly one matching node
        [tm_subarray] = (
            node for node in tmc.tmc_subarray_nodes if int(member(node)) == wanted_id
        )

    subarray_id = int(member(tm_subarray))

    # the TMC subarray's device properties name its MCCS, CSP and SDP peers
    mccs_subarray, csp_subarray, sdp_subarray = (
        get_device(single_prop(tm_subarray, fqdn_property))
        for fqdn_property in ("MccsSubarrayFQDN", "CspSubarrayFQDN", "SdpSubarrayFQDN")
    )

    assigned = json.loads(mccs_subarray.assignedResources)

    # start from what MCCS reports, then add beams the request is about to use
    beam_ids = set(assigned["subarray_beam_ids"])
    if assign_resources_block:
        requested_beams = get_in(
            ["mccs", "subarray_beams"], assign_resources_block, []
        )
        beam_ids |= {
            f"{subarray_id:02}-{requested['subarray_beam_id']:02}"
            for requested in requested_beams
        }

    subarray_beam_devs = [
        get_mccs_device(f"low-mccs/subarraybeam/{beam_id}") for beam_id in beam_ids
    ]
    station_beam_devs = [
        get_mccs_device(f"low-mccs/beam/{beam_id}")
        for beam_id in assigned["station_beam_ids"]
    ]

    # leaves get their own empty dict each; station beams come first
    mccs_branch = {leaf: {} for leaf in [*station_beam_devs, *subarray_beam_devs]}
    csp_branch = {get_device(csp_subarray.cbfSubarrayAddress): {}}

    return {
        tm_subarray: {
            mccs_subarray: mccs_branch,
            csp_subarray: csp_branch,
            sdp_subarray: {},
        },
    }


def back_to_empty(
    tm_subarray: DeviceProxy | int, assign_resources_block: dict | None = None
):
    """
    Make a best-effort attempt to return a TMC subarray to EMPTY.

    The work is deliberately done in two stages: first the device tree is
    built, then the tree is operated on. Keeping them separate makes it
    easier to run integration tests against a shallower device tree.

    :param tm_subarray: the TMC subarray node proxy, or its integer
        subarray ID.
    :param assign_resources_block: optional AssignResources payload,
        forwarded unchanged to the tree builder.
    """
    abort_and_restart_tree(
        build_device_tree_for_subarray(tm_subarray, assign_resources_block)
    )
def pre_obs_checks(assign_resources_block: dict):
    """
    Get every device relevant to an AssignResources request ready for use.

    In order, this:

    1. ensures the SPS stations' CSP endpoints are configured;
    2. ensures the CBF processors are registered with the CBF allocator;
    3. issues On() to any SDP controller/subarray found in STANDBY or OFF;
    4. brings the requested TMC subarray back to EMPTY;
    5. turns on, aborts and/or obsresets PST beams that are not usable;
    6. ensures CBF's PST routing is configured for the requested PST beams.

    :param assign_resources_block: the AssignResources payload. It is read
        for ``subarray_id``, ``mccs.subarray_beams[*].apertures[*].station_id``
        and the optional ``csp.pst.pst_beam_ids``.
    """
    # gather the DeviceProxys relevant to the current observation
    cbf = get_cbf_devices()
    sdp = get_sdp_devices()

    sps_stations = []
    for subarray_beam in assign_resources_block["mccs"]["subarray_beams"]:
        # station labels are de-duplicated within each subarray beam
        station_labels = {
            station_label_from_id(aperture["station_id"])
            for aperture in subarray_beam["apertures"]
        }
        sps_stations.extend(
            get_mccs_device(f"low-mccs/spsstation/{label}")
            for label in station_labels
        )

    tmc = get_tmc_devices()
    tmc_subarray = next(
        node
        for node in tmc.tmc_subarray_nodes
        if int(member(node)) == assign_resources_block["subarray_id"]
    )

    LOG.info(
        "Ensure stations' CSP endpoints are all configured "
        "according to their Tango properties"
    )
    ensure_sps_csp_endpoints(sps_stations)

    LOG.info("Ensure CBF processors are registered with the CBF allocator")
    ensure_cbf_processors_registered(cbf)

    # TODO remove when SKB-852 is resolved
    LOG.info("Ensure SDP devices are all ON")
    states_needing_on = {DevState.STANDBY, DevState.OFF}
    for sdp_dev in (sdp.controller, *sdp.subarrays):
        state = sdp_dev.State()
        if state not in states_needing_on:
            continue
        LOG.warning(f"{sdp_dev.dev_name()} in state {state}, issuing On() - SKB-852")
        sdp_dev.On()

    LOG.info(f"Ensure {tmc_subarray.dev_name()} is EMPTY before using it")
    back_to_empty(tmc_subarray, assign_resources_block)

    LOG.info("Ensure that allocated PST beams are IDLE")
    # TODO this needs an SKB
    # Check that the PST beam is not in ABORTED. This is relevant to every
    # obs even if it's not a PST scan, because CSP chokes when it's not healthy
    exported_pst_names = Database().get_device_exported_for_class("PstBeam")
    pst_beams = [get_device(dev_name) for dev_name in exported_pst_names]

    # TODO likewise, PST beams we actually intend to use must not be READY
    pst_cfg = assign_resources_block["csp"].get("pst")
    if pst_cfg:
        used_pst_beam_ids = pst_cfg["pst_beam_ids"]
    else:
        used_pst_beam_ids = []

    def _needs_recovery(pst_beam):
        # attribute reads happen in the same order as the checks below
        if pst_beam.obsState in {ObsState.ABORTED, ObsState.FAULT}:
            return True
        if pst_beam.State() == DevState.OFF:
            return True
        return (
            pst_beam.obsState != ObsState.IDLE
            and pst_beam.deviceID in used_pst_beam_ids
        )

    faulty_pst_beams = {pst_beam for pst_beam in pst_beams if _needs_recovery(pst_beam)}

    for pst_beam in faulty_pst_beams:
        # the three checks are independent; each re-reads the live state
        if pst_beam.State() == DevState.OFF:
            LOG.warning(
                f"Turning On() PST beam {pst_beam.dev_name()} "
                f"in state {pst_beam.obsState.name}"
            )
            pst_beam.On()
        if pst_beam.obsState == ObsState.READY:
            LOG.warning(
                f"Abort()ing PST beam {pst_beam.dev_name()} "
                f"in state {pst_beam.obsState.name}"
            )
            pst_beam.Abort()
        if pst_beam.obsState in {ObsState.ABORTED, ObsState.FAULT}:
            LOG.warning(
                f"obsreset()ting PST beam {pst_beam.dev_name()} "
                f"in state {pst_beam.obsState.name}"
            )
            pst_beam.obsreset()

    LOG.info("Ensure that CBF's PST routing is correctly configured")
    # TODO should not have to do this after CBF X?
    ensure_pst_routing(cbf, used_pst_beam_ids)
# abort_and_restart_tree: drive the devices in a device tree towards
# ObsState.EMPTY.
#
# Parameters:
#   dev_tree - dict whose keys are devices exposing `obsState`, `Abort()`,
#              `Restart()` and `dev_name()`, and whose values are sub-trees of
#              the same shape (they are passed back into this function).
#              The docstring calls this `devs`; the parameter is `dev_tree`.
#   abort    - when true, Abort() is called on every key whose obsState is in
#              ABORTABLE_STATES; those devices, plus any key seen in ABORTING,
#              are then waited on until they reach RESTARTABLE_STATES.
#   restart  - when true, Restart() is called on every key whose obsState is
#              in RESTARTABLE_STATES; those devices, plus any key seen in
#              RESTARTING, are then waited on until they reach EMPTY.
#   timeout  - passed to the wait_for calls made at this level. The recursive
#              calls on sub-trees do not forward it, so they use the default.
#
# Error handling visible here:
#   - DevFailed raised by Abort()/Restart() is logged as a warning and not
#     re-raised.
#   - A failed wait_for is handled as ValueError; the handler reads
#     `ex.failed_devices` from it.
#   - Devices that fail the wait for EMPTY are handed to restart_devices()
#     and then waited on again with no explicit timeout; a ValueError from
#     that last wait is not caught here.
#
# NOTE(review): this definition has been collapsed onto a single line, so it
# is not valid Python as it stands and its block nesting cannot be recovered
# from this text alone. In particular, confirm against the upstream source
# (a) whether the sub-tree recursion runs only inside the `except ValueError`
# handlers or unconditionally, and (b) whether the final wait for EMPTY and
# restart_devices() sit inside `if restart:`. Restore the original
# indentation before changing any logic.
def abort_and_restart_tree(dev_tree: dict, abort=True, restart=True, timeout=5): """ Ensure `devs` supporting ObsState are in EMPTY. First attempts to Abort() and Restart(), and if that doesn't succeed, restart the devices via the device server interfaces. """ if abort: abortable_devs = {d for d in dev_tree if d.obsState in ABORTABLE_STATES} for dev in abortable_devs: LOG.warning(f"Abort()ing {dev.dev_name()} in obsState {dev.obsState!r}") try: dev.Abort() except DevFailed as ex: LOG.warning(f"{dev}.Abort() failed: {ex.args[0].desc}") abortable_devs |= {d for d in dev_tree if d.obsState == ObsState.ABORTING} try: wait_for(abortable_devs, "obsState", RESTARTABLE_STATES, timeout=timeout) except ValueError as ex: LOG.warning(f"Devices {ex.failed_devices} failed to Abort()") for subtree in dev_tree.values(): abort_and_restart_tree(subtree, restart=False) if restart: restartable_devs = {d for d in dev_tree if d.obsState in RESTARTABLE_STATES} for dev in restartable_devs: LOG.warning(f"Restart()ing {dev.dev_name()} in obsState {dev.obsState!r}") try: dev.Restart() except DevFailed as ex: LOG.warning(f"{dev}.Restart() failed: {ex.args[0].desc}") restartable_devs |= {d for d in dev_tree if d.obsState == ObsState.RESTARTING} try: wait_for(restartable_devs, "obsState", ObsState.EMPTY, timeout=timeout) except ValueError as ex: LOG.warning(f"Devices {ex.failed_devices} failed to Restart()") for subtree in dev_tree.values(): abort_and_restart_tree(subtree, abort=False) try: wait_for(list(dev_tree), "obsState", ObsState.EMPTY, timeout=timeout) except ValueError as ex: LOG.warning(f"Devices {ex.failed_devices} failed to reach EMPTY") restart_devices(ex.failed_devices) wait_for(ex.failed_devices, "obsState", ObsState.EMPTY)