Source code for ska_telmodel.telvalidation.mid_oet_tmc_validators

"""
This module applies semantic validation to command payloads.
The semantic validation constants file contains the rules and
their error messages. When an assign resources or configure
command is executed from a Jupyter notebook or any UI, the JSON
payload provided for that command is validated against those rules.
"""

import logging
from datetime import datetime

import astropy.units as u
import simpleeval
from astropy.time import Time
from simpleeval import EvalWithCompoundTypes

from .constant import MID_VALIDATION_CONSTANT_JSON_FILE_PATH
from .coordinates_conversion import (
    dec_degs_str_formats,
    ra_dec_to_az_el,
    ra_degs_from_str_formats,
)
from .schematic_validation_exceptions import (
    SchemanticValdidationKeyError,
    SchematicValidationError,
)

# NOTE(review): the logger returned here is not bound to a name; the
# functions in this module log through the ``logging`` module-level
# functions (e.g. ``logging.error``) rather than through this logger.
logging.getLogger("telvalidation")
# Module-level store mapping a semantic validation key to the value found
# for it in the command payload. apply_validation_rule writes to it, and the
# helper functions handed to simpleeval (return_length,
# return_value_from_global_constant) read from it.
SEMANTIC_DATA_GLOBAL_CONSTANT = {}


def get_value_based_on_key(
    input_command_data: dict,
    search_key: str,
    item_populated_list: list,
    parent_key_for_search: str,
    comparison_parent_key: str,
) -> list:
    """
    Recursively search a command payload for ``search_key`` located directly
    under the parent key ``parent_key_for_search``.

    Nested dictionaries, and dictionaries held inside lists, are walked
    depth-first. Every match is appended to ``item_populated_list`` (which
    is mutated in place and shared across the recursion).

    :param input_command_data: dictionary containing details of the command
        which needs validation.

    :param search_key: key whose value should be collected.

    :param item_populated_list: accumulator list receiving every matching
        value; callers normally pass an empty list.

    :param parent_key_for_search: key of the dictionary that must directly
        contain ``search_key``. When it is empty or None nothing matches.

    :param comparison_parent_key: key under which ``input_command_data`` was
        found in the enclosing structure (None at the top level).

    :returns: the first collected value when at least one match exists,
        otherwise the (empty) ``item_populated_list`` itself.

    """
    for key, item in input_command_data.items():
        if (
            parent_key_for_search
            and isinstance(key, str)
            and key == search_key
            and parent_key_for_search == comparison_parent_key
        ):
            item_populated_list.append(item)
        elif isinstance(item, dict):
            get_value_based_on_key(
                item,
                search_key,
                item_populated_list,
                parent_key_for_search,
                comparison_parent_key=key,
            )
        elif isinstance(item, list):
            for element in item:
                # Only dictionaries can be searched; scalars mixed into the
                # same list are skipped instead of failing on ``.items()``.
                if isinstance(element, dict):
                    get_value_based_on_key(
                        element,
                        search_key,
                        item_populated_list,
                        parent_key_for_search,
                        comparison_parent_key=key,
                    )
    if item_populated_list:
        return item_populated_list[0]
    return item_populated_list


def apply_validation_rule(
    key: str, value: list, command_input_json_config: dict, parent_key: str
) -> str:
    """
    Evaluate the semantic rules registered for ``key`` against the command
    payload and report the first rule that fails.

    :param key: user input key for search.
    :param value: list of rule dictionaries; each one is read through its
        "rule" and "error" keys and, optionally, "dependency_key".
    :param command_input_json_config: command input json from operator.
    :param parent_key: parent key helps to identify the correct child key.
    :returns: error message of the first failing rule, or None when the key
        is not found (or its value is falsy) or every rule passes.
    :raises SchemanticValdidationKeyError: when a KeyError is raised while
        reading or evaluating a rule.

    """
    res_value = get_value_based_on_key(
        command_input_json_config,
        key,
        item_populated_list=[],
        parent_key_for_search=parent_key,
        comparison_parent_key=None,
    )
    if not res_value:
        return None

    # The value is stored globally because simpleeval does not support
    # passing list, dict or set objects as function parameters; rules reach
    # the data through the helper functions registered below instead.
    SEMANTIC_DATA_GLOBAL_CONSTANT.update({key: res_value})
    eval_functions = simpleeval.DEFAULT_FUNCTIONS.copy()
    eval_functions.update(
        length=return_length,
        value=return_value_from_global_constant,
    )
    for rule_data in value:
        try:
            if "dependency_key" in rule_data:
                # Both names resolve to their own key string so the rule can
                # hand them to length()/value() for lookup.
                names = {
                    key: key,
                    rule_data["dependency_key"]: rule_data["dependency_key"],
                }
            elif isinstance(res_value, list):
                names = {key: key}
            else:
                names = {key: res_value}
            eval_data = EvalWithCompoundTypes(
                functions=eval_functions, names=names
            ).eval(rule_data["rule"])
            # A rule fails when it evaluates to False, or to a non-boolean
            # result that is non-empty.
            if eval_data is False or (
                not isinstance(eval_data, bool) and len(eval_data) > 0
            ):
                return rule_data["error"]
        except KeyError as key_error:
            logging.error(key_error)
            # Chain the original KeyError so the missing key stays visible
            # in the traceback.
            raise SchemanticValdidationKeyError(
                message="Invalid rule and error key passed"
            ) from key_error
    return None


