Source code for ska_ser_skallop.subscribing.helpers

import re
import threading
from collections import namedtuple
from datetime import datetime
from functools import reduce
from typing import Any, List, NamedTuple, Tuple

from ska_ser_skallop.datatypes import attributes
from ska_ser_skallop.subscribing import base


def get_attr_value_as_str(attr: base.AttributeInt) -> str:
    """
    Render the value of a tango attribute as a string.

    String values are returned unchanged. For the attribute names listed
    below, the value is converted to the corresponding enum type from the
    ``attributes`` module and the name of the resulting member is returned.
    Every other attribute falls back to ``str(attr.value)``.

    :param attr: the attribute whose value must be rendered.
    :return: the value as a string; an empty string when ``attr`` is falsy.
    """
    if not attr:
        return ""
    value = attr.value
    if isinstance(value, str):
        return value
    name = attr.name
    # Exact spellings that select the enum type used to decode the value.
    # TODO add extractions for other types of attributes
    enum_by_spellings = (
        (("obsState", "obsstate"), attributes.ObsState),
        (
            ("dishpointingstate", "dishPointingstate", "dishPointingState"),
            attributes.PointingState,
        ),
        (("pointingstate", "pointingState"), attributes.DishMasterPointingState),
        (("cspsubarrayobsstate", "cspSubarrayObsState"), attributes.ObsState),
        (("sdpSubarrayObsState", "sdpsubarrayobsstate"), attributes.ObsState),
        (("mccsSubarrayObsState", "mccssubarrayobsstate"), attributes.ObsState),
        (("dishMode", "dishmode"), attributes.DishMode),
        (("healthState", "healthstate"), attributes.DishHealth),
        (("adminMode", "adminmode"), attributes.AdminMode),
        (("powerState", "powerstate"), attributes.DishMasterPowerState),
        (
            ("observingState", "observingstate"),
            attributes.DishMasterObservingState,
        ),
        (("State", "state"), attributes.DevState),
    )
    # "configuredband" is the only name that is matched case-insensitively.
    if name.lower() in ["configuredband"]:
        enum_type = attributes.ConfiguredBand
    else:
        enum_type = next(
            (
                candidate
                for spellings, candidate in enum_by_spellings
                if name in spellings
            ),
            None,
        )
    if enum_type is None:
        return str(value)
    return enum_type(value).name
def get_device_name(event: base.EventDataInt) -> str:
    """
    Return the name of the device attached to ``event``.

    :param event: the event to inspect; its ``device.name()`` is called.
    :return: whatever ``event.device.name()`` returns.
    """
    producer = event.device
    return producer.name()
def get_attr_name(event: base.EventDataInt) -> str:
    """
    Return the name of the attribute that ``event`` refers to.

    When the event carries an attribute value, that value's ``name`` is
    returned. Otherwise the name is taken from ``event.attr_name`` as the run
    of word characters that ends the string.

    :param event: the event to inspect.
    :return: the attribute name.
    """
    attr_value = event.attr_value
    if attr_value is not None:
        return attr_value.name
    # No value on the event: keep only the trailing word of the attr_name.
    trailing_word = re.search(r"\w*(?<=$)", event.attr_name)
    return trailing_word.group(0)
def get_attr_value_str(event: base.EventDataInt) -> str:
    """
    Return the attribute value carried by ``event`` as a string.

    :param event: the event to inspect.
    :return: the rendered attribute value; when the event has no attribute
        value, ``str(event.errors)`` if ``event.err`` is set, else ``""``.
    """
    attr_value = event.attr_value
    if attr_value is not None:
        return get_attr_value_as_str(attr_value)
    return str(event.errors) if event.err else ""
def get_date_lodged(event: base.EventDataInt, init_date: datetime = None) -> datetime:
    """
    Return the date at which ``event`` was generated.

    The date comes from ``event.attr_value.time.todatetime()`` when the event
    carries an attribute value. Otherwise ``init_date`` is returned if one
    was supplied, else the current time at the moment of the call.

    :param event: the event to inspect.
    :param init_date: fallback date for events without an attribute value.
    :return: the date lodged for the event.
    """
    attr_value = event.attr_value
    if attr_value is not None:
        return attr_value.time.todatetime()
    return init_date if init_date else datetime.now()
def get_date_lodged_isoformat(
    event: base.EventDataInt, init_date: datetime = None
) -> str:
    """
    Render the date of ``event`` as an ISO formatted string.

    The date comes from ``event.attr_value.time.isoformat()`` when the event
    carries an attribute value. Otherwise ``init_date`` is used, or the
    current time at the moment of the call when no ``init_date`` is given.

    :param event: the event to inspect.
    :param init_date: fallback date for events without an attribute value.
    :return: the date as an ISO formatted string.
    """
    attr_value = event.attr_value
    if attr_value is not None:
        return attr_value.time.isoformat()
    if init_date is None:
        # Resolved per call: a ``datetime.now()`` default in the signature is
        # evaluated once at import and would report the module load time.
        init_date = datetime.now()
    return init_date.isoformat()
# Aliases of ``str`` that label the positions of the tuple returned by
# describe_event: producer (device) name, attribute name, value and date.
ProducerNameStr = str
AttributeStr = str
DateStr = str
ValueStr = str
def describe_event(
    event: base.EventDataInt, init_date: datetime = None
) -> Tuple[ProducerNameStr, AttributeStr, ValueStr, DateStr]:
    """
    Return an event as a tuple of strings describing the event.

    :param event: the event to describe.
    :param init_date: date to report when the event carries no attribute
        value; defaults to the current time at the moment of the call.
    :return: an event as a tuple of strings describing the event, ordered as
        producer name, attribute name, value and ISO formatted date.
    """
    if init_date is None:
        # Resolved per call: a ``datetime.now()`` default in the signature is
        # evaluated once at import and would report the module load time.
        init_date = datetime.now()
    producer_name = get_device_name(event)
    attr = get_attr_name(event)
    date = get_date_lodged_isoformat(event, init_date)
    value = get_attr_value_str(event)
    return producer_name, attr, value, date
