import re
import threading
from collections import namedtuple
from datetime import datetime
from functools import reduce
from typing import Any, List, NamedTuple, Tuple
from ska_ser_skallop.datatypes import attributes
from ska_ser_skallop.subscribing import base
def _enum_type_for_attr(attr_name: str):
    """Return the enum class used to render ``attr_name``, or None if unmapped."""
    # Built on every call (as the original mapping was) so that importing this
    # module does not depend on every enum being resolvable up front.
    enum_by_lowercase_name = {
        "obsstate": attributes.ObsState,
        "configuredband": attributes.ConfiguredBand,
        "dishpointingstate": attributes.PointingState,
        "pointingstate": attributes.DishMasterPointingState,
        "cspsubarrayobsstate": attributes.ObsState,
        "sdpsubarrayobsstate": attributes.ObsState,
        "mccssubarrayobsstate": attributes.ObsState,
        "dishmode": attributes.DishMode,
        "healthstate": attributes.DishHealth,
        "adminmode": attributes.AdminMode,
        "powerstate": attributes.DishMasterPowerState,
        "observingstate": attributes.DishMasterObservingState,
        "state": attributes.DevState,
    }
    # Matching is case-insensitive: previously only a hand-picked set of
    # spellings ("obsState", "obsstate", ...) was recognised.
    return enum_by_lowercase_name.get(attr_name.lower())


def get_attr_value_as_str(attr: base.AttributeInt) -> str:
    """
    Transform an attribute value into a string as determined by the
    attribute's name.

    :param attr: the attribute to render; a falsy ``attr`` yields ``""``.
    :return: ``attr.value`` itself when it already is a string, the member
        name of the matching enum when the attribute name is a known
        enumerated attribute, otherwise ``str(attr.value)``.

    NOTE(review): a value that is not a valid member of the matching enum will
    presumably make the enum constructor raise - confirm against the enum
    definitions in ``attributes``.
    """
    if not attr:
        return ""
    if isinstance(attr.value, str):
        return attr.value
    # TODO add extractions for other types of attributes
    enum_type = _enum_type_for_attr(attr.name)
    if enum_type is None:
        return str(attr.value)
    return enum_type(attr.value).name
def get_device_name(event: base.EventDataInt) -> str:
    """Return the name of the device that owns ``event``."""
    producer = event.device
    return producer.name()
def get_attr_name(event: base.EventDataInt) -> str:
    """
    Return the name of the attribute the event refers to.

    When the event carries an attribute value its ``name`` is used; otherwise
    the name is taken from the trailing run of word characters at the end of
    ``event.attr_name``.
    """
    attr_value = event.attr_value
    if attr_value is not None:
        return attr_value.name
    # No attribute payload: keep only the last word-character segment of the
    # attribute name string held by the event.
    trailing_word = re.search(r"\w*(?<=$)", event.attr_name)
    return trailing_word.group(0)
def get_attr_value_str(event: base.EventDataInt) -> str:
    """
    Return the attribute value of an event as a string.

    An event without an attribute value yields the stringified
    ``event.errors`` when ``event.err`` is set, and an empty string otherwise.
    """
    payload = event.attr_value
    if payload is not None:
        return get_attr_value_as_str(payload)
    return str(event.errors) if event.err else ""
def get_date_lodged(event: base.EventDataInt, init_date: datetime = None) -> datetime:
    """
    Return the date at which an event was generated.

    The date comes from the event's attribute value when there is one. When
    the event has no attribute value, ``init_date`` is returned if given, and
    otherwise the current time at the moment of the call.
    """
    attr_value = event.attr_value
    if attr_value is not None:
        return attr_value.time.todatetime()
    # No timestamp on the event itself: use the injected date or "now".
    return init_date or datetime.now()
# Readability aliases for the string fields of the tuple returned by
# describe_event; each one is plain ``str``.
ProducerNameStr = str  # name of the device that produced the event
AttributeStr = str  # name of the attribute the event refers to
DateStr = str  # date the event was lodged, rendered as a string
ValueStr = str  # attribute value rendered as a string
def describe_event(
    event: base.EventDataInt, init_date: datetime = None
) -> Tuple[ProducerNameStr, AttributeStr, ValueStr, DateStr]:
    """
    Return an event as a tuple of strings describing the event.

    :param event: the event to describe.
    :param init_date: date to fall back on when the event carries no date of
        its own; defaults to the current time at the moment of the call.
    :return: an event as a tuple of strings describing the event, ordered as
        producer name, attribute name, value and date.
    """
    # The fallback date is resolved per call. A ``datetime.now()`` default in
    # the signature would be evaluated only once, when the module is imported.
    if init_date is None:
        init_date = datetime.now()
    producer_name = get_device_name(event)
    attr = get_attr_name(event)
    # NOTE(review): get_date_lodged_isoformat is not defined in the part of the
    # module visible here - confirm it exists elsewhere in the file.
    date = get_date_lodged_isoformat(event, init_date)
    value = get_attr_value_str(event)
    return producer_name, attr, value, date
