Source code for ska_sdp_cbf_emulator.emulator

"""
Asynchronous cbf emulator runners.
"""

import asyncio
import logging
from copy import copy
from typing import Union

import ska_sdp_config

from ska_sdp_cbf_emulator.packetiser import SenderConfig, packetise
from ska_sdp_cbf_emulator.scan_monitor import SdpConfigScanMonitor

logger = logging.getLogger(__name__)


async def _aemulate_scans(
    scan_monitor: SdpConfigScanMonitor,
    config: SenderConfig,
):
    """
    Asynchronously sends the scan queue of a provided scan monitor.

    Scan targets are popped from ``scan_monitor`` one at a time. Every newly
    popped target cancels the packetise task started for the previous one if
    it is still running; a target with a non-zero scan ID then starts a new
    packetise task addressed to that target's host and port. The loop ends
    when the monitor yields ``None``, after which the packetise task that is
    still active (if any) is awaited.

    :param scan_monitor: source of scan targets, read through
        ``pop_scan_target()``.
    :param config: base sender configuration. A per-scan copy (including a
        copy of its ``transmission`` section) is made before the scan ID and
        target address are filled in, so this function does not modify the
        object passed in.
    :raises Exception: an error raised while popping a target or starting a
        scan is logged, the current packetise task is cancelled, and the
        error is re-raised. An error raised by the final packetise task
        propagates when that task is awaited.
    """
    sending_task = None

    def cancel_send():
        # Request cancellation of the in-flight packetise task, if any.
        nonlocal sending_task
        if sending_task is not None and not sending_task.done():
            logger.info("cancelling current scan packetise")
            sending_task.cancel()
            # Forget a task we cancelled ourselves: awaiting it at the end
            # would re-raise its CancelledError out of this coroutine and
            # look like an external cancellation to the caller.
            sending_task = None

    while True:
        try:
            scan_target = await scan_monitor.pop_scan_target()
            if scan_target is None:
                # The monitor signals the end of the queue with None.
                break

            logger.debug("popped scan %i", scan_target.scan_id)
            cancel_send()
            # NOTE(review): scan ID 0 presumably means "no scan in progress";
            # confirm against the scan monitor. Nothing is sent for it.
            if scan_target.scan_id != 0:
                logger.info("sending for scan target %s", scan_target)

                # updated config with subarray addresses
                scan_config = copy(config)
                # copy() is shallow: give this scan its own transmission
                # section so the writes below do not leak into the caller's
                # config or into a previous task that is still unwinding.
                scan_config.transmission = copy(config.transmission)
                scan_config.scan_id = scan_target.scan_id
                scan_config.transmission.target_host = scan_target.target_host
                scan_config.transmission.target_port = scan_target.target_port
                sending_task = asyncio.create_task(packetise(scan_config))
        except Exception:
            logger.exception("Exception raised in packetise loop")
            cancel_send()
            raise
    if sending_task is not None:
        # Only a task that was not cancelled above can still be referenced
        # here; wait for it to send its last packet.
        await sending_task
    logger.info("sender finished")


async def arun_cbf_emulator(
    scan_monitor_or_exec_block_id: Union[SdpConfigScanMonitor, str],
    config: SenderConfig,
):
    """
    Asynchronously runs a scan monitor and scan sender until the last packet
    is sent after the monitor detects the execution block has been ended.

    Either a ScanMonitor object or an Execution Block ID can be given as the
    first parameter. In the latter case, a ScanMonitor object is
    automatically created and used.

    :param scan_monitor_or_exec_block_id: an existing scan monitor, or the
        execution block ID (a string) for which a monitor is created using
        an SDP configuration client built from ``config.sdp_config_db``.
    :param config: sender configuration handed to the scan sender; its
        ``sdp_config_db`` mapping is only read when a monitor has to be
        created.
    """
    if isinstance(scan_monitor_or_exec_block_id, str):
        eb_id = scan_monitor_or_exec_block_id
        scan_monitor = SdpConfigScanMonitor(
            eb_id, ska_sdp_config.Config(**config.sdp_config_db)
        )
    else:
        scan_monitor = scan_monitor_or_exec_block_id

    # Run the monitor and the sender concurrently; gather() returns once both
    # have finished and propagates the first exception raised by either.
    await asyncio.gather(
        scan_monitor.arun(),
        _aemulate_scans(scan_monitor, config),
    )
    logger.debug("scan monitor and sender done")