# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module class for main PST to DLM Transfer application."""
from __future__ import annotations
import argparse
import logging
import os
import pathlib
import sys
import threading
import traceback
from signal import SIGINT, SIGTERM, signal
from types import FrameType
from ska_pst.common.constants import SKA_PST_SUBSYSTEM_IDS
from ska_ser_logging import configure_logging
from ska_pst.common import get_telescope_config_by_pst_subsystem
from .constants import DIR_PERMS
from .processing_context import ProcessingContext
from .scan_manager import ScanManager
class DlmTransfer:
    """Class to manage the main execution loop of the PST to DLM transfer."""

    def __init__(
        self: DlmTransfer,
        ctx: ProcessingContext,
        file_event_timeout: float = 10.0,
        # this makes it easier for testing!
        scan_manager: ScanManager | None = None,
        logger: logging.Logger | None = None,
        verbose: bool = False,
    ) -> None:
        """
        Initialise a DlmTransfer object.

        Note that logging is (re)configured via ``configure_logging`` on every
        construction, using ``verbose`` to select the level, regardless of
        whether a ``logger`` instance is supplied.

        :param ctx: the processing context for the DLM transfer
        :type ctx: ProcessingContext
        :param file_event_timeout: the timeout to wait for a new file to arrive in seconds.
            This value is used to stop creating a spin lock waiting for new
            files to arrive. It is used to wait on the ``ctx`` to no longer be
            in a processing state. Defaults to 10.0 seconds.
        :type file_event_timeout: float, optional
        :param scan_manager: the scan manager to use with the DLM transfer. When None
            a ``ScanManager`` is created from ``ctx`` and the logger. Defaults to None
        :type scan_manager: ScanManager | None, optional
        :param logger: the logger instance to use. When None the module logger is used.
            Defaults to None
        :type logger: logging.Logger | None, optional
        :param verbose: verbosity flag for logging, if true use logging.DEBUG.
            defaults to False
        :type verbose: bool, optional
        """
        logging_level = logging.DEBUG if verbose else logging.INFO
        configure_logging(level=logging_level)

        self.logger = logger or logging.getLogger(__name__)
        self._file_event_timeout = file_event_timeout
        self._ctx = ctx
        self._scan_manager = scan_manager or ScanManager(
            ctx=self._ctx,
            logger=self.logger,
        )

    def interrupt_processing(self: DlmTransfer) -> None:
        """Interrupt the processing and transferring of the scan."""
        self._ctx.stop_processing()

    def process(self: DlmTransfer) -> None:  # noqa: C901 - override complexity
        """
        Primary processing method for the PST to DLM transfer.

        Repeatedly waits on the processing context for up to ``file_event_timeout``
        seconds; while that wait returns a false value, every scan yielded by the
        scan manager is processed. The loop ends once the wait returns a true value.

        Any exception raised during a pass is logged as a warning and the loop
        carries on with the next pass. If the exception came from a scan's
        ``process`` call, that scan is flagged with ``processing_failed = True``.
        """
        self.logger.debug(f"processing_context={self._ctx}")

        while not self._ctx.wait_for(timeout=self._file_event_timeout):
            # Only the scan whose ``process`` call is currently in flight may be
            # blamed for an exception. Errors raised by the scan manager itself
            # (before the first scan, or between scans) must not touch a scan.
            active_scan = None
            try:
                # Note - scan manager will stop iterating if ctx.stop_processing is called
                for scan in self._scan_manager:
                    active_scan = scan
                    scan.process()
                    active_scan = None
            except Exception:
                if active_scan is None:
                    self.logger.warning("Error while iterating over scans.", exc_info=True)
                    continue

                self.logger.warning(f"Error in processing scan={active_scan!r}.", exc_info=True)
                # this will set the scan as invalid and be skipped from future processing.
                active_scan.processing_failed = True

        self.logger.debug("DlmTransfer process has been interrupted and now exiting")