def unpack_event(
    event: base.EventDataInt, init_date: datetime = None
) -> Tuple[str, str, str, datetime]:
    """
    Return the key attributes of an event.

    :param event: the event to unpack.
    :param init_date: date to report when the event carries no attribute
        value; when omitted the current time at the moment of the call is
        used.
    :return: a tuple of device name, attribute name, value (as a string) and
        date lodged.
    """
    device_name = get_device_name(event)
    attr = get_attr_name(event)
    # ``None`` is passed on as-is: get_date_lodged substitutes the current
    # time itself. The previous ``datetime.now()`` default in the signature
    # was evaluated once at import and froze the date at module load time.
    date = get_date_lodged(event, init_date)
    value = get_attr_value_str(event)
    return device_name, attr, value, date
# Plain tuple shape of a tracer entry: (time, message).
TracerMessageType = Tuple[datetime, str]
class TracerMessage(NamedTuple):
    """A message recorded by a Tracer, paired with the time it was recorded."""

    # moment at which the message was recorded
    time: datetime
    # text of the message
    message: str
class Tracer:
    """class used to record messages at specific events"""

    def __init__(self, message: str = None) -> None:
        """
        Create a tracer, optionally recording a first message.

        :param message: first message to record; nothing is recorded when it
            is ``None``.
        """
        self.messages: List[TracerMessage] = []
        if message is not None:
            self.message(message)

    def message(self, message: str) -> None:
        """
        Record ``message`` together with the current time.

        :param message: the text to record.
        """
        self.messages.append(TracerMessage(datetime.now(), message))

    def print_messages(self) -> str:
        """
        Render all recorded messages, one per line, in recording order.

        Each line has the form ``<ISO time>: <message>`` and the result
        starts with a newline. A tracer without messages yields just that
        newline instead of raising.

        :return: the rendered messages.
        """
        lines = "\n".join(
            f"{entry.time.isoformat()}: {entry.message}" for entry in self.messages
        )
        return f"\n{lines}"
# Entry stored by LogBook: the timestamp (time), the text (log) and an
# optional label; entries labelled "log" are the ones LogBook.read can hide.
LogMessage = namedtuple("LogMessage", ["time", "log", "label"])
class LogBook:
    """Thread-safe collection of timestamped, optionally labelled messages."""

    # NOTE(review): not referenced anywhere in this class; presumably meant to
    # name the "log" label used by read() - confirm before relying on it.
    log_filer = "log"

    def __init__(self):
        """Create an empty log book with its own lock."""
        self.messages: List[LogMessage] = []
        self.logbook_lock = threading.Lock()

    def log(self, message: str, timestamp: datetime = None, label=None):
        """
        Append a message to the log book.

        :param message: the text to store.
        :param timestamp: time of the message; the current time when omitted.
        :param label: optional label; entries labelled ``"log"`` can be
            filtered out by :meth:`read`.
        """
        with self.logbook_lock:
            if timestamp is None:
                timestamp = datetime.now()
            self.messages.append(LogMessage(timestamp, message, label))

    def read(self, filter_log=True, log_filter_pattern="") -> str:
        """
        Render the stored messages, sorted by time, one per line.

        Each line is the ISO timestamp left-aligned in 30 columns followed by
        the message text. The stored messages themselves are left untouched.

        :param filter_log: when true, entries labelled ``"log"`` are left out.
            Ignored when ``log_filter_pattern`` is given.
        :param log_filter_pattern: regular expression; when non-empty, entries
            labelled ``"log"`` are included only if their text matches it,
            while all other entries are always included.
        :return: the rendered lines, or ``"no logs generated"`` when nothing
            is selected.
        """
        # Work on a snapshot taken under the lock so that sorting neither
        # reorders the shared list nor races with concurrent log() calls.
        with self.logbook_lock:
            snapshot = list(self.messages)
        if log_filter_pattern:
            pattern = re.compile(log_filter_pattern)
            selected = [
                entry
                for entry in snapshot
                if entry.label != "log" or pattern.search(entry.log)
            ]
        elif filter_log:
            selected = [entry for entry in snapshot if entry.label != "log"]
        else:
            selected = snapshot
        if not selected:
            return "no logs generated"
        selected.sort(key=lambda entry: entry.time)
        return "\n".join(
            f"{entry.time.isoformat():<30}{entry.log}" for entry in selected
        )
def i_can_subscribe(
    producer: base.Producer,
    attr: str,
    event_type: Any,
) -> bool:
    """
    Report whether ``attr`` on ``producer`` can be subscribed to.

    This is currently a placeholder: no subscription is attempted, the
    arguments are not used and the result is always ``True``.

    :param producer: the producer that would be subscribed to (unused).
    :param attr: the attribute to subscribe on (unused).
    :param event_type: the type of event to subscribe for (unused).
    :return: always ``True``.
    """
    # TODO fix method to check correctly for subscribe
    # The disabled probe used to read:
    #   sub_id = producer.subscribe_event(attr, event_type, 1)
    #   producer.unsubscribe_event(sub_id)
    # With it disabled nothing here can raise, so the former
    # ``except Exception: return False`` handler was unreachable.
    return True