# Source code for ska_mid_dish_dcp_lib.device.b5dc_device
"""Band 5 Down-converter Device."""
# pylint: disable=too-many-instance-attributes,too-few-public-methods
import logging
from enum import Enum
from ska_mid_dish_dcp_lib.device.b5dc_device_mappings import (
B5dcAttenuationBusy,
B5dcFrequency,
B5dcFrequencyBusy,
B5dcMappingException,
B5dcPllState,
map_clk_photodiode_current_ma,
map_if_power_out_dbm,
map_rf_power_in_dbm,
map_rfcm_attenuation_db,
map_rfcm_frequency_register,
map_rfcm_pll_lock_register,
map_spi_bus_busy,
map_temperature_degc,
)
from ska_mid_dish_dcp_lib.interface.b5dc_interface import B5dcInterface
from ska_mid_dish_dcp_lib.protocol.b5dc_protocol import B5dcProtocolTimeout
from ska_mid_dish_dcp_lib.utils.helper import retry_on_exception
class B5dcDeviceAttenuationException(Exception):
    """Exception used to indicate an issue setting the attenuation.

    Raised by ``B5dcDeviceConfigureAttenuation.set_attenuation`` when the
    requested value is out of range, when the value read back differs from
    the one requested, or when a lower-level error occurs while configuring.
    """
class B5dcDeviceFrequencyException(Exception):
    """Exception used to indicate an issue setting the frequency.

    Raised by ``B5dcDeviceConfigureFrequency.set_frequency`` when the
    frequency read back differs from the one requested, or when a
    lower-level error occurs while configuring.
    """
class B5dcDevice:
    """Top-level handle for a band 5 down-converter device.

    Bundles three helper objects that all share the same logger and
    interface handle:

    * ``sensors`` - cached sensor values and their refresh logic,
    * ``attenuation_conf`` - attenuation configuration,
    * ``frequency_conf`` - frequency configuration.
    """

    def __init__(self, logger: logging.Logger, b5dc_interface: B5dcInterface):
        """Initialise the B5dcDevice class.

        :param logger: logging handle
        :param b5dc_interface: handle to the b5dc interface
        """
        self._logger = logger
        self._b5dc_interface = b5dc_interface

        # Every helper receives the same logger and interface instance.
        self.sensors = B5dcDeviceSensors(self._logger, self._b5dc_interface)
        self.attenuation_conf = B5dcDeviceConfigureAttenuation(
            self._logger, self._b5dc_interface
        )
        self.frequency_conf = B5dcDeviceConfigureFrequency(
            self._logger, self._b5dc_interface
        )
