"""
The ska_oso_oet.procedure.application module holds classes and functionality that
belong in the application layer of the OET. This layer holds the application
interface, delegating to objects in the domain layer for business rules and
actions.
"""
import collections
import dataclasses
import logging
import multiprocessing.context
import os
import threading
import time
from typing import Callable, Dict, List, Optional, Tuple
from pubsub import pub
from ska_oso_oet import mptools
from ska_oso_oet.event import topics
from ska_oso_oet.procedure import domain
from ska_oso_oet.procedure.domain import EventMessage, ProcedureState
# Directory containing this module; the default abort script sits alongside it.
base_dir = os.path.dirname(os.path.realpath(__file__))

# Default post-termination script used by ScriptExecutionService.stop().
ABORT_SCRIPT = domain.FileSystemScript(f"file://{base_dir}/abort.py")

# Number of Procedures tracked before pruning of old Procedures is attempted.
HISTORY_MAX_LENGTH = 10

# Only Procedures in one of these states are eligible for pruning.
DELETEABLE_STATES = [
    ProcedureState.COMPLETE,
    ProcedureState.FAILED,
    ProcedureState.STOPPED,
    ProcedureState.UNKNOWN,
]

LOGGER = logging.getLogger(__name__)
[docs]
@dataclasses.dataclass
class PrepareProcessCommand:
    """
    Input argument for the ScriptExecutionService prepare command.

    Bundles the script to be loaded together with the arguments that are
    passed on when the script process is created.
    """

    # script to load into a new script process
    script: domain.ExecutableScript
    # arguments handed to the process manager when creating the process; also
    # recorded against the 'init' call in the Procedure's argument history
    init_args: domain.ProcedureInput
[docs]
@dataclasses.dataclass
class StartProcessCommand:
    """
    Input argument for the ScriptExecutionService start command.

    Identifies the prepared script process to run, the function to call in
    that script, and any late-binding runtime arguments for the call.
    """

    # ID of the prepared process to start
    process_uid: int
    # name of the script function to call
    fn_name: str
    # late-binding arguments for the function call
    run_args: domain.ProcedureInput
    # forwarded unchanged to the process manager's run() call
    force_start: bool = False
[docs]
@dataclasses.dataclass
class StopProcessCommand:
    """
    Input argument for the ScriptExecutionService stop command.

    Identifies the script process to stop and states whether the
    post-termination abort script should be run afterwards.
    """

    # ID of the process to stop
    process_uid: int
    # True to run the abort script once the process has been stopped
    run_abort: bool
[docs]
@dataclasses.dataclass
class ProcedureHistory:
    """
    ProcedureHistory is a non-functional dataclass holding execution history of
    a Procedure spanning all transactions.

    process_states: records time for each change of ProcedureState (list of
        tuples where tuple contains the ProcedureState and time when state was
        changed to)
    stacktrace: None unless a stacktrace has been recorded for the process,
        in which case it stores that stacktrace
    """

    def __init__(
        self,
        process_states: "Optional[List[Tuple[domain.ProcedureState, float]]]" = None,
        stacktrace=None,
    ):
        """
        Create a new ProcedureHistory.

        :param process_states: (state, timestamp) tuples recorded so far, or
            None to start with an empty history
        :param stacktrace: stacktrace recorded for the process, if any
        """
        # a fresh list per instance avoids sharing state between histories
        self.process_states = [] if process_states is None else process_states
        self.stacktrace = stacktrace

    def __eq__(self, other):
        """
        Two histories are equal when both their state records and their
        stacktraces are equal.
        """
        if not isinstance(other, ProcedureHistory):
            # let Python try the reflected comparison on the other operand
            # before falling back to 'not equal'
            return NotImplemented
        return (
            self.process_states == other.process_states
            and self.stacktrace == other.stacktrace
        )

    def __repr__(self):
        p_history = ", ".join(
            f"({state!s}, {timestamp!r})" for state, timestamp in self.process_states
        )
        return (
            f"<ProcessHistory(process_states=[{p_history}], "
            f"stacktrace={self.stacktrace})>"
        )
