Source code for ska_mid_dish_dcp_lib.interface.b5dc_interface

"""Module that handles the parsing of b5dc ICD."""

import csv
import inspect
import logging
import types
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Awaitable, Callable, List, Optional, Union

ICD_FILENAME = "band5.fpg"
ICD_RELATIVE_DIR = "icd"
ICD_WRITABLE_REG_FILENAME = "writable_registers.txt"


[docs]@dataclass(frozen=True) class B5dcProperty: """Dataclass that contains register information from ICD.""" name: str address: int writable: bool description: str = "" def __str__(self) -> str: """Override the string method to format the address in hexidecimal.""" description = f"Name: {self.name}, Address: {hex(self.address)}" description += f", Writable: {self.writable}, Description: {self.description}" return description
[docs]class B5dcPropertyParser: # pylint: disable=too-few-public-methods """Class that parses the B5DC ICD."""
[docs] def __init__(self, logger: logging.Logger): """Read the ICD files and generates a list of properties. :param logger: logger handle for logging """ self._logger = logger self.properties = self._get_icd_properties()
def _get_icd_properties(self) -> List[B5dcProperty]: """Parse the ICD files to get a list of properties. :return: B5DC properties """ icd_path = self._get_icd_path() writable_reg_names = self._get_writable_register_names(icd_path) fpg_file = icd_path / ICD_FILENAME b5dc_properties = [] try: with fpg_file.open("r") as file: csv_reader = csv.reader(file, delimiter=" ") lines = list(csv_reader) self._validate_writable_registers_in_icd(lines, writable_reg_names) b5dc_properties = self._parse_fpg_lines(lines, writable_reg_names) except FileNotFoundError: self._logger.error("File not found at path: %s", fpg_file) raise return b5dc_properties def _validate_writable_registers_in_icd( self, icd_file_row: List[List[str]], writable_registers: List[str] ) -> None: """Check the register names in the writable list has a match in the ICD file. :param icd_file_row: A row in the fpg file :param writable_registers: List of writable registers """ icd_register_names = [] for idx, line in enumerate(icd_file_row): if line[0] == "?register": assert ( len(line) == 4 ), f"Length of parameters for line {idx} in ICD file is incorrect." icd_register_names.append(line[1]) for register in writable_registers: assert ( register in icd_register_names ), f"Check register naming ({register}) is not in ICD." self._logger.debug("ICD validation passed") def _parse_fpg_lines( self, icd_file_row: List[List[str]], writable_registers: List[str] ) -> List[B5dcProperty]: """Parse the ICD rows and create B5dcProperty objects to represent the row. :param icd_file_row: List of strings from ICD file :param writable_registers: List of writable registers :return: List of B5dcProperty """ b5dc_properties = [] for line in icd_file_row: if line[0] == "?register": property_name = line[1] register_address = int(line[2], 16) writable = property_name in writable_registers b5dc_property = B5dcProperty(property_name, register_address, writable) b5dc_properties.append(b5dc_property) self._logger.debug("Property found: %s", b5dc_property) return b5dc_properties def _get_writable_register_names(self, path: Path) -> List[str]: """Get the list of register names which are writable. :return: Writable register names """ path = path / ICD_WRITABLE_REG_FILENAME try: with path.open("r") as file: registers = [line.strip() for line in file if line.strip()] except FileNotFoundError: self._logger.error("File not found at path: %s", path) raise return registers def _get_icd_path(self) -> Path: """ Get the path to the ICD files. :return: Path to ICD files """ script_dir = Path(__file__).parent return script_dir / ICD_RELATIVE_DIR
[docs]class B5dcInterface: # pylint: disable=too-few-public-methods """Class that exposes the B5dc registers as python properties."""
[docs] def __init__( self, logger: logging.Logger, icd_parser: B5dcPropertyParser, get_method: Optional[Callable[[int], Union[Any, Awaitable[Any]]]] = None, set_method: Optional[Callable[[int, int], Union[Any, Awaitable[Any]]]] = None, ): """Initialise the B5dcInterface class. :param logger: logging handle :param icd_parser: contains parsed ICD :param get_method: method that is called when property is accessed :param set_method: method that is called when property is set """ self._logger = logger self._icd_parser = icd_parser self._get_callback = get_method self._set_callback = set_method # create map of register name to b5dc property for quick access of members self._properties = {prop.name: prop for prop in self._icd_parser.properties} self._create_dynamic_methods()
def _create_dynamic_methods(self) -> None: for register_name in self._properties: if ( self._properties[register_name].writable and self._set_callback is not None ): async def async_setter( self: Any, value: int, name: str = register_name ) -> Any: return await self._set_method_async(name, value) def sync_setter( self: Any, value: int, name: str = register_name ) -> Any: return self._set_method(name, value) setter = self._create_dynamic_method( async_setter, sync_setter, self._set_callback, ) setattr( self.__class__, f"set_{register_name}", types.MethodType(setter, self), ) self._logger.debug(f"Created setter method (set_{register_name}(...)).") if self._get_callback is not None: async def async_getter(self: Any, name: str = register_name) -> Any: return await self._get_method_async(name) def sync_getter(self: Any, name: str = register_name) -> Any: return self._get_method(name) getter = self._create_dynamic_method( async_getter, sync_getter, self._get_callback ) setattr(self.__class__, register_name, types.MethodType(getter, self)) self._logger.debug(f"Created getter method ({register_name}()).") def _create_dynamic_method( self, async_func: Callable[..., Awaitable[Any]], sync_func: Callable[..., Any], callback: Union[Callable[..., Any], Callable[..., Awaitable[Any]]], ) -> Union[Callable[..., Any], Callable[..., Awaitable[Any]]]: # Returns either the async or sync method based on the callback type return async_func if inspect.iscoroutinefunction(callback) else sync_func async def _set_method_async(self, name: str, value: int) -> None: address = self._properties[name].address self._logger.debug( f"Set method ({name}) " f"called with address ({hex(address)}) and value ({value})" ) if self._set_callback is not None: await self._set_callback(address, value) def _set_method(self, name: str, value: int) -> None: address = self._properties[name].address self._logger.debug( f"Set method ({name}) " f"called with address ({hex(address)}) and value ({value})" ) if self._set_callback is not None: self._set_callback(address, value) async def _get_method_async(self, name: str) -> int: val = -1 address = self._properties[name].address if self._get_callback is not None: val = await self._get_callback(address) self._logger.debug(f"Get method ({name}) called with address ({hex(address)}).") return val def _get_method(self, name: str) -> int: val = -1 address = self._properties[name].address if self._get_callback is not None: val = self._get_callback(address) # type: ignore self._logger.debug(f"Get method ({name}) called with address ({hex(address)}).") return val
if __name__ == "__main__": # pragma: no cover logger_ = logging.getLogger(__name__) logging.basicConfig( format="%(asctime)s - %(levelname)s - %(message)s", level=logging.DEBUG ) b5dc_property_parser = B5dcPropertyParser(logger_) b5dc_interface = B5dcInterface(logger_, b5dc_property_parser)