Source code for ska_oso_scripting.engineering.low.workarounds.mccs

"""Workarounds for known issues in the MCCS subsystem."""
import json
import logging
from typing import Iterable

from tango import DeviceProxy

from ska_oso_scripting.engineering.low.utils.tango_utils import member, single_prop

__all__ = [
    "ensure_sps_csp_endpoints",
]

LOG = logging.getLogger(__name__)


[docs] def ensure_sps_csp_endpoints(sps_stations: Iterable[DeviceProxy]): """ Ensure that each station's CSP endpoint is set to its CspIngestIp property. We need to check that the stations' station beams are being sent to CBF, and not e.g. the DAQ. The only true way to tell this is to inspect TPMs, which we currently don't have access to from the SPC cluster. So we check the value reported by SpsStation and warn if it disagrees, but unconditionally call SetCspIngest anyway. """ for sps_station in sps_stations: default_csp_ingest = single_prop(sps_station, "CspIngestIp") if sps_station.cspIngestAddress != default_csp_ingest: stn = member(sps_station) LOG.warning( f"{stn}'s CSP ingest address set to {sps_station.cspIngestAddress}, " f"not {default_csp_ingest} as expected. Fixing..." ) csp_config = { "destination_ip": default_csp_ingest, "destination_port": 4660, } sps_station.SetCspIngest(json.dumps(csp_config))