[docs]
@dataclasses.dataclass
class ArgCapture:
    """
    ArgCapture is a struct to record function call and time of invocation.
    """

    # name of the function that was called, e.g. 'init'
    fn: str
    # arguments the function was called with
    fn_args: domain.ProcedureInput
    # time of invocation, or None if no time was recorded
    time: Optional[float] = None
[docs]
@dataclasses.dataclass
class ProcedureSummary:
    """
    Brief representation of a runtime Procedure.

    Holds the essential information needed to describe a Procedure and to
    tell it apart from other Procedures.
    """

    # Procedure ID
    id: int  # pylint: disable=invalid-name
    # script the Procedure was created from
    script: domain.ExecutableScript
    # record of the function calls made on the script, with their arguments
    script_args: List[ArgCapture]
    # state change history and any stacktrace recorded for the Procedure
    history: ProcedureHistory
    # most recently recorded state of the Procedure
    state: domain.ProcedureState
[docs]
class ScriptExecutionService:
    """
    ScriptExecutionService provides the high-level interface and facade for
    the script execution domain (i.e., the 'procedure' domain).

    The interface is used to load and run Python scripts in their own
    independent Python child process.

    The shutdown method should be called to ensure cleanup of any
    multiprocessing artefacts owned by this service.
    """

    # defines which lifecycle event to announce when a lifecycle.statechange is received
    # TODO rationalise procedure lifecycle events and topics for multi-run scripts
    state_to_topic = {
        ProcedureState.INITIALISING: topics.procedure.lifecycle.started,
        ProcedureState.RUNNING: topics.procedure.lifecycle.started,
        ProcedureState.COMPLETE: topics.procedure.lifecycle.complete,
        ProcedureState.FAILED: topics.procedure.lifecycle.failed,
        ProcedureState.STOPPED: topics.procedure.lifecycle.stopped,
    }

    def __init__(
        self,
        mp_context: Optional[multiprocessing.context.BaseContext] = None,
        abort_script: domain.ExecutableScript = ABORT_SCRIPT,
        on_pubsub: Optional[List[Callable[[EventMessage], None]]] = None,
    ):
        """
        Create a new ScriptExecutionService.

        The .stop() method of this ScriptExecutionService can run a second
        script once the current process has been terminated. By default, this
        second script calls SubArrayNode.abort() to halt further activities
        on the sub-array controlled by the terminated script. To run a
        different script, pass it as the abort_script argument to this
        constructor.

        :param mp_context: multiprocessing context to use or None for default
        :param abort_script: post-termination script for two-phase abort
        :param on_pubsub: callbacks to call when PUBSUB message is received
        """
        # State and stacktrace bookkeeping is driven by callbacks handed to
        # the ProcessManager rather than by pypubsub subscriptions. The
        # service's own callbacks go first, ahead of any caller-supplied ones.
        callbacks = [self._update_state, self._update_stacktrace]
        if on_pubsub:
            callbacks.extend(on_pubsub)

        self._process_manager = domain.ProcessManager(mp_context, callbacks)
        self._abort_script = abort_script

        # per-Procedure bookkeeping, all keyed by Procedure ID
        self.states: Dict[int, domain.ProcedureState] = {}
        self.script_args: Dict[int, List[ArgCapture]] = {}
        self.scripts: Dict[int, domain.ExecutableScript] = {}
        self.history: Dict[int, ProcedureHistory] = collections.defaultdict(
            ProcedureHistory
        )

        # re-entrant so that methods holding the lock can call _summarise
        self._state_updating = threading.RLock()

    def prepare(self, cmd: PrepareProcessCommand) -> ProcedureSummary:
        """
        Load and prepare a Python script for execution, but do not commence
        execution.

        :param cmd: dataclass argument capturing the script identity and load
            arguments
        :return: summary of the newly created Procedure
        """
        pid = self._process_manager.create(cmd.script, init_args=cmd.init_args)

        # this needs to be set here as create() will return before the ScriptWorker
        # process has emitted CREATING event. Receipt of the CREATING event will
        # set this state to CREATING again.
        self.states[pid] = ProcedureState.CREATING

        now = time.time()
        self.scripts[pid] = cmd.script
        self.script_args[pid] = [ArgCapture(fn="init", fn_args=cmd.init_args, time=now)]

        self._prune_old_state()

        return self._summarise(pid)

    def start(self, cmd: StartProcessCommand) -> ProcedureSummary:
        """
        Start execution of a prepared procedure.

        :param cmd: dataclass argument capturing the execution arguments
        :return: summary of the Procedure that was started
        """
        self._process_manager.run(
            cmd.process_uid,
            call=cmd.fn_name,
            run_args=cmd.run_args,
            force_start=cmd.force_start,
        )

        # record the call only after the process manager accepted the request
        self.script_args[cmd.process_uid].append(
            ArgCapture(fn=cmd.fn_name, fn_args=cmd.run_args, time=time.time())
        )

        return self._summarise(cmd.process_uid)

    def summarise(self, pids: Optional[List[int]] = None) -> List[ProcedureSummary]:
        """
        Return ProcedureSummary objects for Procedures with the requested IDs.

        This method accepts an optional list of integers, representing the
        Procedure IDs to summarise. If the pids is left undefined,
        ProcedureSummary objects for all current Procedures will be returned.

        :param pids: optional list of Procedure IDs to summarise.
        :return: list of ProcedureSummary objects
        :raises ValueError: if any requested Procedure ID is not known
        """
        # freeze state to prevent mutation from events
        with self._state_updating:
            known_pids = self.states.keys()
            if pids is None:
                pids = known_pids

            missing_pids = {p for p in pids if p not in known_pids}
            if missing_pids:
                raise ValueError(f"Process IDs not found: {missing_pids}")

            return [self._summarise(pid) for pid in pids]

    def stop(self, cmd: StopProcessCommand) -> List[ProcedureSummary]:
        """
        Stop execution of a running procedure, optionally running a
        second script once the first process has terminated.

        :param cmd: dataclass argument capturing the execution arguments
        :return: empty list if no abort script was requested, otherwise a
            single-item list holding the summary of the abort script
        :raises ValueError: if the abort script was requested but a unique
            subarray ID cannot be determined for the stopped Procedure
        """
        self._process_manager.stop(cmd.process_uid)

        # exit early if not instructed to run post-termination script
        if not cmd.run_abort:
            # Did not start a new process so return empty list
            return []

        # abort requires a subarray to target
        subarray_id = self._get_subarray_id(cmd.process_uid)

        # prepare second script
        prepare_cmd = PrepareProcessCommand(
            script=self._abort_script,
            init_args=domain.ProcedureInput(subarray_id=subarray_id),
        )
        procedure_summary = self.prepare(prepare_cmd)

        # wait for the script to be READY, then run it. A timeout is made
        # visible in the log; the start is still attempted, as before.
        if not self._wait_for_state(procedure_summary.id, ProcedureState.READY):
            LOGGER.warning(
                "Abort script (PID=%s) did not reach READY before timeout;"
                " attempting to start it anyway",
                procedure_summary.id,
            )

        # start the second script
        run_cmd = StartProcessCommand(
            process_uid=procedure_summary.id,
            fn_name="main",
            run_args=domain.ProcedureInput(),
        )
        summary = self.start(run_cmd)
        return [summary]

    def shutdown(self):
        """
        Shut down the underlying ProcessManager.
        """
        self._process_manager.shutdown()

    def _get_subarray_id(self, pid: int) -> int:
        """
        Return a Subarray id for given procedure ID.

        The ID is taken from the 'subarray_id' keyword argument of the calls
        recorded for the Procedure.

        :param pid: Procedure ID to summarise
        :return: subarray id
        :raises ValueError: if no call specified a subarray ID, or if the
            recorded calls specified more than one distinct subarray ID
        """
        procedure_summary = self._summarise(pid)
        subarray_ids = {
            arg_capture.fn_args.kwargs["subarray_id"]
            for arg_capture in procedure_summary.script_args
            if "subarray_id" in arg_capture.fn_args.kwargs
        }
        if not subarray_ids:
            raise ValueError("Subarray ID not specified")
        if len(subarray_ids) > 1:
            raise ValueError("Multiple subarray IDs found")
        return subarray_ids.pop()

    def _summarise(self, pid: int) -> ProcedureSummary:
        """
        Return a ProcedureSummary for the Procedure with the given ID.

        CAUTION: do NOT modify the arguments! SES state is exposed here.

        :param pid: Procedure ID to summarise
        :return: ProcedureSummary
        :raises KeyError: if no state, script or arguments are recorded for pid
        """
        with self._state_updating:
            return ProcedureSummary(
                id=pid,
                script=self.scripts[pid],
                script_args=self.script_args[pid],
                history=self.history[pid],
                state=self.states[pid],
            )

    def _prune_old_state(self):
        """
        Remove the state associated with the oldest deletable Procedures once
        more than HISTORY_MAX_LENGTH Procedures are being tracked.

        Only the oldest Procedures in excess of the limit are considered, and
        of those only the ones in a deletable state are removed, so the
        number of tracked Procedures can stay above the limit.
        """
        with self._state_updating:
            excess = len(self.states) - HISTORY_MAX_LENGTH
            if excess <= 0:
                return

            # dicts preserve insertion order, so the first keys are the oldest
            oldest_pids = list(self.states)[:excess]
            to_delete = [
                old_pid
                for old_pid in oldest_pids
                if self.states[old_pid] in DELETEABLE_STATES
            ]

            for old_pid in to_delete:
                # pop() rather than del: a Procedure with only partial
                # bookkeeping must not abort pruning of the remaining state
                self.states.pop(old_pid, None)
                self.history.pop(old_pid, None)
                self.script_args.pop(old_pid, None)
                self.scripts.pop(old_pid, None)

    def _update_state(self, event: EventMessage) -> None:
        """
        Callback method that updates Procedure history whenever a message on
        the procedure.lifecycle.statechange topic is received.

        :param event: EventMessage to process
        """
        payload = event.msg
        if payload.get("topic", None) != "procedure.lifecycle.statechange":
            return

        pid = int(event.msg_src)
        new_state = payload["kwargs"]["new_state"]
        now = time.time()

        with self._state_updating:
            previous = self.states.get(pid, None)
            self.states[pid] = new_state
            self.history[pid].process_states.append((new_state, now))

        # publish a legacy lifecycle status change event when appropriate
        if new_state in self.state_to_topic:
            pub.sendMessage(
                self.state_to_topic[new_state],
                msg_src=pid,
                request_id=None,
                result=self._summarise(pid),
            )

        # special case: there's no unique state to signify loading complete
        if previous == ProcedureState.LOADING and new_state == ProcedureState.IDLE:
            pub.sendMessage(
                topics.procedure.lifecycle.created,
                msg_src=pid,
                request_id=None,
                result=self._summarise(pid),
            )

    def _update_stacktrace(self, event: EventMessage) -> None:
        """
        Callback method to record stacktrace event in the Procedure history
        whenever a message on procedure.lifecycle.stacktrace is received.

        :param event: EventMessage to process
        """
        payload = event.msg
        if payload.get("topic", None) != "procedure.lifecycle.stacktrace":
            return

        pid = int(event.msg_src)
        with self._state_updating:
            self.history[pid].stacktrace = payload["kwargs"]["stacktrace"]

    def _wait_for_state(self, pid: int, state: ProcedureState, timeout=1.0, tick=0.01):
        """
        A time-bound wait for a Procedure to reach the requested state.

        :param pid: ID of Procedure to wait for
        :param state: ProcedureState to wait for
        :param timeout: wait timeout, in seconds
        :param tick: time between state checks, in seconds
        :return: True if the Procedure was in the requested state when the
            wait ended, False otherwise
        """
        deadline = time.time() + timeout
        sleep_secs = tick
        while self.states.get(pid, None) != state and sleep_secs > 0:
            time.sleep(sleep_secs)
            sleep_secs = mptools._sleep_secs(  # pylint: disable=protected-access
                tick, deadline
            )
        return self.states.get(pid, None) == state