def unpack_event(
    event: base.EventDataInt, init_date: datetime = None
) -> Tuple[str, str, str, datetime]:
    """
    Return the key attributes of an event as a tuple.

    :param event: the event to unpack.
    :param init_date: date to fall back on when the event carries no date of
        its own; when omitted, get_date_lodged substitutes the current time
        at the moment of the call.
    :return: device name, attribute name, value (as a string) and date lodged.
    """
    # ``init_date`` deliberately defaults to None rather than
    # ``datetime.now()``: a default expression is evaluated once at import
    # time and would hand every caller the same stale timestamp.
    device_name = get_device_name(event)
    attr = get_attr_name(event)
    date = get_date_lodged(event, init_date)
    value = get_attr_value_str(event)
    return device_name, attr, value, date
# Plain-tuple shape of a trace entry: (time recorded, message text).
TracerMessageType = Tuple[datetime, str]


class TracerMessage(NamedTuple):
    """A single trace entry: the time it was recorded and its text."""

    time: datetime
    message: str


class Tracer:
    """class used to record messages at specific events"""

    def __init__(self, message: str = None) -> None:
        """
        Create a tracer, optionally recording a first message.

        :param message: initial message to record; nothing is recorded when
            it is None.
        """
        self.messages: List[TracerMessage] = []
        if message is not None:
            self.message(message)

    def message(self, message: str) -> None:
        """Record ``message`` stamped with the current time."""
        self.messages.append(TracerMessage(datetime.now(), message))

    def print_messages(self) -> str:
        """
        Return all recorded messages as one string.

        Each message is rendered as ``<iso time>: <message>`` on its own line
        and the result starts with a newline. A tracer without messages
        yields just that leading newline instead of raising.
        """
        lines = [
            f"{entry.time.isoformat()}: {entry.message}" for entry in self.messages
        ]
        # str.join copes with an empty list, unlike reduce() without an
        # initial value, which raises TypeError.
        return "\n" + "\n".join(lines)
def print_tracers(tracers: List[Tracer]) -> str:
    """
    Merge the messages of several tracers into one time-ordered string.

    :param tracers: the tracers whose messages are combined.
    :return: one ``<time> <message>`` line per message, sorted by time and
        joined with newlines; an empty string when there are no tracers or
        none of them holds a message.
    """
    # Always build a fresh list so that sorting never reorders a tracer's own
    # ``messages`` list (which happened when a single tracer was passed).
    merged = [entry for tracer in tracers for entry in tracer.messages]
    merged.sort(key=lambda entry: entry.time)
    # str.join returns "" for no messages, where reduce() raised TypeError.
    return "\n".join(f"{entry.time} {entry.message}" for entry in merged)
# One log book entry: when it was logged, its text and an optional label.
LogMessage = namedtuple("LogMessage", ["time", "log", "label"])


class LogBook:
    """Collection of timestamped messages that is safe to append to from threads."""

    # Label value that read() treats as plain log output and filters on.
    # The attribute keeps its existing spelling so current users still work.
    log_filer = "log"

    def __init__(self):
        self.messages: List[LogMessage] = []
        # Guards every access to ``self.messages``.
        self.logbook_lock = threading.Lock()

    def log(self, message: str, timestamp: datetime = None, label=None):
        """
        Append a message to the log book.

        :param message: the text to store.
        :param timestamp: time of the message; defaults to the current time.
        :param label: optional label; entries labelled ``"log"`` are subject
            to the filtering done by read().
        """
        with self.logbook_lock:
            if timestamp is None:
                timestamp = datetime.now()
            self.messages.append(LogMessage(timestamp, message, label))

    def read(self, filter_log=True, log_filter_pattern="") -> str:
        """
        Return the stored messages, ordered by time, as one string.

        :param filter_log: when true (and no pattern is given), entries
            labelled ``"log"`` are left out.
        :param log_filter_pattern: regular expression; when non-empty it takes
            precedence over ``filter_log`` and entries labelled ``"log"`` are
            kept only if their text matches it. Other entries are always kept.
        :return: one line per message (ISO time padded to 30 columns followed
            by the text), or ``"no logs generated"`` when nothing is selected.
        """
        # Work on a snapshot taken under the lock: a concurrent log() call can
        # then neither disturb the iteration nor collide with the sort, and
        # the stored list is no longer reordered as a side effect of reading.
        with self.logbook_lock:
            snapshot = list(self.messages)
        if log_filter_pattern:
            pattern = re.compile(log_filter_pattern)
            selected = [
                entry
                for entry in snapshot
                if entry.label != self.log_filer or pattern.search(entry.log)
            ]
        elif filter_log:
            selected = [entry for entry in snapshot if entry.label != self.log_filer]
        else:
            selected = snapshot
        if not selected:
            return "no logs generated"
        selected.sort(key=lambda entry: entry.time)
        return "\n".join(
            f"{entry.time.isoformat():<30}{entry.log}" for entry in selected
        )
def i_can_subscribe(
    producer: base.Producer,
    attr: str,
    event_type: Any,
) -> bool:
    """
    Report whether ``producer`` can be subscribed to for ``attr``.

    The check is not implemented yet: no argument is inspected and the
    function always returns True.

    :param producer: the producer that would be subscribed to (unused).
    :param attr: the attribute that would be subscribed to (unused).
    :param event_type: the kind of event that would be subscribed to (unused).
    :return: always True.
    """
    # TODO fix method to check correctly for subscribe; the intended probe was:
    #   sub_id = producer.subscribe_event(attr, event_type, 1)
    #   producer.unsubscribe_event(sub_id)
    # The former try/except around a bare ``return True`` could never reach
    # its except branch, so it has been dropped until a real probe exists.
    return True