Source code for ska_sdp_batchlet_plugins.consumers.csv_file

import os
import weakref
from datetime import datetime

from ska_sdp_batchlet.plugins.plugin import ConsumerPlugin


# pylint: disable=consider-using-with,too-few-public-methods
[docs] class CSVFile(ConsumerPlugin): """ Consumer to which writes events to a CSV file """ _TIME_STR_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" def __init__(self, file_path: str): file_real_path = os.path.realpath(file_path) os.makedirs(os.path.dirname(file_real_path), exist_ok=True) self.__csv_file = open(file_real_path, mode="w", encoding="utf-8") weakref.finalize(self, self.__csv_file.close) self.__header_written = False super().__init__()
[docs] def process(self, event: dict): if event.get("target") != CSVFile.__name__: return event = event["event"] event["core"] = "" event["timestamp"] = datetime.strptime( event["timestamp"], self._TIME_STR_FORMAT ).timestamp() if not self.__header_written: header = ",".join(map(str, event.keys())) + "\n" self.__csv_file.write(header) self.__header_written = True row = ",".join(map(str, event.values())) + "\n" self.__csv_file.write(row)