Source code for ska_oso_scripting.engineering.low.workarounds.csp

"""Workarounds for known issues in the CSP subsystem."""
from __future__ import annotations

import json
import logging
from typing import Iterable

from ska_oso_scripting.engineering.low.utils.aiv_utils import CBFDevices, get_device
from ska_oso_scripting.engineering.low.utils.tango_utils import wait_for

__all__ = [
    "ensure_cbf_processors_registered",
    "ensure_pst_routing",
]

LOG = logging.getLogger(__name__)

P4_MAC_ADDRESS = "c2:4e:25:c4:f4:e0"


# TODO remove when PERENTIE-3094 is completed
[docs] def ensure_cbf_processors_registered(cbf: CBFDevices): """ Ensures that all CBF processors are registered with the CBF allocator. If the allocator is restarted after processors - for example because of an upgrade to ska-low-cbf but not ska-low-cbf-proc, or a Pod eviction - then it won't know about the processors, and won't be able to allocate them to a subarray. """ processors = {p.dev_name() for p in cbf.processors} processors_registered = { alveo["tango_dev"] for alveo in json.loads(cbf.allocator.stats_alveo) } processors_unregistered = processors - processors_registered for processor_trl in processors_unregistered: LOG.warning(f"Registering {processor_trl} with CBF allocator") get_device(processor_trl).register() if processors_unregistered: wait_for( cbf.allocator, "stats_alveo", lambda v: set.issubset( {p.dev_name() for p in cbf.processors}, {alveo["tango_dev"] for alveo in json.loads(v)}, ), )
[docs] def ensure_pst_routing(cbf: CBFDevices, beam_ids: Iterable[int]): """ Update PST routing. TODO: Identify or raise the SKB for this. """ # find the port that PST is connected to hardware_connections = [ dict(map(lambda param: param.split("="), entry.split())) for entry in cbf.allocator.get_property("hardware_connections")[ "hardware_connections" ] ] [pst_link] = ( conn for conn in hardware_connections if "pst" in conn.get("link", "") ) # Ensure all our PST beams are being routed to that port required_routes = [ {"src": {"beam": beam_id}, "dst": {"port": pst_link["port"]}} for beam_id in beam_ids ] current_routes = json.loads(cbf.connector.PsrRoutingTable)["PSR"] routes_to_add = [ r for r in required_routes if not any(e == r for e in current_routes) ] for route in routes_to_add: LOG.warning(f"Adding route to CBF connector PSR table: {route}") if routes_to_add: cbf.connector.UpdatePSREntry(json.dumps({"psr": routes_to_add})) LOG.debug("PSR routing tables: %s", cbf.connector.PsrRoutingTable)