Source code for ska_ost_osd.telvalidation.semantic_validator

"""This module contains the 'semantic_validate' functions which is exposed to
outside for use.

If observing command's input contains invalid values it will raise
validation error's based on provided rules. Integrated OSD API function
to fetch rule constraints values.
"""

import logging
from os import environ
from typing import Any, Dict, Optional

from pydantic import ValidationError
from ska_telmodel_client import TMData

from ska_ost_osd.telvalidation.models.semantic_schema_validator import SemanticModel

from .common.constant import (
    ASSIGN_RESOURCE,
    CONFIGURE,
    LOW_SBD_VALIDATION_CONSTANT_JSON_FILE_PATH,
    LOW_VALIDATION_CONSTANT_JSON_FILE_PATH,
    MID_SBD_VALIDATION_CONSTANT_JSON_FILE_PATH,
    MID_VALIDATION_CONSTANT_JSON_FILE_PATH,
    SEMANTIC_VALIDATION_VALUE,
    SKA_LOW_SBD,
    SKA_LOW_TELESCOPE,
    SKA_MID_SBD,
    SKA_MID_TELESCOPE,
)
from .common.error_handling import SchematicValidationError
from .oet_tmc_validators import clear_semantic_variable_data, validate_json

logging.getLogger("telvalidation")

VALIDATION_STRICTNESS = environ.get("VALIDATION_STRICTNESS", "2")


def get_validation_data(interface: str, telescope: str) -> Optional[str]:
    """Get the validation constant JSON file path based on the provided
    interface URI.

    :param interface: str, the interface URI from the observing command
        input.
    :param telescope: str, the telescope identifier (e.g., 'mid' or
        'low').
    :return: Optional[str], the validation constant JSON file path, or
        None if not found.
    """

    validation_constants = {
        SKA_LOW_TELESCOPE: LOW_VALIDATION_CONSTANT_JSON_FILE_PATH,
        SKA_MID_TELESCOPE: MID_VALIDATION_CONSTANT_JSON_FILE_PATH,
        SKA_MID_SBD: MID_SBD_VALIDATION_CONSTANT_JSON_FILE_PATH,
        SKA_LOW_SBD: LOW_SBD_VALIDATION_CONSTANT_JSON_FILE_PATH,
    }

    for key, value in validation_constants.items():
        if key in interface or key == telescope:
            return value

    # taking mid interface as default cause there is no any specific
    # key to differentiate the interface
    return validation_constants.get(SKA_MID_TELESCOPE)


def fetch_capabilities_from_osd(
    telescope: str,
    array_assembly: str,
    tm_data: Optional[dict] = None,
    osd_data: Optional[dict] = None,
) -> tuple[dict, dict]:
    """Fetch capabilities and basic capabilities from the Observatory State
    Database (OSD).

    :param telescope: str, the telescope identifier (e.g., 'mid' or
        'low').
    :param array_assembly: str, the specific capabilities (e.g.,
        'AAO.5', 'AA0.1').
    :param tm_data: Optional[Dict], the telemodel data object.
    :param osd_data: Optional[Dict], the OSD data dictionary passed
        externally.
    :return: Tuple[Dict, Dict], a tuple containing the capabilities and
        basic capabilities dictionaries.
    """

    from ska_ost_osd.osd.osd import get_osd_data

    if osd_data:
        fetched_osd_data = osd_data
    else:
        fetched_osd_data, _ = get_osd_data(
            capabilities=[telescope],
            array_assembly=array_assembly,
            tmdata=tm_data,
        )

    capabilities = fetched_osd_data.get("capabilities", {}).get(telescope, {})
    if capabilities:
        return (
            capabilities.get(array_assembly, {}),
            capabilities.get("basic_capabilities", {}),
        )

    return {}, {}


def get_matched_values_from_basic_capabilities(
    data: list | dict, key_to_find: str
) -> dict | None:
    """Efficiently search a nested dictionary and list structure to find the
    value for the given key from basic capabilities.

    :param data: dict or list, the nested data structure to search.
    :param key_to_find: str, the key to search for.
    :return: The value associated with the given key, or None if the key
        is not found.
    """

    if data is None:
        return None

    stack = [(data, [])]
    while stack:
        current, path = stack.pop()
        if isinstance(current, dict):
            if key_to_find in current.values():
                return current
            for key, value in current.items():
                if isinstance(value, (dict, list)):
                    stack.append((value, path + [key]))
        elif isinstance(current, list):
            for item in reversed(current):
                if isinstance(item, (dict, list)):
                    stack.append((item, path))
    return None