class B5dcDeviceSensors:
    """Cached B5DC sensor values and the logic to refresh them.

    Each entry of ``sensor_mapper`` is keyed by the name of a coroutine
    method on the b5dc interface and describes:

    * ``mapping_function`` - converts the value returned by that coroutine,
    * ``variable_name`` - attribute of this object that stores the result,
    * ``unit`` - unit string appended to the debug log message.
    """

    def __init__(self, logger: logging.Logger, b5dc_interface: B5dcInterface):
        """Initialise the B5dcDeviceSensors class.

        :param logger: logging handle
        :param b5dc_interface: handle to the b5dc interface
        """
        self._logger = logger
        self._b5dc_interface = b5dc_interface

        # Device state; these defaults hold until the first successful update.
        self.rfcm_frequency: float = 0.0
        self.rfcm_pll_lock: B5dcPllState = B5dcPllState.NOT_LOCKED
        self.rfcm_h_attenuation_db: float = 0.0
        self.rfcm_v_attenuation_db: float = 0.0
        self.clk_photodiode_current_ma: float = 0.0
        self.h_pol_rf_power_in_dbm: float = 0.0
        self.v_pol_rf_power_in_dbm: float = 0.0
        self.h_pol_if_power_out_dbm: float = 0.0
        self.v_pol_if_power_out_dbm: float = 0.0
        self.rf_temperature_degc: float = 0.0
        self.rfcm_psu_pcb_temperature_degc: float = 0.0

        # Interface method name -> how to map, store and label its value.
        self.sensor_mapper = {
            "spi_rfcm_frequency": {
                "mapping_function": map_rfcm_frequency_register,
                "variable_name": "rfcm_frequency",
                "unit": "GHz",
            },
            "spi_rfcm_pll_lock": {
                "mapping_function": map_rfcm_pll_lock_register,
                "variable_name": "rfcm_pll_lock",
                "unit": "",
            },
            "spi_rfcm_h_attenuation": {
                "mapping_function": map_rfcm_attenuation_db,
                "variable_name": "rfcm_h_attenuation_db",
                "unit": "dB",
            },
            "spi_rfcm_v_attenuation": {
                "mapping_function": map_rfcm_attenuation_db,
                "variable_name": "rfcm_v_attenuation_db",
                "unit": "dB",
            },
            "spi_rfcm_photo_diode_ain0": {
                "mapping_function": map_clk_photodiode_current_ma,
                "variable_name": "clk_photodiode_current_ma",
                "unit": "mA",
            },
            "spi_rfcm_rf_in_h_ain1": {
                "mapping_function": map_rf_power_in_dbm,
                "variable_name": "h_pol_rf_power_in_dbm",
                "unit": "dBm",
            },
            "spi_rfcm_rf_in_v_ain2": {
                "mapping_function": map_rf_power_in_dbm,
                "variable_name": "v_pol_rf_power_in_dbm",
                "unit": "dBm",
            },
            "spi_rfcm_if_out_h_ain3": {
                "mapping_function": map_if_power_out_dbm,
                "variable_name": "h_pol_if_power_out_dbm",
                "unit": "dBm",
            },
            "spi_rfcm_if_out_v_ain4": {
                "mapping_function": map_if_power_out_dbm,
                "variable_name": "v_pol_if_power_out_dbm",
                "unit": "dBm",
            },
            "spi_rfcm_rf_temp_ain5": {
                "mapping_function": map_temperature_degc,
                "variable_name": "rf_temperature_degc",
                "unit": "deg C",
            },
            "spi_rfcm_psu_pcb_temp_ain7": {
                "mapping_function": map_temperature_degc,
                "variable_name": "rfcm_psu_pcb_temperature_degc",
                "unit": "deg C",
            },
        }

    async def update_sensor(self, register_name: str) -> None:
        """Read one value through the interface and refresh its attribute.

        :param register_name: key of ``sensor_mapper``; it is also the name
            of the interface coroutine awaited to obtain the raw value
        :raises KeyError: if ``register_name`` has no mapping (logged first)
        :raises B5dcProtocolTimeout: re-raised after being logged

        Mapping and busy exceptions are logged and swallowed; in that case
        the attribute keeps its previous value.
        """
        try:
            mapping_function = self.sensor_mapper[register_name]["mapping_function"]
            variable_name = self.sensor_mapper[register_name]["variable_name"]
            unit = self.sensor_mapper[register_name]["unit"]
        except KeyError:
            self._logger.error(
                f"Mapping for register name ({register_name}) does not exist."
            )
            raise

        try:
            reg_val = await getattr(self._b5dc_interface, register_name)()
            result = mapping_function(reg_val)  # type: ignore
            setattr(self, variable_name, result)  # type: ignore
            # if enum then show name
            if isinstance(result, Enum):
                result = result.name
            self._logger.debug(f"{variable_name} updated to {result} {unit}")
        except (B5dcMappingException, B5dcAttenuationBusy, B5dcFrequencyBusy) as exc:
            # Best effort: report the failure and keep the previous value.
            self._logger.error(f"Failed to update {variable_name}: {exc}")
        except B5dcProtocolTimeout as exc:
            self._logger.error(
                f"Protocol timeout exception raised on {variable_name} update: {exc}"
            )
            raise

    async def update_state(self) -> None:
        """Update all B5DC state.

        Awaits ``update_sensor`` for every key of ``sensor_mapper`` in turn;
        an exception raised by ``update_sensor`` stops the iteration.
        """
        for register_name in self.sensor_mapper:
            await self.update_sensor(register_name)
class B5dcSpiBusy(Exception):
    """Exception used to indicate that the SPI bus is busy.

    This name was raised and caught by B5dcDeviceConfigureAttenuation but
    was neither imported nor defined in this module, so a busy bus surfaced
    as ``NameError``.
    NOTE(review): if the project already defines this exception elsewhere,
    import that one instead of keeping this local definition.
    """


