"""
Asynchronous cbf emulator runners.
"""
import asyncio
import logging
from copy import copy
from typing import Union
import ska_sdp_config
from ska_sdp_cbf_emulator.packetiser import SenderConfig, packetise
from ska_sdp_cbf_emulator.scan_monitor import SdpConfigScanMonitor
logger = logging.getLogger(__name__)
async def _aemulate_scans(
    scan_monitor: SdpConfigScanMonitor,
    config: SenderConfig,
):
    """
    Asynchronously sends the scan queue of a provided scan monitor.

    Scan targets are popped from ``scan_monitor`` one at a time until it
    yields ``None``. Every popped target cancels the packetise task that is
    still in flight (if any); a target whose ``scan_id`` is non-zero then
    starts a new packetise task aimed at that target's host and port. Once
    the queue is exhausted, the last task that was not cancelled (if any) is
    awaited before returning.

    :param scan_monitor: source of scan targets, consumed through
        ``pop_scan_target()``.
    :param config: template sender configuration. It is left untouched;
        every scan works on its own copy.
    :raises Exception: any exception raised while popping or starting a scan
        is logged, the in-flight task is cancelled, and the exception is
        re-raised.
    """
    sending_task = None

    def cancel_send():
        # Drop the reference once the task has been cancelled: awaiting a
        # cancelled task raises CancelledError, which would otherwise escape
        # from the final await below when no new scan replaces this task.
        # A task that already finished is kept so that its result (or its
        # exception) is still surfaced by the final await.
        nonlocal sending_task
        if sending_task and not sending_task.done():
            logger.info("cancelling current scan packetise")
            sending_task.cancel()
            sending_task = None

    while True:
        try:
            scan_target = await scan_monitor.pop_scan_target()
            if scan_target is None:
                break
            logger.debug("popped scan %i", scan_target.scan_id)
            cancel_send()
            if scan_target.scan_id != 0:
                logger.info("sending for scan target %s", scan_target)
                # Build a per-scan config. copy() is shallow, so the nested
                # transmission object is copied as well; otherwise setting
                # the target host/port would modify the caller's config and
                # the object seen by a previous task that is still unwinding.
                scan_config = copy(config)
                scan_config.transmission = copy(config.transmission)
                scan_config.scan_id = scan_target.scan_id
                scan_config.transmission.target_host = scan_target.target_host
                scan_config.transmission.target_port = scan_target.target_port
                sending_task = asyncio.create_task(packetise(scan_config))
        except Exception:
            logger.exception("Exception raised in packetise loop")
            cancel_send()
            raise

    # Only a task that was not cancelled can still be referenced here.
    if sending_task is not None:
        await sending_task
    logger.info("sender finished")
async def arun_cbf_emulator(
    scan_monitor_or_exec_block_id: Union[SdpConfigScanMonitor, str],
    config: SenderConfig,
):
    """
    Asynchronously runs a scan monitor together with the scan sender and
    returns once both have completed.

    The first parameter is either a ready-made scan monitor object or an
    Execution Block ID given as a string. When a string is given, an
    ``SdpConfigScanMonitor`` is created for it, backed by an
    ``ska_sdp_config.Config`` built from ``config.sdp_config_db``.

    :param scan_monitor_or_exec_block_id: scan monitor to run, or the
        Execution Block ID to create one for.
    :param config: sender configuration handed to the scan sender.
    """
    monitor = scan_monitor_or_exec_block_id
    if isinstance(monitor, str):
        # An Execution Block ID was supplied: build the monitor ourselves.
        config_db = ska_sdp_config.Config(**config.sdp_config_db)
        monitor = SdpConfigScanMonitor(scan_monitor_or_exec_block_id, config_db)

    # Monitor first, sender second, both driven concurrently.
    monitor_run = monitor.arun()
    sender_run = _aemulate_scans(monitor, config)
    await asyncio.gather(monitor_run, sender_run)
    logger.debug("scan monitor and sender done")