Source code for ska_mid_dish_dcp_lib.protocol.b5dc_protocol

"""B5DC protocol interface."""
# pylint: disable=too-many-instance-attributes
import asyncio
import enum
import logging
import struct
from asyncio.futures import Future
from asyncio.transports import BaseTransport
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Optional, Tuple

PROTOCOL_ID = 0xBEEF
OP_CODE_WRITE = 0
OP_CODE_READ = 1
READ_DICT_TIMEOUT_S = 1.0
BUFFER_CHECK_PERIOD_S = 0.02
MAX_BUFFER_SIZE = 1000
BUFFER_TRIM_SIZE = int(MAX_BUFFER_SIZE / 2)


[docs]class B5dcProtocolException(Exception): """Exception class for B5dc protocol."""
[docs]class B5dcProtocolTimeout(Exception): """Exception class for response timeout."""
[docs]class B5dcErrorCode(enum.IntEnum): """Band 5 down-converter specific protocol errors.""" SUCCESS = 0x00000000 FAILED = 0x00000001 NETWORK = 0xFF000000
[docs]@dataclass class B5dcRequest: """Dataclass that contains data related to a request.""" read: int address: int length: int = 1 data: Optional[int] = None
[docs]@dataclass class B5dcResponse: """Dataclass that contains data related to a response.""" error_code: B5dcErrorCode length: int = 0 data: Optional[int] = None
[docs]class BaseB5dcProtocol(asyncio.DatagramProtocol): """Class that defines the B5DC protocol."""
[docs] def __init__(self, logger: logging.Logger): """Initialise band 5 down-converter protocol class. :param logger: logging handle """ self._logger = logger self._protocol_id: int = PROTOCOL_ID self.sequence_number: int = 0 self._receive_buffer: OrderedDict[int, B5dcResponse] = OrderedDict()
def _increment_sequence_number(self) -> None: self.sequence_number += 1 if self.sequence_number >= pow(2, 32): self.sequence_number = 0
[docs] def handle_received_messages(self, message: bytes) -> None: """Decode raw data and store in dictionary. :param message: raw message """ try: decoded_response = self.unpack_response(message) self._logger.debug(f"Decoded response: {decoded_response}") self._receive_buffer[decoded_response[0]] = decoded_response[1] self._limit_buffer_size() except B5dcProtocolException: str_bytes = B5dcProtocol.format_bytes_to_hexadecimal_string(message) self._logger.debug(f"Message could not be parsed: ({str_bytes}).")
def _limit_buffer_size(self) -> None: if len(self._receive_buffer) >= MAX_BUFFER_SIZE: self._logger.warning( "Buffer size exceeded maximum size." f"Trimming last {BUFFER_TRIM_SIZE} entries in buffer." ) # remove last BUFFER_TRIM_SIZE entries from buffer for _ in range(BUFFER_TRIM_SIZE): self._receive_buffer.popitem(last=False)
[docs] async def get_response(self, sequence_number: int) -> B5dcResponse: """Fetch received message from buffer. :param sequence_number: message sequence number :return: response associated with sequence number """ try: await asyncio.wait_for( self._wait_for_response(sequence_number), timeout=READ_DICT_TIMEOUT_S ) return self._receive_buffer.pop(sequence_number) except asyncio.TimeoutError as exc: raise B5dcProtocolTimeout("Timeout receiving response.") from exc
async def _wait_for_response(self, sequence_number: int) -> None: """Wait until the sequence number is available.""" while sequence_number not in self._receive_buffer: await asyncio.sleep(BUFFER_CHECK_PERIOD_S)
[docs] def construct_request(self, address: int, data: Optional[int] = None) -> bytes: """Construct the payload for a request. :param address: memory address :param data: data when write request is made, defaults to None :return: payload """ if data is not None: request = B5dcRequest(OP_CODE_WRITE, address, 1, data) self._logger.debug( f"Sending write request: address ({hex(address)}) data ({data})" ) else: request = B5dcRequest(OP_CODE_READ, address) self._logger.debug(f"Sending read request: address ({hex(address)})") return self.pack_request(self.sequence_number, request)
[docs] def pack_request( self, sequence_number: int, b5dc_request: B5dcRequest, ) -> bytes: """Packs the data required to make a request to the B5DC. :param sequence_number: sequence number :param b5dc_request: request information :return: The packed data """ if b5dc_request.length > 1: raise B5dcProtocolException("Length > 1 unsupported.") format_str = "!HI" payload_bytes = struct.pack(format_str, self._protocol_id, sequence_number) command_part = self._command_payload_request( b5dc_request.address, b5dc_request.length, b5dc_request.data ) payload_bytes += command_part return payload_bytes
[docs] def unpack_request(self, request: bytes) -> Tuple[int, B5dcRequest]: """Unpack the bytes request. :param sequence_number: expected sequence number :param response: packed response data received from transport :raises B5dcProtocolException: For protocol ID inconsistency :raises B5dcProtocolException: For no payload data :raises B5dcProtocolException: Error in parsing the payload :return: seqeunce number and request data """ try: header = struct.unpack("!HI", self._get_start_bytes(request, 6)) remaining_payload = self._get_end_bytes(request, 6) except (struct.error, B5dcProtocolException) as exc: raise B5dcProtocolException("Error parsing response message.") from exc protocol_id_received = header[0] sequence_number_received = header[1] if protocol_id_received != PROTOCOL_ID: raise B5dcProtocolException("Unexpected protocol ID received.") if len(remaining_payload) == 0: raise B5dcProtocolException("No payload received, only header information.") unpacked_request = None try: unpacked_request = self._get_next_cmd_request(remaining_payload) except (struct.error, B5dcProtocolException) as exc: raise B5dcProtocolException("Error parsing request message.") from exc return sequence_number_received, unpacked_request
[docs] def pack_response( self, sequence_number: int, b5dc_response: B5dcResponse, ) -> bytes: """Pack the expected response into bytes. :param sequence_number: sequence number :param b5dc_response: response data :return: Packed response in bytes """ format_str = "!HI" payload_bytes = struct.pack(format_str, self._protocol_id, sequence_number) command_part = self._command_payload_response( b5dc_response.error_code.value, b5dc_response.length, b5dc_response.data ) payload_bytes += command_part return payload_bytes
[docs] def unpack_response(self, response: bytes) -> Tuple[int, B5dcResponse]: """Unpack the response from the B5DC. :param response: packed response data received from transport :raises B5dcProtocolException: For protocol ID inconsistency :raises B5dcProtocolException: For no payload data received :raises B5dcProtocolException: If there is an error in parsing the payload :return: sequence number, Decoded response """ try: header = struct.unpack("!HI", self._get_start_bytes(response, 6)) remaining_payload = self._get_end_bytes(response, 6) except (struct.error, B5dcProtocolException) as exc: raise B5dcProtocolException("Error parsing response message.") from exc protocol_id_received = header[0] sequence_number_received = header[1] if protocol_id_received != PROTOCOL_ID: raise B5dcProtocolException("Unexpected protocol ID received.") if len(remaining_payload) == 0: raise B5dcProtocolException("No payload received, only header information.") unpacked_response = None try: unpacked_response = self._get_next_cmd_response(remaining_payload) except (struct.error, B5dcProtocolException, ValueError) as exc: raise B5dcProtocolException("Error parsing response message.") from exc return sequence_number_received, unpacked_response
def _get_next_cmd_response(self, response: bytes) -> B5dcResponse: unpacked = struct.unpack("!II", self._get_start_bytes(response, 8)) error_code = B5dcErrorCode.FAILED try: error_code = B5dcErrorCode(unpacked[0]) except ValueError as exc: self._logger.error( "Error code received (%s) could not be mapped to enum.", unpacked[0] ) raise B5dcProtocolException("Invalid error code.") from exc length = unpacked[1] data = None if length == 1: unpacked = struct.unpack("!III", self._get_start_bytes(response, 12)) data = unpacked[2] if length > 1: raise B5dcProtocolException("Unsupported data length > 1.") return B5dcResponse(error_code, length, data) def _get_next_cmd_request(self, response: bytes) -> B5dcRequest: unpacked = struct.unpack("!III", self._get_start_bytes(response, 12)) read = unpacked[0] address = unpacked[1] length = unpacked[2] data = None if read == 0: unpacked = struct.unpack("!IIII", self._get_start_bytes(response, 16)) data = unpacked[3] if length > 1: raise B5dcProtocolException("Unsupported data length > 1.") return B5dcRequest(read, address, length, data) def _command_payload_request( self, address: int, length: int, data: Optional[int] = None ) -> bytes: packed_data = bytes() op_code = OP_CODE_READ format_str = "!III" # default read format # data present implies a write if data is not None: op_code = OP_CODE_WRITE format_str = "!IIII" packed_data = struct.pack(format_str, op_code, address, length, data) else: packed_data = struct.pack(format_str, op_code, address, length) return packed_data def _command_payload_response( self, error_code: int, length: int, data: Optional[int] = None ) -> bytes: packed_data = bytes() format_str = "!II" # default write response format # data present implies a read response if data is not None: format_str = "!III" packed_data = struct.pack(format_str, error_code, length, data) else: packed_data = struct.pack(format_str, error_code, length) return packed_data def _get_start_bytes(self, payload: bytes, num_bytes: int) -> bytes: if len(payload) < num_bytes: raise B5dcProtocolException( f"Payload does not have {num_bytes} bytes to read." ) return payload[:num_bytes] def _get_end_bytes(self, payload: bytes, num_bytes: int) -> bytes: if len(payload) < num_bytes: raise B5dcProtocolException( f"Payload does not have {num_bytes} bytes to read." ) return payload[num_bytes:]
[docs] @staticmethod def format_bytes_to_hexadecimal_string(data: bytes) -> str: """Convert bytes representation to string hexadecimal representation. :param data: data in bytes :return: string formatted hexadecimal """ hex_string = data.hex() return " ".join( f"0x{hex_string[i:i + 2]}" for i in range(0, len(hex_string), 2) )
[docs]class B5dcProtocol(BaseB5dcProtocol): """B5dcProtocol class."""
[docs] def __init__( self, on_con_lost: Future[Any], logger: logging.Logger, remote_addr: Tuple[str, int], error_count_threshold: int = 3, ): """Initialise B5dcProtocol class. :param on_con_lost: Future to indicate when connection lost :param logger: logging handle :param remote_addr: IP and port of B5DC device """ super().__init__(logger) self.on_con_lost = on_con_lost self._logger = logger self._transport: Optional[BaseTransport] = None self._remote_addr = remote_addr self.connection_established = False self.error_counter = 0 self.error_count_threshold = error_count_threshold self._lock = asyncio.Lock()
[docs] def connection_made(self, transport: BaseTransport) -> None: """Set transport when connection is established.""" self._logger.info("Connection to B5DC established.") self._transport = transport self._remote_addr = transport.get_extra_info("peername") self.connection_established = True
[docs] def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: """Receive data from transport layer.""" self.error_counter = 0 if addr == self._remote_addr: str_bytes = B5dcProtocol.format_bytes_to_hexadecimal_string(data) self._logger.debug(f"Received from B5DC: {str_bytes}.") self.handle_received_messages(data)
[docs] def error_received(self, exc: Any) -> None: """Log error received.""" self._logger.info(f"Error received {exc}.") self.error_counter += 1 if self.error_counter >= self.error_count_threshold: self.set_connection_lost_flags()
[docs] def connection_lost(self, exc: Any) -> None: """Log connection lost.""" self._logger.info(f"Connection to B5DC lost: {exc}") self.set_connection_lost_flags()
[docs] def set_connection_lost_flags(self) -> None: """Set connection lost flag to false and complete on_con_lost future.""" self.connection_established = False if not self.on_con_lost.done(): self.on_con_lost.set_result(True)
[docs] async def sync_write_register(self, address: int, data: int) -> None: """Write synchronously to B5DC and return whether successful. :param address: address field to write data to :param data: data to be written :raises B5dcProtocolTimeout: when response is not received in timeout period """ async with self._lock: response = await self._sync_request(address, data) if response.error_code != B5dcErrorCode.SUCCESS: raise B5dcProtocolException("Write failed.")
[docs] async def sync_read_register(self, address: int) -> Optional[int]: """Read address data synchronously from B5DC. :param address: address to read :raises B5dcProtocolTimeout: when read times out :return: data read from address """ async with self._lock: response = await self._sync_request(address) if response.error_code != B5dcErrorCode.SUCCESS: raise B5dcProtocolException("Read failed: Error code fail.") return response.data
async def _sync_request( self, address: int, data: Optional[int] = None ) -> B5dcResponse: if self._transport is None: raise B5dcProtocolException("No valid transport handle.") request_bytes = self.construct_request(address, data) sequence_number_tx = self.sequence_number str_bytes = B5dcProtocol.format_bytes_to_hexadecimal_string(request_bytes) self._logger.debug(f"Sending message: {str_bytes}") self._transport.sendto(request_bytes) self._increment_sequence_number() try: response = await self.get_response(sequence_number_tx) except B5dcProtocolTimeout: self._logger.debug( "Timeout receiving response for request" f" with sequence number: {sequence_number_tx}" ) raise return response