# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module class for main PST to DLM Transfer application."""
from __future__ import annotations
import argparse
import logging
import os
import pathlib
import sys
import threading
import traceback
from signal import SIGINT, SIGTERM, signal
from types import FrameType
from typing import List
from ska_ser_logging import configure_logging
from .constants import DIR_PERMS
from .scan import ProcessingContext
from .scan_manager import ScanManager
[docs]class DlmTransfer:
"""Class to manage the main execution loop of the PST to DLM transfer."""
def __init__(
self: DlmTransfer,
ctx: ProcessingContext,
file_event_timeout: float = 10.0,
# this makes it easier for testing!
scan_manager: ScanManager | None = None,
logger: logging.Logger | None = None,
verbose: bool = False,
) -> None:
"""
Initialise an DlmTransfer object.
:param ctx: the processing context for the DLM transfer
:type ctx: ProcessingContext
:param file_event_timeout: the timeout to wait for a new file to arrive in seconds.
This value is used to stop creating a spin lock waiting for new
files to arrive. It is used to wait on the ``ctx`` to no longer be
in an processing state. Defaults to 10.0 seconds.
:type file_event_timeout: float, optional
:param scan_manager: the scan manager to use with the DLM transfer, defaults to None
:type scan_manager: ScanManager | None, optional
:param logger: the logger instance to use, defaults to None
:type logger: logging.Logger | None, optional
:param verbose: verbosity flag for logging, if true use logging.DEBUG.
defaults to False
:type verbose: bool, optional
"""
logging_level = logging.DEBUG if verbose else logging.INFO
configure_logging(level=logging_level)
self.logger = logger or logging.getLogger(__name__)
self._file_event_timeout = file_event_timeout
self._ctx = ctx
self._scan_manager = scan_manager or ScanManager(
ctx=self._ctx,
logger=self.logger,
)
[docs] def interrupt_processing(self: DlmTransfer) -> None:
"""Interrupt the processing and transferring of the scan."""
self._ctx.stop_processing()
[docs] def process(self: DlmTransfer) -> None: # noqa: C901 - override complexity
"""Primary processing method for the PST to DLM transfer."""
self.logger.debug(f"processing_context={self._ctx}")
while not self._ctx.wait_for(timeout=self._file_event_timeout):
try:
# Note - scan manager will stop iterating if ctx.stop_processing is called
for scan in self._scan_manager:
scan.process()
except Exception:
self.logger.warning(f"Error in processing {scan=}.", exc_info=True)
# this will set the scan as invalid and be skipped from future processing.
scan.processing_failed = True
self.logger.debug("DlmTransfer process has been interrupted and now exiting")
def check_path_permissions(path_under_test: pathlib.Path, logger: logging.Logger) -> None:
"""Confirm source and destination base paths has the correct permissions."""
paths_with_errors = []
path_under_test.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)
if os.access(path_under_test, os.R_OK):
logger.debug(f"Read permission is granted for file: {path_under_test.name}")
else:
logger.error(f"Read permission is not granted for file: {path_under_test.name}")
paths_with_errors.append(path_under_test.name)
if os.access(path_under_test, os.W_OK):
logger.debug(f"Write permission is granted for file: {path_under_test.name}")
else:
logger.error(f"Write permission is not granted for file: {path_under_test.name}")
paths_with_errors.append(path_under_test.name)
if paths_with_errors:
raise PermissionError(f"Permission deviation detected: {', '.join(paths_with_errors)}")
def main(arg_list: List[str] | None = None) -> None:
"""Parse command line arguments and execute the main processing loop."""
# do arg parsing here
p = argparse.ArgumentParser()
p.add_argument(
"local_path",
type=pathlib.Path,
help="local/source filesystem path in which PST data products are written by DSP",
)
p.add_argument(
"staging_path",
type=pathlib.Path,
help="local/staging filesystem path in which PST data products are prepared",
)
p.add_argument(
"dlm_path",
type=pathlib.Path,
help="local/dest filesystem path in which PST data products are made available to DLM",
)
p.add_argument("ska_subsystem", type=str, default="pst-low", help="ska-subsystem")
p.add_argument(
"--scan-timeout",
type=float,
help="time out, in seconds, to mark scan as being inactive. Default is 300s",
default=300.0,
)
p.add_argument("-v", "--verbose", action="store_true")
# Didn't know we could pass a list to parse_args, this allows for testing
args = vars(p.parse_args(arg_list))
logging_level = logging.DEBUG if args["verbose"] else logging.INFO
configure_logging(level=logging_level)
logger = logging.getLogger(__name__)
# Check permissions for local and remote paths
try:
check_path_permissions(args["local_path"], logger)
check_path_permissions(args["staging_path"], logger)
check_path_permissions(args["dlm_path"], logger)
except PermissionError as e:
sys.stderr.write(f"ERROR: {e}\n")
sys.exit(2)
ctx = ProcessingContext(
local_path=args["local_path"],
staging_path=args["staging_path"],
dlm_path=args["dlm_path"],
scan_timeout=args["scan_timeout"],
subsystem_id=args["ska_subsystem"],
stop_processing_evt=threading.Event(),
)
dlm_transfer = DlmTransfer(ctx=ctx, logger=logger)
# handle SIGINT gracefully to prevent partially transferred files
def signal_handler(signal: int, frame: FrameType | None) -> None:
sys.stderr.write("CTRL + C pressed\n")
dlm_transfer.interrupt_processing()
signal(SIGINT, signal_handler)
signal(SIGTERM, signal_handler)
try:
dlm_transfer.process()
sys.exit(0)
except Exception:
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()