def replace_matched_capabilities_values(
    nested_dict: dict, path: list[str], new_value: Any
) -> None:
    """Replace the value in capabilities data that matches a key from basic
    capabilities.

    :param nested_dict: dict, the dictionary to modify.
    :param path: list[str], the path to the key to replace, represented
        as a list of keys.
    :param new_value: Any, the new value to assign to the key.
    :raises KeyError: If any key in the path is not found in the nested
        dictionary.
    :raises TypeError: If the path does not lead to a dictionary.
    """

    current = nested_dict
    for key in path[:-1]:
        if not isinstance(current, dict) or key not in current:
            raise KeyError(f"Key '{key}' not found in the nested dictionary.")
        current = current[key]

    if not isinstance(current, dict):
        raise TypeError("The path does not lead to a dictionary.")

    last_key = path[-1]
    if last_key not in current:
        raise KeyError(f"Key '{last_key}' not found in the nested dictionary.")

    current[last_key] = new_value


def build_basic_capabilities_lookup(
    basic_capabilities: Any,
) -> Dict[str, Dict[str, Any]]:
    """Builds reference lookup dictionary from nested basic capabilities data.

    :param basic_capabilities: nested capability data containing lists
        of items with '_id' fields :return dictionary mapping each
        reference type to its corresponding item by id
    :example:
        >>> basic_capabilities = {
        ...     'dish_elevation_limit_deg': 15,
        ...     'receiver_information': [
        ...         {'max_frequency_hz': 1050000000, 'min_frequency_hz': 350000000,
        'rx_id': 'Band_1'},
        ...         {'max_frequency_hz': 1760000000, 'min_frequency_hz': 950000000,
        'rx_id': 'Band_2'},
        ...         {'max_frequency_hz': 3050000000, 'min_frequency_hz': 1650000000,
        'rx_id': 'Band_3'}
        ...     ]
        ... }
        >>> build_basic_capabilities_lookup(basic_capabilities)
        {
            'rx_id': {
                'Band_1': {'max_frequency_hz': 1050000000,
                'min_frequency_hz': 350000000, 'rx_id': 'Band_1'},
                'Band_2': {'max_frequency_hz': 1760000000,
                'min_frequency_hz': 950000000, 'rx_id': 'Band_2'},
                'Band_3': {'max_frequency_hz': 3050000000,
                'min_frequency_hz': 1650000000, 'rx_id': 'Band_3'}
            }
        }
    """

    capabilities_lookup: Dict[str, Dict[str, Any]] = {}

    def collect(node: Any) -> None:
        if isinstance(node, dict):
            for value in node.values():
                if isinstance(value, list) and all(
                    isinstance(item, dict) for item in value
                ):
                    for item in value:
                        for key in item:
                            if key.endswith("_id"):
                                capabilities_lookup.setdefault(key, {})[
                                    item[key]
                                ] = item
                else:
                    collect(value)
        elif isinstance(node, list):
            for item in node:
                collect(item)

    collect(basic_capabilities)
    return capabilities_lookup


def fetch_matched_capabilities_from_basic_capabilities(
    capabilities: Any, basic_capabilities: Dict[str, Dict[str, Any]]
) -> Any:
    """Recursively matches and replaces capability references using basic
    capability mappings.

    :param capabilities: input capabilities data as nested dict, list,
        or scalar
    :param basic_capabilities: lookup dictionary mapping reference keys
        to capability details :return transformed capabilities with
        matched basic capability values

    :example:
        >>> capabilities = {
        ...     'allowed_channel_count_range_max': [58982],
        ...     'allowed_channel_count_range_min': [1],
        ...     'allowed_channel_width_values': [13440],
        ...     'available_bandwidth_hz': 800000000,
        ...     'available_receivers': ['Band_1', 'Band_2'],
        ...     'cbf_modes': ['correlation', 'pst'],
        ...     'max_baseline_km': 1.5,
        ...     'number_dish_ids': ['SKA001', 'SKA036', 'SKA063', 'SKA100'],
        ...     'number_fsps': 4,
        ...     'number_meerkat_dishes': 0,
        ...     'number_meerkatplus_dishes': 0,
        ...     'number_pss_beams': 0,
        ...     'number_pst_beams': 1,
        ...     'number_ska_dishes': 4,
        ...     'number_zoom_channels': 0,
        ...     'number_zoom_windows': 0,
        ...     'ps_beam_bandwidth_hz': 400000000
        ... }
        >>> basic_capabilities = {
        ...     'rx_id': {
        ...         'Band_1': {'max_frequency_hz': 1050000000,
        'min_frequency_hz': 350000000,
        'rx_id': 'Band_1'},
        ...         'Band_2': {'max_frequency_hz': 1760000000,
        'min_frequency_hz': 950000000,
        'rx_id': 'Band_2'}
        ...     }
        ... }
        >>> fetch_matched_capabilities_from_basic_capabilities(capabilities,
        basic_capabilities)
        {
            'allowed_channel_count_range_max': [58982],
            'allowed_channel_count_range_min': [1],
            'allowed_channel_width_values': [13440],
            'available_bandwidth_hz': 800000000,
            'available_receivers': [
                {'max_frequency_hz': 1050000000, 'min_frequency_hz': 350000000,
                'rx_id': 'Band_1'},
                {'max_frequency_hz': 1760000000, 'min_frequency_hz': 950000000,
                'rx_id': 'Band_2'}
            ],
            'cbf_modes': ['correlation', 'pst'],
            'max_baseline_km': 1.5,
            'number_dish_ids': ['SKA001', 'SKA036', 'SKA063', 'SKA100'],
            'number_fsps': 4,
            'number_meerkat_dishes': 0,
            'number_meerkatplus_dishes': 0,
            'number_pss_beams': 0,
            'number_pst_beams': 1,
            'number_ska_dishes': 4,
            'number_zoom_channels': 0,
            'number_zoom_windows': 0,
            'ps_beam_bandwidth_hz': 400000000
        }
    """

    if isinstance(capabilities, dict):
        return {
            key: fetch_matched_capabilities_from_basic_capabilities(
                value, basic_capabilities
            )
            for key, value in capabilities.items()
        }
    elif isinstance(capabilities, list):
        if all(isinstance(item, str) for item in capabilities):
            for mapping in basic_capabilities.values():
                if all(ref in mapping for ref in capabilities):
                    return [mapping[ref] for ref in capabilities]
        return [
            fetch_matched_capabilities_from_basic_capabilities(item, basic_capabilities)
            for item in capabilities
        ]
    return capabilities


