Source code for ska_sdp_scripting.processing_block

"""High-level API for SKA SDP scripts."""

# pylint: disable=too-many-instance-attributes

import logging
import os
from collections.abc import Mapping
from typing import List, Sequence, Union

import ska_ser_logging

from .buffer_request import BufferRequest
from .config import new_config_client
from .phase import Phase
from .receive_addresses import generate_host_port_channel_map

# Install the SKA-standard log formatting as soon as this module is imported,
# before the module logger below emits anything.
ska_ser_logging.configure_logging()

# Logger shared by every class and function in this module. Its level is
# forced to DEBUG, so messages from this module are not filtered at the
# logger level (handlers may still filter them).
LOG = logging.getLogger(name="ska_sdp_scripting")
LOG.setLevel(level=logging.DEBUG)


class ProcessingBlock:
    """
    Claim the processing block.

    Creating an instance opens a connection to the configuration database
    and takes the processing block (using the config client's lease), so
    that the script can read its parameters and publish state for it.

    :param pb_id: processing block ID; if not given, it is read from the
        ``SDP_PB_ID`` environment variable
    :type pb_id: str, optional
    """

    def __init__(self, pb_id: Union[str, None] = None):
        # Get connection to config DB
        LOG.info("Opening connection to config DB")
        self._config = new_config_client()

        # Processing block ID: an explicit argument wins over the environment
        self._pb_id = os.getenv("SDP_PB_ID") if pb_id is None else pb_id
        LOG.debug("Processing Block ID %s", self._pb_id)

        # Claim processing block. txn() is iterated (presumably so that the
        # body can be retried), so only log once the loop has finished.
        pb = None
        for txn in self._config.txn():
            txn.take_processing_block(self._pb_id, self._config.client_lease)
            pb = txn.get_processing_block(self._pb_id)
        LOG.info("Claimed processing block")

        # Processing Block
        self._pb = pb

        # Execution block ID
        self._eb_id = pb.eb_id

        # DNS name components used by receive_addresses(); chart name and
        # namespace are only resolved when DNS names are requested
        self._service_name = "receive"
        self._chart_name = None
        self._namespace = None

        # Ports
        self._ports = []

    # pylint: disable=too-many-arguments
    def receive_addresses(
        self,
        configured_host_port: dict,
        chart_name: str = None,
        service_name: str = None,
        namespace: str = None,
        update_dns: bool = True,
    ):
        """
        Generate receive addresses and update the processing block state.

        ``chart_name``, ``service_name`` and ``namespace`` are only used when
        ``update_dns`` is true, i.e. when the host entries of
        ``configured_host_port`` must be replaced by DNS names. Currently
        this only applies to the vis-receive script.

        The resulting addresses are written to ``"receive_addresses"`` in
        the processing block state, and this processing block's ID is
        written to ``"pb_receive_addresses"`` in the execution block (two
        separate transactions).

        :param configured_host_port: constructed host and port map; it is
            modified in place
        :param chart_name: Name of the statefulset
        :param service_name: Name of the headless service
        :param namespace: namespace where it's going to be deployed
        :param update_dns: Need to update with DNS address? (default True)
        :raises ValueError: if DNS names are requested but the chart,
            service or namespace name cannot be determined
        """
        # Generate receive addresses
        LOG.info("Generating receive addresses")
        if update_dns:
            # Update the DNS address within the host port
            # Using names of the service and chart
            self._update_chart_names(service_name, chart_name, namespace)
            receive_addresses = self._update_with_dns_name(
                configured_host_port
            )
            LOG.info("Updated host port with DNS names.")
        else:
            # we do not update anything
            # This is mainly for the pointing script
            LOG.info("No update of DNS names executed.")
            receive_addresses = configured_host_port

        self._add_config_beam_info_to_recv_addrs(receive_addresses)

        # Update receive addresses in processing block state
        LOG.info("Updating receive addresses in processing block state")
        for txn in self._config.txn():
            state = txn.get_processing_block_state(self._pb_id)
            state["receive_addresses"] = receive_addresses
            txn.update_processing_block_state(self._pb_id, state)

        # Write pb_id in pb_receive_addresses in EB
        LOG.info("Writing PB ID to pb_receive_addresses in EB")
        for txn in self._config.txn():
            eb = txn.get_execution_block(self._eb_id)
            eb["pb_receive_addresses"] = self._pb_id
            txn.update_execution_block(self._eb_id, eb)

    def config_host_port_channel_map(
        self,
        scan_types,
        port_start,
        channels_per_port,
        *,
        num_hosts=1,
        max_ports_per_host=None,
    ):
        """
        Configure host and port channel maps for a list of scan types.

        This is a thin wrapper around ``generate_host_port_channel_map``.

        :param scan_types: The list of scan types for which receive
            addresses should be calculated.
        :param port_start: The first port to allocate on each host.
        :param channels_per_port: The number of channels that are sent per
            port.
        :param num_hosts: If given, the exact number of hosts to allocate.
        :param max_ports_per_host: If given, the maximum number of ports to
            allocate to a host before a new host is allocated (per beam).
        :returns: a 2-tuple ``(recv_addresses, host_port_count)`` as
            returned by ``generate_host_port_channel_map``. In
            ``recv_addresses`` hosts are identified by running, 0-indexed
            numbers, so that actual hostnames/IPs can be easily assigned
            later. (``host_port_count`` is passed through unchanged.)
        """
        # Unpacking enforces that the generator returns exactly two values.
        recv_addresses, host_port_count = generate_host_port_channel_map(
            scan_types,
            port_start,
            channels_per_port,
            num_hosts=num_hosts,
            max_ports_per_host=max_ports_per_host,
        )
        return recv_addresses, host_port_count

    def get_dependencies(self):
        """
        Get the list of processing block dependencies.

        :returns: processing block dependencies
        :rtype: list
        """
        return self._pb.dependencies

    def get_parameters(self):
        """
        Get script parameters from processing block.

        The schema checking is not currently implemented.

        :returns: processing block parameters
        :rtype: dict
        """
        return self._pb.parameters

    def update_parameters(
        self, default_parameters: dict, parameters: Union[dict, Mapping]
    ):
        """
        Recursively overwrite ``default_parameters`` with ``parameters``.

        Nested mappings are merged key by key. Any other value in
        ``parameters`` replaces the default outright. If a nested mapping in
        ``parameters`` has a default that is missing or is not a mapping,
        it is merged into a fresh dict, i.e. it replaces that default.

        ``default_parameters`` (and the dicts nested in it) are modified in
        place as well as returned.

        :param default_parameters: default parameter values
        :param parameters: script specific parameters
        :returns: the updated ``default_parameters``
        :rtype: dict
        """
        for param, value in parameters.items():
            if isinstance(value, Mapping):
                current = default_parameters.get(param)
                if not isinstance(current, Mapping):
                    # A scalar (or absent) default cannot be merged into;
                    # start from an empty dict instead of failing.
                    current = {}
                default_parameters[param] = self.update_parameters(
                    current, value
                )
            else:
                default_parameters[param] = value
        return default_parameters

    def get_scan_types(self) -> List[dict]:
        """
        Get scan types from the execution block.

        Pseudo scan types whose ID starts with ``"."`` only provide default
        values for the scan types that ``derive_from`` them and are not
        returned. The beams of each returned scan type are completed with
        those defaults and with the matching channel definitions.

        This is only supported for real-time scripts.

        :returns: concrete (non-default) scan types
        :rtype: list of dict
        """
        LOG.info("Retrieving channel link map from EB")
        concrete_scan_types = None
        for txn in self._config.txn():
            eb = txn.get_execution_block(self._eb_id)
            # Tolerate an EB without scan types/channels instead of failing
            # with a TypeError when iterating over None.
            scan_types = eb.get("scan_types") or []
            channels = eb.get("channels") or []

            default_scan_types = {}
            concrete_scan_types = []
            # collect pseudo scan types containing default values
            for scan_type in scan_types:
                scan_type_id = scan_type.get("scan_type_id")
                if scan_type_id.startswith("."):
                    default_scan_types[scan_type_id] = scan_type
                else:
                    concrete_scan_types.append(scan_type)

            self._update_scan_types(
                concrete_scan_types, default_scan_types, channels
            )

        # scan_types are updated in place
        return concrete_scan_types

    def _update_scan_types(
        self,
        concrete_scan_types: List[dict],
        default_scan_types: dict,
        channels: list,
    ):
        """
        Update the scan types with the default parameters and channels.

        Factored out from `get_scan_types` to reduce complexity. A scan type
        with ``"derive_from"`` gets its beams merged over the beams of the
        named default scan type. Explicit values in the scan type win over
        the defaults.

        :param concrete_scan_types: concrete (non-default) scan types;
            updated in place
        :param default_scan_types: default scan types keyed by scan type ID
        :param channels: channel definitions from the execution block
        :raises KeyError: if ``derive_from`` names an unknown default
        """
        for scan_type in concrete_scan_types:
            if "derive_from" in scan_type:
                default = default_scan_types[scan_type.get("derive_from")]
                scan_type["beams"] = self._merge_beams(
                    default.get("beams") or {}, scan_type.get("beams") or {}
                )

            # Update scan types with relevant channels to the beams
            self._add_channels(channels, scan_type)

    @staticmethod
    def _merge_beams(default_beams: dict, beams: dict) -> dict:
        """
        Merge a scan type's beams over those of its default scan type.

        Every beam of either input is present in the result, with beams of
        the default first. For a beam in both inputs, values from ``beams``
        take precedence. New dicts are built, so neither input is mutated.

        :param default_beams: beams of the default scan type
        :param beams: beams of the deriving scan type
        :returns: merged beams
        """
        return {
            beam_id: {
                **default_beams.get(beam_id, {}),
                **beams.get(beam_id, {}),
            }
            for beam_id in {**default_beams, **beams}
        }

    @staticmethod
    def request_buffer(size: float, tags: List[str]) -> BufferRequest:
        """
        Request a buffer reservation.

        This returns a buffer reservation request that is used to create a
        script phase. These are currently only placeholders.

        :param size: size of the buffer
        :type size: float
        :param tags: tags describing the type of buffer required
        :type tags: list of str
        :returns: buffer reservation request
        :rtype: :class:`BufferRequest`
        """
        return BufferRequest(size, tags)

    def create_phase(self, name: str, requests: List[BufferRequest]) -> Phase:
        """
        Create a script phase for deploying execution engines.

        The phase is created with a list of resource requests which must be
        satisfied before the phase can start executing. For the time being
        the only resource requests are (placeholder) buffer reservations,
        but eventually this will include compute requests too.

        :param name: name of the phase
        :type name: str
        :param requests: resource requests
        :type requests: list of :class:`BufferRequest`
        :returns: the phase
        :rtype: :class:`Phase`
        """
        kind = self._pb.script["kind"]
        qc_config = self._pb.parameters.get("queue_connector_configuration")
        return Phase(
            name,
            requests,
            self._config,
            self._pb_id,
            self._eb_id,
            kind,
            qc_config,
        )

    def exit(self):
        """Perform clean-up: close the connection to the config DB."""
        LOG.info("Closing connection to config DB")
        self._config.close()

    def nested_parameters(self, parameters: dict):
        """
        Convert a flattened dictionary to a nested dictionary.

        Keys are split on ``"."``, e.g. ``{"a.b": 1}`` becomes
        ``{"a": {"b": 1}}``.

        :param parameters: parameters to be converted
        :return: nested parameters
        """
        result = {}
        for keys, values in parameters.items():
            self._split_rec(keys, values, result)
        return result

    # -------------------------------------
    # Private methods
    # -------------------------------------

    def _update_chart_names(
        self, service_name: str, chart_name: str, namespace: str
    ) -> None:
        """
        Assign custom names if they are provided by the user.

        This is required for _update_with_dns_name.

        The namespace is read from the "SDP_HELM_NAMESPACE" environment
        variable if not provided. If no chart name is provided, it is taken
        from the deployment IDs in the config DB that contain this
        processing block's ID (the last match wins; if none matches, the
        previously stored chart name is kept).

        :param service_name: Name of the headless service
        :param chart_name: Name of the statefulset
        :param namespace: namespace where it's going to be deployed
        :return: None; the names are stored on the instance
        """
        if namespace is not None:
            self._namespace = namespace
        else:
            self._namespace = os.getenv("SDP_HELM_NAMESPACE")

        if service_name is not None:
            self._service_name = service_name

        if chart_name is not None:
            self._chart_name = chart_name
        else:
            for txn in self._config.txn():
                for deploy_id in txn.list_deployments():
                    if self._pb_id in deploy_id:
                        self._chart_name = deploy_id

    def _update_with_dns_name(self, configured_host_port: dict) -> dict:
        """
        Replace host indices with the DNS names of the receiving processes.

        This only needs to be done for receive scripts. Each entry of a
        beam's ``"host"`` list is a ``(start, host_index)`` pair with a
        0-based host index. It is replaced by
        ``(start, "<chart>-<host_index:02d>-0.<service>.<namespace>")``.
        This is the naming scheme assumed for the pods of the vis-receive
        statefulset behind its headless service.

        :param configured_host_port: constructed host and port
        :return: configured_host_port (input dictionary modified in-place;
            each ``"host"`` becomes a tuple of tuples)
        :raises ValueError: if the chart, service or namespace name is
            missing
        """
        if (
            self._chart_name is None
            or self._service_name is None
            or self._namespace is None
        ):
            raise ValueError(
                "Names of chart, service and namespace "
                "must be provided to "
                "obtain receive address DNS name"
            )

        suffix = f"-0.{self._service_name}.{self._namespace}"
        for beam in configured_host_port.values():
            for values in beam.values():
                values["host"] = tuple(
                    (start, f"{self._chart_name}-{int(host):02d}{suffix}")
                    for start, host in values["host"]
                )
        return configured_host_port

    def _split_rec(self, keys: str, values, out: dict):
        """
        Insert ``values`` into ``out`` under the dotted path ``keys``.

        :param keys: dotted key from the flattened dictionary
        :param values: value to store at that path
        :param out: output dictionary, updated in place
        """
        head, *rest = keys.split(".", 1)
        if rest:
            self._split_rec(rest[0], values, out.setdefault(head, {}))
        else:
            out[head] = values

    @staticmethod
    def _add_channels(channels: list, scan_type: dict):
        """
        Add related channels to the beams in the given scan_type.

        Each beam's ``"channels_id"`` is looked up in ``channels``. The
        first matching channel definition is merged into the beam.

        :param channels: channels from execution block
        :param scan_type: scan type to update (in place)
        :raises ValueError: if a beam references a ``channels_id`` and does
            not end up with ``"spectral_windows"``
        """
        beams = scan_type.get("beams") or {}
        for beam, beam_value in beams.items():
            beam_channels_id = beam_value.get("channels_id")
            for channel in channels:
                if beam_channels_id == channel.get("channels_id"):
                    beam_value.update(channel)
                    break

            if beam_channels_id and "spectral_windows" not in beam_value:
                raise ValueError(
                    f"scan_type with {beam} beam does not "
                    f"have channel_id information."
                )

    # NOTE(review): not called anywhere in this class; possibly used by
    # scripts. Verify before removing.
    @staticmethod
    def _get_hosts_and_ports_per_beam(
        beam_value: dict,
        channels_per_port: int,
        max_channels_per_process: int,
        num_process: int,
        port_start: int,
    ):
        """
        Construct hosts and ports for each beam in each scan.

        Each spectral window is cut into chunks of at most
        ``max_channels_per_process`` channels. Every chunk gets one host
        entry ``[start, "-<process index>."]`` and its port entries.

        :param beam_value: value of a given beam; must contain
            ``"spectral_windows"`` with ``"start"`` and ``"count"`` each
        :param channels_per_port: number of channels to be sent to each port
        :param max_channels_per_process: maximum number of channels per
            process
        :param num_process: number of receive processes; returned unchanged
            if there are no spectral windows
        :param port_start: starting port the receiver will be listening in
        :returns: ``(hosts, num_process, ports)``. A port entry is
            ``[start, port_start, 1]``, or when ``channels_per_port > 1``
            one ``[start, port_start + j, 1, j]`` per ``j`` in
            ``range(channels_per_port)``.
        """
        hosts = []
        ports = []
        prev_count = 0
        process_per_channel = 0
        entry = True

        for window in beam_value.get("spectral_windows"):
            start = window.get("start")
            count = window.get("count")
            prev_count += count
            for i in range(0, count, max_channels_per_process):
                # NOTE(review): the running count grows by the whole window
                # size per chunk (plus once per window above). This keeps
                # the existing behaviour; confirm it is intended.
                prev_count += count
                if entry:
                    # The first chunk resets the process count to one.
                    num_process = 1
                    entry = False
                else:
                    if prev_count >= max_channels_per_process:
                        process_per_channel += 1
                        num_process += process_per_channel
                    if i == 0:
                        prev_count = 0

                hosts.append([start, f"-{process_per_channel}."])
                if channels_per_port > 1:
                    ports.extend(
                        [start, port_start + j, 1, j]
                        for j in range(channels_per_port)
                    )
                else:
                    ports.append([start, port_start, 1])
                start += max_channels_per_process

        return hosts, num_process, ports

    def _add_config_beam_info_to_recv_addrs(
        self, receive_addresses: dict
    ) -> None:
        """
        Copy beam information from the execution block into receive_addresses.

        For every beam in receive_addresses, the "function" key and, if
        present, the "{x}_beam_id" keys are copied from the matching entry
        of "beams" in the execution block. "{x}_beam_id" can be
        visibility_beam_id, search_beam_id, timing_beam_id or vlbi_beam_id
        depending on the type of beam.

        Updates receive_addresses in place.

        :param receive_addresses: receive addresses keyed by scan type, then
            by beam ID
        """
        beam_id_types = ["visibility", "search", "timing", "vlbi"]
        for txn in self._config.txn():
            eb = txn.get_execution_block(self._eb_id)
            eb_beams = eb.get("beams")
            eb_beams_dict = {beam["beam_id"]: beam for beam in eb_beams}
            for recv_beam in receive_addresses.values():
                self._add_recv_beam(recv_beam, eb_beams_dict, beam_id_types)

    @staticmethod
    def _add_recv_beam(
        recv_beam: dict, eb_beams: dict, beam_id_types: Sequence[str]
    ) -> None:
        """
        Copy EB beam info into the beams of one receive-address entry.

        Factored out from _add_config_beam_info_to_recv_addrs to reduce
        complexity.

        :param recv_beam: receive address value (beam ID -> beam), updated
            in place
        :param eb_beams: mapping of ids to beams
        :param beam_id_types: the types of beam
        :raises KeyError: if a beam ID is not present in ``eb_beams``
        """
        for beam_id, beam_value in recv_beam.items():
            eb_beam = eb_beams[beam_id]
            if "function" in eb_beam:
                beam_value["function"] = eb_beam["function"]
            for int_beam_id in beam_id_types:
                int_beam_id_key = f"{int_beam_id}_beam_id"
                if int_beam_id_key in eb_beam:
                    beam_value[int_beam_id_key] = eb_beam[int_beam_id_key]