def validate_json(
    semantic_validate_constant_json: dict,
    command_input_json_config: dict,
    error_msg_list: list,
    parent_key: str,
) -> list:
    """
    Walk the semantic validation constants and apply every rule list found
    to the command payload, collecting the resulting error messages.

    :param semantic_validate_constant_json: json containing all the
        parameters along with its business semantic validation rules and
        error message.
    :param command_input_json_config: dictionary containing details of the
        command input which needs validation. This is same as for
        ska_telmodel.schema.validate.
    :param error_msg_list: list to which error messages are appended; it is
        mutated in place and also returned.
    :param parent_key: temp key to store parent key, means if same semantic
        validation key present in 2 places this will help to identify
        correct parent.
    :returns: error_msg_list: list containing all combined error which
        arises due to semantic validation.
    """
    for key, value in semantic_validate_constant_json.items():
        if isinstance(value, list):
            # if the validation key is present in multiple dicts,
            # parent_key helps to pick the correct child
            rule_result = apply_validation_rule(
                key=key,
                value=value,
                command_input_json_config=command_input_json_config,
                parent_key=parent_key,
            )
            if rule_result:
                error_msg_list.append(rule_result)
        elif isinstance(value, dict):
            # "parent_key_rule" lets a rule be applied to a child of this
            # key, e.g. a rule about the length of beams when beams holds an
            # array of elements.
            if "parent_key_rule" in value:
                # NOTE(review): relies on dict ordering - the second key of
                # this dict is taken as the key the parent rule applies to.
                rule_key = list(value.keys())[1]
                rule_result = apply_validation_rule(
                    key=rule_key,
                    value=value["parent_key_rule"],
                    command_input_json_config=command_input_json_config,
                    parent_key=key,
                )
                if rule_result:
                    error_msg_list.append(rule_result)
            # NOTE(review): parent_key is rebound here, so list entries that
            # follow this dict at the same level are validated with this
            # key as their parent - confirm this is intended.
            parent_key = key
            validate_json(
                value, command_input_json_config, error_msg_list, parent_key
            )
    return error_msg_list
def return_length(key):
    """
    Return the length of the value stored for ``key`` in the semantic data
    global constant.

    simpleeval does not provide a default length function for list, tuple
    or set, so this function is passed to simpleeval as a custom function.

    :param key: semantic validate key.
    """
    stored_value = SEMANTIC_DATA_GLOBAL_CONSTANT[key]
    return len(stored_value)


def return_value_from_global_constant(key):
    """
    Return the value stored for ``key`` in the semantic data global
    constant.

    :param key: semantic validate key.
    """
    stored_value = SEMANTIC_DATA_GLOBAL_CONSTANT[key]
    return stored_value
def validate_target_is_visible(
    ra_str: str,
    dec_str: str,
    telescope: str,
    target_env: str,
    tm_data,
    observing_time: datetime = None,
) -> bool:
    """
    Check the target specific by ra,dec is visible during observing_time
    at telescope site

    :param ra_str: string containing value of ra
    :param dec_str: string containing value of dec
    :param telescope: string containing name of the telescope
    :param target_env: string containing the environment value(mid/low)
        for the target
    :param tm_data: telemodel tm dataobject using which we can load
        semantic validate json.
    :param observing_time: datetime of the observation; when omitted the
        current UTC time at call time is used.
    :returns: True when the target is visible.
    :raises SchematicValidationError: when the target is not visible.
    """
    if observing_time is None:
        # Resolve "now" on every call. A ``datetime.utcnow()`` default in the
        # signature is evaluated only once, when the module is imported.
        from datetime import timezone

        observing_time = datetime.now(timezone.utc)

    observing_time = observing_time.strftime("%Y-%m-%dT%H:%M:%S")
    # 2 hours are subtracted for "target_mid", 8 hours for anything else.
    # NOTE(review): presumably a site-local to UTC conversion - confirm.
    utcoffset = +2 * u.hour if target_env == "target_mid" else +8 * u.hour
    observing_time = (Time(observing_time) - utcoffset).strftime(
        "%Y-%m-%dT%H:%M:%S"
    )

    validator_json_schema = tm_data[
        MID_VALIDATION_CONSTANT_JSON_FILE_PATH
    ].get_dict()
    # The limit is always read from the mid / AA0.5 entry, whatever
    # target_env is.
    dish_elevation_limit = validator_json_schema["mid"]["AA0.5"][
        "dish_elevation_limit"
    ]["min"]

    ra_dec = [
        ra_degs_from_str_formats(ra_str)[0],
        dec_degs_str_formats(dec_str)[0],
    ]
    temp_list = ra_dec_to_az_el(
        telesc=telescope,
        ra=ra_dec[0],
        dec=ra_dec[1],
        obs_time=observing_time,
        el_limit=dish_elevation_limit,
        if_set=True,
        time_format="isot",
        tm_data=tm_data,
    )

    # The third element of the result is treated as the visibility flag.
    if len(temp_list) >= 3 and temp_list[2]:
        return True

    error_message = (
        f"Telescope: {telescope} target observing during {observing_time} "
        "is not visible"
    )
    logging.error(error_message)
    raise SchematicValidationError(error_message)