def validate_command_input(
    observing_command_input: dict,
    tm_data: TMData,
    interface: str,
    telescope: str,
    array_assembly: str,
    osd_data: dict,
) -> list:
    """Invoke semantic validation for the given command input.

    :param observing_command_input: dict, user JSON input for semantic
        validation.
    :param tm_data: TMData, the TMData object created externally.
    :param interface: str, assign/configure resource schema interface name.
    :param telescope: str, the telescope identifier (e.g., 'mid' or 'low').
    :param array_assembly: str, specific capabilities like 'AA0.5', 'AA1'.
    :param osd_data: dict, externally passed OSD data dictionary.
    :return: list, error messages if validation fails; empty list
    otherwise.
    """

    semantic_validate_data = tm_data[
        get_validation_data(interface, telescope)
    ].get_dict()
    # call OSD API and fetch capabilities and basic capabilities
    capabilities, basic_capabilities = fetch_capabilities_from_osd(
        telescope=semantic_validate_data["telescope"],
        array_assembly=array_assembly,
        tm_data=tm_data,
        osd_data=osd_data,
    )
    capabilities_lookup = build_basic_capabilities_lookup(basic_capabilities)
    matched_capabilities = fetch_matched_capabilities_from_basic_capabilities(
        capabilities, capabilities_lookup
    )
    validation_data = semantic_validate_data[array_assembly].get(
        "assign_resource"
        if ASSIGN_RESOURCE in interface
        else "configure"
        if CONFIGURE in interface
        else "sbd"
    )

    msg_list = validate_json(
        validation_data,
        command_input_json_config=observing_command_input,
        parent_path_list=[],
        capabilities=matched_capabilities,
    )

    return msg_list


[docs] def semantic_validate( observing_command_input: dict, tm_data: TMData, array_assembly: str = "AA0.5", interface: Optional[str] = None, raise_semantic: bool = True, osd_data: Optional[dict] = None, ) -> Any: """Entry point for semantic validation, usable by other libraries like CDM. :param observing_command_input: dict, details of the command to validate. This should match the structure expected by `ska_telmodel.schema.validate`. If the command is a JSON string, convert it to a dictionary with `json.loads` first. :param tm_data: TMData, telemodel data object used to load semantic validation JSON. :param osd_data: Optional[dict], externally passed OSD data dictionary. :param interface: Optional[str], full interface URI; provide only if missing in `observing_command_input`. :param array_assembly: str, array assembly version like 'AA0.5' or 'AA0.1'. :param raise_semantic: bool, default True. If True, raises `SchematicValidationError` on validation failure; if False, only logs errors and returns False. :return: bool, True if semantic validation passes, False otherwise. """ if int(VALIDATION_STRICTNESS) == SEMANTIC_VALIDATION_VALUE: try: SemanticModel( observing_command_input=observing_command_input, tm_data=tm_data, array_assembly=array_assembly, interface=interface, raise_semantic=raise_semantic, osd_data=osd_data, ) except ValidationError as err: raise err except ValueError as semantic_error: raise semantic_error clear_semantic_variable_data() version = observing_command_input.get("interface") or interface telescope = observing_command_input.get("telescope") if not version: message = ( "Interface is missing from observing_command_input. Please provide" " interface='...' explicitly." ) logging.warning(message) raise SchematicValidationError(message) msg_list = validate_command_input( observing_command_input, tm_data, version, telescope, array_assembly, osd_data, ) msg_list = [msg for msg in msg_list if msg] # Remove None values if msg_list: msg = "\n".join(msg_list) logging.error( "Also following errors were encountered during semantic %s", f"validations:\n{msg}", ) if raise_semantic: raise SchematicValidationError(msg) return False return True