- Module code
- ska_sdp_batchlet_plugins.consumers.csv_file
-
Source code for ska_sdp_batchlet_plugins.consumers.csv_file
import os
import weakref
from datetime import datetime
from ska_sdp_batchlet.plugins.plugin import ConsumerPlugin
# pylint: disable=consider-using-with,too-few-public-methods
[docs]
class CSVFile(ConsumerPlugin):
"""
Consumer to which writes events to a CSV file
"""
_TIME_STR_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
def __init__(self, file_path: str):
file_real_path = os.path.realpath(file_path)
os.makedirs(os.path.dirname(file_real_path), exist_ok=True)
self.__csv_file = open(file_real_path, mode="w", encoding="utf-8")
weakref.finalize(self, self.__csv_file.close)
self.__header_written = False
super().__init__()
[docs]
def process(self, event: dict):
if event.get("target") != CSVFile.__name__:
return
event = event["event"]
event["core"] = ""
event["timestamp"] = datetime.strptime(
event["timestamp"], self._TIME_STR_FORMAT
).timestamp()
if not self.__header_written:
header = ",".join(map(str, event.keys())) + "\n"
self.__csv_file.write(header)
self.__header_written = True
row = ",".join(map(str, event.values())) + "\n"
self.__csv_file.write(row)