def check_path_permissions(path_under_test: pathlib.Path, logger: logging.Logger) -> None:
    """
    Ensure a base path exists and is both readable and writable.

    The directory (and any missing parents) is created with ``DIR_PERMS`` if it
    does not already exist, then read and write access are checked with
    ``os.access``. The outcome of each check is logged.

    :param path_under_test: the directory path to create (if needed) and check
    :type path_under_test: pathlib.Path
    :param logger: the logger used to report the result of each check
    :type logger: logging.Logger
    :raises PermissionError: if read and/or write access is not granted. The
        path name is reported once, even when both checks fail.
    """
    path_under_test.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)

    access_denied = False
    for access_mode, label in ((os.R_OK, "Read"), (os.W_OK, "Write")):
        if os.access(path_under_test, access_mode):
            logger.debug(f"{label} permission is granted for file: {path_under_test.name}")
        else:
            logger.error(f"{label} permission is not granted for file: {path_under_test.name}")
            access_denied = True

    if access_denied:
        # Report the path a single time; the individual missing permissions
        # have already been logged above.
        raise PermissionError(f"Permission deviation detected: {path_under_test.name}")
def main(arg_list: list[str] | None = None) -> None:
    """
    Parse command line arguments and execute the main processing loop.

    The process always terminates via ``sys.exit``: status 0 when the processing
    loop returns normally, 1 when it raises an unexpected exception and 2 when
    one of the supplied paths fails the permission check.

    :param arg_list: a list of arguments, defaults to None.
        Python's argparse will use this list if set else it uses the command line
        arguments. Setting this list is used in unit testing.
    :type arg_list: list[str] | None, optional
    """
    # do arg parsing here
    p = argparse.ArgumentParser()
    p.add_argument(
        "local_path",
        type=pathlib.Path,
        help="local/source filesystem path in which PST data products are written by DSP",
    )
    p.add_argument(
        "staging_path",
        type=pathlib.Path,
        help="local/staging filesystem path in which PST data products are prepared",
    )
    p.add_argument(
        "dlm_path",
        type=pathlib.Path,
        help="local/dest filesystem path in which PST data products are made available to DLM",
    )
    p.add_argument("ska_subsystem", type=str, choices=SKA_PST_SUBSYSTEM_IDS, help="ska-subsystem")
    p.add_argument(
        "--scan-timeout",
        type=float,
        help="time out, in seconds, to mark scan as being inactive. Default is 300s",
        default=300.0,
    )
    p.add_argument("-v", "--verbose", action="store_true")

    # Passing a list to parse_args allows for unit testing, when None it checks the
    # command line arguments.
    args = p.parse_args(arg_list)

    logging_level = logging.DEBUG if args.verbose else logging.INFO
    configure_logging(level=logging_level)
    logger = logging.getLogger(__name__)

    # Check permissions for the local, staging and DLM paths; stop at the first failure.
    try:
        for base_path in (args.local_path, args.staging_path, args.dlm_path):
            check_path_permissions(base_path, logger)
    except PermissionError as e:
        sys.stderr.write(f"ERROR: {e}\n")
        sys.exit(2)

    telescope_config = get_telescope_config_by_pst_subsystem(args.ska_subsystem)
    ctx = ProcessingContext(
        telescope_config=telescope_config,
        local_path=args.local_path,
        staging_path=args.staging_path,
        dlm_path=args.dlm_path,
        scan_timeout=args.scan_timeout,
        subsystem_id=args.ska_subsystem,
        stop_processing_evt=threading.Event(),
    )

    # DlmTransfer configures logging again in its constructor, so the verbosity
    # has to be forwarded or the DEBUG level selected above would be reset.
    dlm_transfer = DlmTransfer(ctx=ctx, logger=logger, verbose=args.verbose)

    # handle SIGINT/SIGTERM gracefully to prevent partially transferred files
    def signal_handler(signum: int, frame: FrameType | None) -> None:
        # The first parameter is deliberately not called ``signal`` so that it
        # does not shadow the ``signal`` function used to register this handler.
        if signum == SIGINT:
            sys.stderr.write("CTRL + C pressed\n")
        else:
            sys.stderr.write(f"Received signal {signum}\n")
        dlm_transfer.interrupt_processing()

    signal(SIGINT, signal_handler)
    signal(SIGTERM, signal_handler)

    try:
        dlm_transfer.process()
    except Exception:
        traceback.print_exc()
        sys.exit(1)

    sys.exit(0)
# Run the command line entry point only when this module is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()