Source code for ska_mid_dish_dcp_lib.device.b5dc_device

"""Band 5 Down-converter Device."""
# pylint: disable=too-many-instance-attributes,too-few-public-methods
import logging
from enum import Enum

from ska_mid_dish_dcp_lib.device.b5dc_device_mappings import (
    B5dcAttenuationBusy,
    B5dcFrequency,
    B5dcFrequencyBusy,
    B5dcMappingException,
    B5dcPllState,
    map_clk_photodiode_current_ma,
    map_if_power_out_dbm,
    map_rf_power_in_dbm,
    map_rfcm_attenuation_db,
    map_rfcm_frequency_register,
    map_rfcm_pll_lock_register,
    map_spi_bus_busy,
    map_temperature_degc,
)
from ska_mid_dish_dcp_lib.interface.b5dc_interface import B5dcInterface
from ska_mid_dish_dcp_lib.protocol.b5dc_protocol import B5dcProtocolTimeout
from ska_mid_dish_dcp_lib.utils.helper import retry_on_exception


[docs]class B5dcDeviceAttenuationException(Exception): """Exception used to indicate issue setting attenuation."""
[docs]class B5dcDeviceFrequencyException(Exception): """Exception used to indicate issue setting frequency."""
[docs]class B5dcSpiBusy(Exception): """Exception used to indicate SPI bus is busy."""
[docs]class B5dcDevice: """B5dcDevice class."""
[docs] def __init__(self, logger: logging.Logger, b5dc_interface: B5dcInterface): """Initialise the B5dcDevice class. :param logger: logging handle :param b5dc_interface: handle to the b5dc interface """ self._logger = logger self._b5dc_interface = b5dc_interface self.sensors = B5dcDeviceSensors(self._logger, self._b5dc_interface) self.attenuation_conf = B5dcDeviceConfigureAttenuation( self._logger, self._b5dc_interface ) self.frequency_conf = B5dcDeviceConfigureFrequency( self._logger, self._b5dc_interface )
[docs]class B5dcDeviceSensors: """B5dcDeviceSensors class."""
[docs] def __init__(self, logger: logging.Logger, b5dc_interface: B5dcInterface): """Initialise the B5dcDeviceSensor class. :param logger: logging handle :param b5dc_interface: handle to the b5dc interface """ self._logger = logger self._b5dc_interface = b5dc_interface # Device state self.rfcm_frequency: float = 0.0 self.rfcm_pll_lock: B5dcPllState = B5dcPllState.NOT_LOCKED self.rfcm_h_attenuation_db: float = 0.0 self.rfcm_v_attenuation_db: float = 0.0 self.clk_photodiode_current_ma: float = 0.0 self.h_pol_rf_power_in_dbm: float = 0.0 self.v_pol_rf_power_in_dbm: float = 0.0 self.h_pol_if_power_out_dbm: float = 0.0 self.v_pol_if_power_out_dbm: float = 0.0 self.rf_temperature_degc: float = 0.0 self.rfcm_psu_pcb_temperature_degc: float = 0.0 self.sensor_mapper = { "spi_rfcm_frequency": { "mapping_function": map_rfcm_frequency_register, "variable_name": "rfcm_frequency", "unit": "GHz", }, "spi_rfcm_pll_lock": { "mapping_function": map_rfcm_pll_lock_register, "variable_name": "rfcm_pll_lock", "unit": "", }, "spi_rfcm_h_attenuation": { "mapping_function": map_rfcm_attenuation_db, "variable_name": "rfcm_h_attenuation_db", "unit": "dB", }, "spi_rfcm_v_attenuation": { "mapping_function": map_rfcm_attenuation_db, "variable_name": "rfcm_v_attenuation_db", "unit": "dB", }, "spi_rfcm_photo_diode_ain0": { "mapping_function": map_clk_photodiode_current_ma, "variable_name": "clk_photodiode_current_ma", "unit": "mA", }, "spi_rfcm_rf_in_h_ain1": { "mapping_function": map_rf_power_in_dbm, "variable_name": "h_pol_rf_power_in_dbm", "unit": "dBm", }, "spi_rfcm_rf_in_v_ain2": { "mapping_function": map_rf_power_in_dbm, "variable_name": "v_pol_rf_power_in_dbm", "unit": "dBm", }, "spi_rfcm_if_out_h_ain3": { "mapping_function": map_if_power_out_dbm, "variable_name": "h_pol_if_power_out_dbm", "unit": "dBm", }, "spi_rfcm_if_out_v_ain4": { "mapping_function": map_if_power_out_dbm, "variable_name": "v_pol_if_power_out_dbm", "unit": "dBm", }, "spi_rfcm_rf_temp_ain5": { "mapping_function": map_temperature_degc, "variable_name": "rf_temperature_degc", "unit": "deg C", }, "spi_rfcm_psu_pcb_temp_ain7": { "mapping_function": map_temperature_degc, "variable_name": "rfcm_psu_pcb_temperature_degc", "unit": "deg C", }, }
[docs] async def update_sensor(self, register_name: str) -> None: """Get updated data for sensor.""" try: mapping_function = self.sensor_mapper[register_name]["mapping_function"] variable_name = self.sensor_mapper[register_name]["variable_name"] unit = self.sensor_mapper[register_name]["unit"] except KeyError: self._logger.error( f"Mapping for register name ({register_name}) does not exist." ) raise try: reg_val = await getattr(self._b5dc_interface, register_name)() result = mapping_function(reg_val) # type: ignore setattr(self, variable_name, result) # type: ignore # if enum then show name if isinstance(result, Enum): result = result.name self._logger.debug(f"{variable_name} updated to {result} {unit}") except (B5dcMappingException, B5dcAttenuationBusy, B5dcFrequencyBusy) as exc: self._logger.error(f"Failed to update {variable_name}: {exc}") except B5dcProtocolTimeout as exc: self._logger.error( f"Protocol timeout exception raised on {variable_name} update: {exc}" ) raise
[docs] async def update_state(self) -> None: """Update all B5DC state.""" for register_name in self.sensor_mapper: await self.update_sensor(register_name)
[docs]class B5dcDeviceConfigureAttenuation: """B5dcDeviceConfigureAttenuation class.""" MAX_ATTENUATION_DB = 32.0
[docs] def __init__(self, logger: logging.Logger, b5dc_interface: B5dcInterface): """Initialise the B5dcDeviceConfigureAttenuation class. :param logger: logging handle :param b5dc_interface: handle to the b5dc interface """ self._logger = logger self._b5dc_interface = b5dc_interface
@retry_on_exception() # type: ignore async def _check_spi_bus_free(self) -> None: """Check that the SPI bus is not busy.""" reg_val = await self._b5dc_interface.spi_bus_lock() if map_spi_bus_busy(reg_val): raise B5dcSpiBusy @retry_on_exception() # type: ignore async def _check_atten_conf_done(self, atten_attr_name: str) -> float: """Check that the attenuation configuration is done. :param atten_attr_name: specific attenuation register :return: attenuation value in db """ reg_val = await getattr(self._b5dc_interface, atten_attr_name)() return map_rfcm_attenuation_db(reg_val)
[docs] async def set_attenuation( self, set_attenuation_db: int, atten_attr_name: str ) -> None: """Set the attenuation on the band 5 down converter. :param set_attenuation_db: value to set in dB :param atten_attr_name: specific register to set :raises B5dcDeviceAttenuationException: for input validation, value not being set correctly, timeout on protocol layer. """ if ( set_attenuation_db >= B5dcDeviceConfigureAttenuation.MAX_ATTENUATION_DB or set_attenuation_db < 0 ): raise B5dcDeviceAttenuationException( "Attenuation must be >= 0 and less than " f"({B5dcDeviceConfigureAttenuation.MAX_ATTENUATION_DB})" ) try: await self._check_spi_bus_free() reg_val = int(set_attenuation_db * 2) await getattr(self._b5dc_interface, f"set_{atten_attr_name}")(reg_val) result = await self._check_atten_conf_done(atten_attr_name) if result != set_attenuation_db: raise B5dcDeviceAttenuationException("Failed to set attenuation") except ( B5dcSpiBusy, B5dcProtocolTimeout, B5dcMappingException, B5dcAttenuationBusy, ) as exc: raise B5dcDeviceAttenuationException("Failed to set attenuation") from exc
[docs]class B5dcDeviceConfigureFrequency: """B5dcDeviceConfigureFrequency class."""
[docs] def __init__(self, logger: logging.Logger, b5dc_interface: B5dcInterface): """Initialise the B5dcDeviceConfigureFrequency class. :param logger: logging handle :param b5dc_interface: handle to the b5dc interface """ self._logger = logger self._b5dc_interface = b5dc_interface
@retry_on_exception(retries=5, delay=5.0) # type: ignore async def _check_freq_conf_done(self) -> float: """Check that the frequency configuration is done. :return: frequency register """ reg_val = await self._b5dc_interface.spi_rfcm_frequency() # raise exception if busy flag set return map_rfcm_frequency_register(reg_val)
[docs] async def set_frequency(self, frequency: B5dcFrequency) -> None: """Set the frequency on the band 5 down converter. :param frequency: frequency to set :raises B5dcDeviceFrequencyException: for input validation, value not being set correctly, timeout on protocol layer. """ try: await self._b5dc_interface.set_spi_rfcm_frequency(frequency.value) current_frequency = await self._check_freq_conf_done() if frequency.frequency_value_ghz() != current_frequency: raise B5dcDeviceFrequencyException("Failed to set frequency.") except (B5dcFrequencyBusy, B5dcProtocolTimeout, B5dcMappingException) as exc: raise B5dcDeviceFrequencyException("Failed to set frequency.") from exc