"""API client class which sets up an API router and common routes for an IP block emulator."""
from typing import Any, Self
from emulator_engine import DEFAULT_QOS, RABBITMQ_HOST
from emulator_engine.services.messaging import api_queue_service
from fastapi import APIRouter, Response
from pika import BlockingConnection, ConnectionParameters
from ska_mid_cbf_emulators.common import EmulatorError, IdService, InternalRestRequest, InternalRestResponse, LoggingBase
# [docs]  (Sphinx source-link marker left over from HTML extraction; not part of the program)
class RouterClient(LoggingBase):
    """API client class which sets up an API router and common routes for an IP block emulator.

    On construction the client connects to the RabbitMQ broker, declares its
    durable request and callback queues, closes the connection again, creates
    a FastAPI router, calls the :meth:`_setup` hook and finally registers the
    common routes.

    Args:
        bitstream_emulator_id (:obj:`str`): The unique ID of the bitstream emulator to generate an API router for.
        ip_block_id (:obj:`str`): The unique ID of the IP block to generate an API router for.
            Default is the empty string (i.e. this client is for a top-level controller).
        *args: Extra positional arguments, passed through unchanged to :meth:`_setup`.
        **kwargs: Extra keyword arguments, passed through unchanged to :meth:`_setup`.

    Raises:
        EmulatorError: If the connection to the RabbitMQ broker cannot be opened.
    """

    def __init__(self: Self, bitstream_emulator_id: str, ip_block_id: str = '', *args, **kwargs) -> None:
        super().__init__()
        self._bitstream_emulator_id = bitstream_emulator_id
        self._ip_block_id = ip_block_id

        try:
            connection = BlockingConnection(ConnectionParameters(host=RABBITMQ_HOST, connection_attempts=5))
        except Exception as err:
            self.log_error('Could not connect to RabbitMQ broker after 5 attempts.')
            # Chain the broker error so the root cause stays visible in the traceback.
            raise EmulatorError('Could not connect to RabbitMQ broker after 5 attempts.') from err

        # The connection is only needed to declare the queues; make sure it is
        # released even if one of the declaration steps fails.
        try:
            channel = connection.channel()
            channel.basic_qos(prefetch_count=DEFAULT_QOS)
            self._request_queue = IdService.api_request_queue_id(self._bitstream_emulator_id, self._ip_block_id)
            self._callback_queue = IdService.api_callback_queue_id(self._bitstream_emulator_id, self._ip_block_id)
            channel.queue_declare(queue=self._request_queue, durable=True)
            channel.queue_declare(queue=self._callback_queue, durable=True)
        finally:
            # Closing an already-closed connection raises, which would mask the
            # original error if the broker dropped the connection mid-setup.
            if connection.is_open:
                connection.close()

        if self._ip_block_id:
            # IP block client: routes live under "/<ip_block_id>".
            self._router = APIRouter(
                prefix=f'/{self._ip_block_id}',
                tags=[f'{self._ip_block_id}'],
            )
        else:
            # Top-level controller client: no prefix.
            self._router = APIRouter(
                prefix='',
                tags=['controller'],
            )

        self._setup(*args, **kwargs)
        self._initialize_common_routes()

    @property
    def router(self: Self) -> APIRouter:
        """:obj:`APIRouter`: The FastAPI Router used by this client."""
        return self._router

    @router.setter
    def router(self: Self, new_router: APIRouter) -> None:
        self._router = new_router

    def get_rpc_response(self: Self, request_body: InternalRestRequest) -> InternalRestResponse:
        """Sends an RPC request via RabbitMQ, then waits for and returns the response.

        The call is delegated to ``ApiQueueService.get_rpc_response`` using the
        request and callback queues that were declared for this client.

        Args:
            request_body (:obj:`InternalRestRequest`): The request to forward.

        Returns:
            :obj:`InternalRestResponse`: The response returned by the queue service.
        """
        return api_queue_service.ApiQueueService.get_rpc_response(
            request_queue=self._request_queue,
            callback_queue=self._callback_queue,
            request_body=request_body,
        )

    def _setup(self: Self, *args, **kwargs) -> None:
        """Hook called during construction, after the router is created and before the common routes are added.

        Receives the extra ``*args`` / ``**kwargs`` given to the constructor.
        This implementation does nothing.
        """
        return

    def _get_state_diagram(self: Self, contextOnly: bool = False) -> Response:
        """Get a state transition diagram for the current state of an IP block emulator.

        HTTP Method:
            `GET`

        Query Parameters:
            contextOnly (:obj:`bool`, optional): If False (default), the full emulator context (all known states and transitions)
                will be included. If True, only the context of the current state (the current state, the previous state,
                and any states accessible from the current state) will be included.

        Returns:
            :obj:`Response`: A response whose content and status code are taken from the RPC response,
            sent with media type ``image/png``.
        """
        rpc_response = self.get_rpc_response(InternalRestRequest(
            method_name='_get_state_diagram',
            http_method='GET',
            # The public query parameter is camelCase; the internal kwarg is snake_case.
            kwargs={'context_only': contextOnly},
        ))
        return Response(
            content=rpc_response.body,
            status_code=rpc_response.status,
            media_type='image/png',
        )

    def _get_state(self: Self, response: Response) -> dict[str, Any]:
        """Get the current state of an IP block emulator.

        HTTP Method:
            `GET`

        Args:
            response (:obj:`Response`): The outgoing response object; its status code is set
                to the status of the RPC response.

        Returns:
            :obj:`dict[str, Any]`: The body of the RPC response.
        """
        rpc_response = self.get_rpc_response(InternalRestRequest(
            method_name='_get_state',
            http_method='GET',
            kwargs={},
        ))
        response.status_code = rpc_response.status
        return rpc_response.body

    def _initialize_common_routes(self: Self) -> None:
        """Initializes common routes."""
        # NOTE(review): the source under review had lost its indentation, so it is
        # assumed that both routes are registered only for IP block clients
        # (non-empty ip_block_id) - confirm against the repository copy.
        if self._ip_block_id:
            self.router.add_api_route('/state_diagram', self._get_state_diagram, methods=['GET'])
            self.router.add_api_route('/state', self._get_state, methods=['GET'])