class B5dcDeviceConfigureAttenuation:
    """Configure the attenuation on the band 5 down converter."""

    # Exclusive upper bound, in dB, accepted by ``set_attenuation``.
    MAX_ATTENUATION_DB = 32.0

    def __init__(self, logger: logging.Logger, b5dc_interface: B5dcInterface):
        """Initialise the B5dcDeviceConfigureAttenuation class.

        :param logger: logging handle
        :param b5dc_interface: handle to the b5dc interface
        """
        self._logger = logger
        self._b5dc_interface = b5dc_interface

    # presumably the decorator re-invokes the coroutine when it raises;
    # verify against retry_on_exception's defaults
    @retry_on_exception()  # type: ignore
    async def _check_spi_bus_free(self) -> None:
        """Check that the SPI bus is not busy.

        :raises B5dcSpiBusy: if the mapped bus-lock value reports busy
        """
        reg_val = await self._b5dc_interface.spi_bus_lock()
        if map_spi_bus_busy(reg_val):
            raise B5dcSpiBusy

    @retry_on_exception()  # type: ignore
    async def _check_atten_conf_done(self, atten_attr_name: str) -> float:
        """Check that the attenuation configuration is done.

        :param atten_attr_name: name of the interface coroutine that reads
            the specific attenuation register
        :return: attenuation value in dB as mapped by
            ``map_rfcm_attenuation_db``
        """
        reg_val = await getattr(self._b5dc_interface, atten_attr_name)()
        return map_rfcm_attenuation_db(reg_val)

    async def set_attenuation(
        self, set_attenuation_db: float, atten_attr_name: str
    ) -> None:
        """Set the attenuation on the band 5 down converter.

        The value written through the interface is ``int(set_attenuation_db * 2)``
        and the value read back must equal ``set_attenuation_db`` exactly.

        :param set_attenuation_db: value to set in dB; must satisfy
            ``0 <= value < MAX_ATTENUATION_DB``
        :param atten_attr_name: specific register to set; the interface
            methods ``set_<name>`` and ``<name>`` are used to write and read
        :raises B5dcDeviceAttenuationException: for input validation,
            value not being set correctly, timeout on protocol layer.
        """
        # Written as a chained comparison so that NaN is rejected as well.
        if not (
            0
            <= set_attenuation_db
            < B5dcDeviceConfigureAttenuation.MAX_ATTENUATION_DB
        ):
            raise B5dcDeviceAttenuationException(
                "Attenuation must be >= 0 and less than "
                f"({B5dcDeviceConfigureAttenuation.MAX_ATTENUATION_DB})"
            )

        try:
            await self._check_spi_bus_free()
            # int() truncates, so the register holds twice the dB value.
            reg_val = int(set_attenuation_db * 2)
            await getattr(self._b5dc_interface, f"set_{atten_attr_name}")(reg_val)
            result = await self._check_atten_conf_done(atten_attr_name)
            if result != set_attenuation_db:
                raise B5dcDeviceAttenuationException("Failed to set attenuation")
        except (
            B5dcSpiBusy,
            B5dcProtocolTimeout,
            B5dcMappingException,
            B5dcAttenuationBusy,
        ) as exc:
            raise B5dcDeviceAttenuationException("Failed to set attenuation") from exc
class B5dcDeviceConfigureFrequency:
    """Configure the frequency on the band 5 down converter."""

    def __init__(self, logger: logging.Logger, b5dc_interface: B5dcInterface):
        """Initialise the B5dcDeviceConfigureFrequency class.

        :param logger: logging handle
        :param b5dc_interface: handle to the b5dc interface
        """
        self._logger = logger
        self._b5dc_interface = b5dc_interface

    @retry_on_exception(retries=5, delay=5.0)  # type: ignore
    async def _check_freq_conf_done(self) -> float:
        """Check that the frequency configuration is done.

        :return: the frequency register value after mapping with
            ``map_rfcm_frequency_register``
        """
        reg_val = await self._b5dc_interface.spi_rfcm_frequency()
        # The mapping is expected to raise if the busy flag is set
        # (set_frequency handles B5dcFrequencyBusy) - TODO confirm.
        return map_rfcm_frequency_register(reg_val)

    async def set_frequency(self, frequency: B5dcFrequency) -> None:
        """Set the frequency on the band 5 down converter.

        Writes ``frequency.value`` through the interface, reads the
        frequency back and compares it with ``frequency.frequency_value_ghz()``.

        :param frequency: frequency to set
        :raises B5dcDeviceFrequencyException: for input validation,
            value not being set correctly, timeout on protocol layer.
        """
        try:
            await self._b5dc_interface.set_spi_rfcm_frequency(frequency.value)
            current_frequency = await self._check_freq_conf_done()
            if frequency.frequency_value_ghz() != current_frequency:
                raise B5dcDeviceFrequencyException("Failed to set frequency.")
        except (B5dcFrequencyBusy, B5dcProtocolTimeout, B5dcMappingException) as exc:
            raise B5dcDeviceFrequencyException("Failed to set frequency.") from exc