# Source code for emulator_engine.services.api.router_client

"""API client class which sets up an API router and common routes for an IP block emulator."""

from typing import Any, Self

from emulator_engine import DEFAULT_QOS, RABBITMQ_HOST
from emulator_engine.services.messaging import api_queue_service
from fastapi import APIRouter, Response
from pika import BlockingConnection, ConnectionParameters

from ska_mid_cbf_emulators.common import EmulatorError, IdService, InternalRestRequest, InternalRestResponse, LoggingBase


class RouterClient(LoggingBase):
    """API client class which sets up an API router and common routes for an IP block emulator.

    On construction the client declares its request and callback queues on the
    RabbitMQ broker, builds a FastAPI router, calls the :meth:`_setup` hook and
    finally registers the common routes.

    Args:
        bitstream_emulator_id (:obj:`str`): The unique ID of the bitstream emulator to generate an API router for.
        ip_block_id (:obj:`str`): The unique ID of the IP block to generate an API router for.
            Default is the empty string (i.e. this client is for a top-level controller).
        *args: Extra positional arguments, forwarded unchanged to :meth:`_setup`.
        **kwargs: Extra keyword arguments, forwarded unchanged to :meth:`_setup`.

    Raises:
        EmulatorError: If the connection to the RabbitMQ broker cannot be established.
    """

    # Number of connection attempts passed to pika; also reported in the error message.
    _CONNECTION_ATTEMPTS = 5

    def __init__(self: Self, bitstream_emulator_id: str, ip_block_id: str = '', *args, **kwargs) -> None:
        super().__init__()
        self._bitstream_emulator_id = bitstream_emulator_id
        self._ip_block_id = ip_block_id

        try:
            connection = BlockingConnection(
                ConnectionParameters(host=RABBITMQ_HOST, connection_attempts=self._CONNECTION_ATTEMPTS)
            )
        except Exception as err:
            message = f'Could not connect to RabbitMQ broker after {self._CONNECTION_ATTEMPTS} attempts.'
            self.log_error(message)
            # Chain the underlying error so the root cause stays visible in the traceback.
            raise EmulatorError(message) from err

        try:
            channel = connection.channel()
            channel.basic_qos(prefetch_count=DEFAULT_QOS)

            self._request_queue = IdService.api_request_queue_id(self._bitstream_emulator_id, self._ip_block_id)
            self._callback_queue = IdService.api_callback_queue_id(self._bitstream_emulator_id, self._ip_block_id)

            channel.queue_declare(queue=self._request_queue, durable=True)
            channel.queue_declare(queue=self._callback_queue, durable=True)
        finally:
            # This connection is only needed to declare the queues. Close it even
            # when a declaration fails so it is not leaked; skip the call if the
            # connection is already closed, because closing it again would raise
            # and mask the original error.
            if connection.is_open:
                connection.close()

        if self._ip_block_id:
            self._router = APIRouter(
                prefix=f'/{self._ip_block_id}',
                tags=[f'{self._ip_block_id}'],
            )
        else:
            # No IP block ID: this client is for a top-level controller.
            self._router = APIRouter(
                prefix='',
                tags=['controller'],
            )

        self._setup(*args, **kwargs)
        self._initialize_common_routes()

    @property
    def router(self: Self) -> APIRouter:
        """:obj:`APIRouter`: The FastAPI Router used by this client."""
        return self._router

    @router.setter
    def router(self: Self, new_router: APIRouter) -> None:
        self._router = new_router

    def get_rpc_response(self: Self, request_body: InternalRestRequest) -> InternalRestResponse:
        """Sends an RPC request via RabbitMQ, then waits for and returns the response.

        Args:
            request_body (:obj:`InternalRestRequest`): The request to send on this client's request queue.

        Returns:
            :obj:`InternalRestResponse`: The response produced by ``ApiQueueService.get_rpc_response``.
        """
        return api_queue_service.ApiQueueService.get_rpc_response(
            request_queue=self._request_queue,
            callback_queue=self._callback_queue,
            request_body=request_body,
        )

    def _setup(self: Self, *args, **kwargs) -> None:
        """Hook called by ``__init__`` after the router is created and before common routes are added.

        The base implementation does nothing. It receives the extra positional
        and keyword arguments that were passed to the constructor.
        """
        return

    def _get_state_diagram(self: Self, contextOnly: bool = False) -> Response:
        """Get a state transition diagram for the current state of an IP block emulator.

        HTTP Method: `GET`

        Query Parameters:
            contextOnly (:obj:`bool`, optional): If False (default), the full emulator context
                (all known states and transitions) will be included. If True, only the context
                of the current state (the current state, the previous state, and any states
                accessible from the current state) will be included.

        Returns:
            :obj:`Response`: The RPC response body and status, served with media type ``image/png``.
        """
        rpc_response = self.get_rpc_response(InternalRestRequest(
            method_name='_get_state_diagram',
            http_method='GET',
            kwargs={'context_only': contextOnly},
        ))
        return Response(
            content=rpc_response.body,
            status_code=rpc_response.status,
            media_type='image/png',
        )

    def _get_state(self: Self, response: Response) -> dict[str, Any]:
        """Get the current state of an IP block emulator.

        HTTP Method: `GET`

        Args:
            response (:obj:`Response`): The outgoing response object; its status code is set
                to the status of the RPC response.

        Returns:
            The body of the RPC response.
        """
        rpc_response = self.get_rpc_response(InternalRestRequest(
            method_name='_get_state',
            http_method='GET',
            kwargs={},
        ))
        response.status_code = rpc_response.status
        return rpc_response.body

    def _initialize_common_routes(self: Self) -> None:
        """Initializes common routes.

        The ``/state_diagram`` and ``/state`` routes are only registered when
        this client has a non-empty IP block ID.
        """
        if self._ip_block_id:
            self.router.add_api_route('/state_diagram', self._get_state_diagram, methods=['GET'])
            self.router.add_api_route('/state', self._get_state, methods=['GET'])