"""Workarounds for known issues in the CSP subsystem."""
from __future__ import annotations
import json
import logging
from typing import Iterable
from ska_oso_scripting.engineering.low.utils.aiv_utils import CBFDevices, get_device
from ska_oso_scripting.engineering.low.utils.tango_utils import wait_for
# Names exported by ``from <this module> import *``.
__all__ = [
"ensure_cbf_processors_registered",
"ensure_pst_routing",
]
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# NOTE(review): not referenced in the visible part of this module; presumably
# the MAC address of the P4 switch - confirm it is still needed.
P4_MAC_ADDRESS = "c2:4e:25:c4:f4:e0"
# TODO: remove this workaround once PERENTIE-3094 is completed.
def ensure_cbf_processors_registered(cbf: CBFDevices) -> None:
    """
    Ensure that all CBF processors are registered with the CBF allocator.

    If the allocator is restarted after the processors - for example because
    of an upgrade to ska-low-cbf but not ska-low-cbf-proc, or a Pod eviction -
    then it won't know about the processors, and won't be able to allocate
    them to a subarray.

    Every processor whose device name is missing from the allocator's
    ``stats_alveo`` attribute has its ``register()`` command called. The
    function then hands a predicate to ``wait_for`` that is satisfied once
    ``stats_alveo`` lists every processor. Nothing is done when all
    processors are already listed.

    :param cbf: CBF devices; ``cbf.processors`` and ``cbf.allocator`` are used.
    """

    def registered_names(stats_alveo: str) -> set:
        # stats_alveo is a JSON list; each entry names its device under
        # the "tango_dev" key.
        return {alveo["tango_dev"] for alveo in json.loads(stats_alveo)}

    # Computed once: the same set drives both the diff and the wait predicate.
    processors = {p.dev_name() for p in cbf.processors}
    processors_unregistered = processors - registered_names(
        cbf.allocator.stats_alveo
    )
    if not processors_unregistered:
        return

    # Sorted so that registration order and log output are reproducible
    # (plain set iteration order is arbitrary).
    for processor_trl in sorted(processors_unregistered):
        LOG.warning("Registering %s with CBF allocator", processor_trl)
        get_device(processor_trl).register()

    wait_for(
        cbf.allocator,
        "stats_alveo",
        lambda v: processors <= registered_names(v),
    )
def ensure_pst_routing(cbf: CBFDevices, beam_ids: Iterable[int]) -> None:
    """
    Update PST routing.

    Finds the single entry of the allocator's ``hardware_connections``
    property whose ``link`` value contains "pst", then makes sure the
    connector's PSR routing table holds a route from every beam in
    ``beam_ids`` to that entry's port. Only routes that are not already in
    the table are sent to the connector.

    TODO: Identify or raise the SKB for this.

    :param cbf: CBF devices; ``cbf.allocator`` and ``cbf.connector`` are used.
    :param beam_ids: IDs of the PST beams that must be routed to the PST port.
    :raises ValueError: if the hardware connections do not contain exactly
        one PST link, or an entry is not made of ``key=value`` pairs.
    """
    # find the port that PST is connected to
    # Each property entry is a whitespace-separated list of "key=value" pairs.
    # Split on the first "=" only, so a value may itself contain "=".
    connection_entries = cbf.allocator.get_property("hardware_connections")[
        "hardware_connections"
    ]
    hardware_connections = [
        dict(param.split("=", 1) for param in entry.split())
        for entry in connection_entries
    ]
    pst_links = [
        conn for conn in hardware_connections if "pst" in conn.get("link", "")
    ]
    if len(pst_links) != 1:
        # Same exception type as a failed single-element unpacking, but with
        # a message that says what is actually wrong.
        raise ValueError(
            "Expected exactly one PST link in hardware_connections, "
            f"found {len(pst_links)}"
        )
    pst_port = pst_links[0]["port"]

    # Ensure all our PST beams are being routed to that port
    # dict.fromkeys drops repeated beam ids while keeping their order, so the
    # same route is never submitted twice.
    required_routes = [
        {"src": {"beam": beam_id}, "dst": {"port": pst_port}}
        for beam_id in dict.fromkeys(beam_ids)
    ]
    current_routes = json.loads(cbf.connector.PsrRoutingTable)["PSR"]
    routes_to_add = [
        route
        for route in required_routes
        if not any(existing == route for existing in current_routes)
    ]
    for route in routes_to_add:
        LOG.warning("Adding route to CBF connector PSR table: %s", route)
    if routes_to_add:
        cbf.connector.UpdatePSREntry(json.dumps({"psr": routes_to_add}))
    LOG.debug("PSR routing tables: %s", cbf.connector.PsrRoutingTable)