Source code for ska_pst.send.dlm_transfer

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module class for main PST to DLM Transfer application."""

from __future__ import annotations

import argparse
import logging
import os
import pathlib
import sys
import threading
import traceback
from signal import SIGINT, SIGTERM, signal
from types import FrameType

from ska_pst.common.constants import SKA_PST_SUBSYSTEM_IDS
from ska_ser_logging import configure_logging

from ska_pst.common import get_telescope_config_by_pst_subsystem

from .constants import DIR_PERMS
from .processing_context import ProcessingContext
from .scan_manager import ScanManager


class DlmTransfer:
    """Class to manage the main execution loop of the PST to DLM transfer."""

    def __init__(
        self: DlmTransfer,
        ctx: ProcessingContext,
        file_event_timeout: float = 10.0,  # this makes it easier for testing!
        scan_manager: ScanManager | None = None,
        logger: logging.Logger | None = None,
        verbose: bool = False,
    ) -> None:
        """
        Initialise a DlmTransfer object.

        Note that constructing the object calls ``configure_logging`` with a level
        derived from ``verbose``.

        :param ctx: the processing context for the DLM transfer
        :type ctx: ProcessingContext
        :param file_event_timeout: the timeout, in seconds, passed to ``ctx.wait_for`` on
            each pass of the main loop in :py:meth:`process`. Waiting on the context avoids
            a spin lock while waiting for new files to arrive. Defaults to 10.0 seconds.
        :type file_event_timeout: float, optional
        :param scan_manager: the scan manager to use with the DLM transfer. When None a
            ``ScanManager`` is created from ``ctx`` and the logger. Defaults to None
        :type scan_manager: ScanManager | None, optional
        :param logger: the logger instance to use. When None the module logger is used.
            Defaults to None
        :type logger: logging.Logger | None, optional
        :param verbose: verbosity flag for logging, if true use logging.DEBUG else
            logging.INFO. Defaults to False
        :type verbose: bool, optional
        """
        logging_level = logging.DEBUG if verbose else logging.INFO
        configure_logging(level=logging_level)
        self.logger = logger or logging.getLogger(__name__)

        self._file_event_timeout = file_event_timeout
        self._ctx = ctx
        self._scan_manager = scan_manager or ScanManager(
            ctx=self._ctx,
            logger=self.logger,
        )

    def interrupt_processing(self: DlmTransfer) -> None:
        """Interrupt the processing and transferring of the scan."""
        self._ctx.stop_processing()

    def process(self: DlmTransfer) -> None:  # noqa: C901 - override complexity
        """
        Primary processing method for the PST to DLM transfer.

        Loops until ``ctx.wait_for`` returns a truthy value (presumably once
        ``stop_processing`` has been requested - confirm against ProcessingContext).
        On each pass every scan yielded by the scan manager is processed.

        A failure while processing a scan is logged and that scan is flagged with
        ``processing_failed = True``. A failure raised by the scan manager itself,
        when no scan is being processed, is logged and the loop retries on the next
        pass; no scan is flagged in that case.
        """
        self.logger.debug(f"processing_context={self._ctx}")

        while not self._ctx.wait_for(timeout=self._file_event_timeout):
            # Track the scan that is actually in flight. The loop variable cannot be
            # used in the handler: it is unbound if the scan manager raises before
            # yielding anything, and stale if it raises between two scans.
            active_scan = None
            try:
                # Note - scan manager will stop iterating if ctx.stop_processing is called
                for scan in self._scan_manager:
                    active_scan = scan
                    scan.process()
                    active_scan = None
            except Exception:
                if active_scan is None:
                    self.logger.warning("Error in retrieving the next scan to process.", exc_info=True)
                else:
                    self.logger.warning(f"Error in processing scan={active_scan!r}.", exc_info=True)
                    # this will set the scan as invalid and be skipped from future processing.
                    active_scan.processing_failed = True

        self.logger.debug("DlmTransfer process has been interrupted and now exiting")
def check_path_permissions(path_under_test: pathlib.Path, logger: logging.Logger) -> None:
    """
    Confirm that a base path exists and has both read and write permissions.

    The directory, including any missing parents, is created with ``DIR_PERMS`` if it
    does not already exist. Each permission check is logged at debug level when granted
    and at error level when not.

    :param path_under_test: the directory path to create if needed and then check.
    :type path_under_test: pathlib.Path
    :param logger: the logger used to report the outcome of each permission check.
    :type logger: logging.Logger
    :raises PermissionError: if read and/or write access to the path is not granted.
    """
    path_under_test.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)

    permissions_ok = True
    for access_mode, label in ((os.R_OK, "Read"), (os.W_OK, "Write")):
        if os.access(path_under_test, access_mode):
            logger.debug(f"{label} permission is granted for file: {path_under_test.name}")
        else:
            logger.error(f"{label} permission is not granted for file: {path_under_test.name}")
            permissions_ok = False

    # Report the path once, even when both read and write access are missing.
    if not permissions_ok:
        raise PermissionError(f"Permission deviation detected: {path_under_test.name}")


def main(arg_list: list[str] | None = None) -> None:
    """
    Parse command line arguments and execute the main processing loop.

    The process exits with status 0 when processing returns normally, 1 when processing
    raises an exception and 2 when a path fails its permission check.

    :param arg_list: a list of arguments, defaults to None. Python's argparse will use this list
        if set else it uses the command line arguments. Setting this list is used in unit testing.
    :type arg_list: list[str] | None, optional
    """
    # do arg parsing here
    p = argparse.ArgumentParser()
    p.add_argument(
        "local_path",
        type=pathlib.Path,
        help="local/source filesystem path in which PST data products are written by DSP",
    )
    p.add_argument(
        "staging_path",
        type=pathlib.Path,
        help="local/staging filesystem path in which PST data products are prepared",
    )
    p.add_argument(
        "dlm_path",
        type=pathlib.Path,
        help="local/dest filesystem path in which PST data products are made available to DLM",
    )
    p.add_argument("ska_subsystem", type=str, choices=SKA_PST_SUBSYSTEM_IDS, help="ska-subsystem")
    p.add_argument(
        "--scan-timeout",
        type=float,
        help="time out, in seconds, to mark scan as being inactive. Default is 300s",
        default=300.0,
    )
    p.add_argument("-v", "--verbose", action="store_true")

    # Passing a list to parse_args allows for unit testing, when None it checks the
    # command line arguments.
    args = p.parse_args(arg_list)

    logging_level = logging.DEBUG if args.verbose else logging.INFO
    configure_logging(level=logging_level)
    logger = logging.getLogger(__name__)

    # Check permissions for local and remote paths
    try:
        check_path_permissions(args.local_path, logger)
        check_path_permissions(args.staging_path, logger)
        check_path_permissions(args.dlm_path, logger)
    except PermissionError as e:
        sys.stderr.write(f"ERROR: {e}\n")
        sys.exit(2)

    telescope_config = get_telescope_config_by_pst_subsystem(args.ska_subsystem)

    ctx = ProcessingContext(
        telescope_config=telescope_config,
        local_path=args.local_path,
        staging_path=args.staging_path,
        dlm_path=args.dlm_path,
        scan_timeout=args.scan_timeout,
        subsystem_id=args.ska_subsystem,
        stop_processing_evt=threading.Event(),
    )

    dlm_transfer = DlmTransfer(ctx=ctx, logger=logger)

    # handle SIGINT and SIGTERM gracefully to prevent partially transferred files
    def signal_handler(signum: int, frame: FrameType | None) -> None:
        # The handler serves both signals, so report the one actually received.
        sys.stderr.write(f"Received signal {signum}, interrupting processing\n")
        dlm_transfer.interrupt_processing()

    signal(SIGINT, signal_handler)
    signal(SIGTERM, signal_handler)

    try:
        dlm_transfer.process()
        # SystemExit is not an Exception subclass, so it is not caught below.
        sys.exit(0)
    except Exception:
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()