SKA PST STAT Architecture

Classes

The diagram below shows the classes involved in the core software architecture of the SKA PST STAT component.

@startuml PST Stat Class Diagram
allow_mixing

package ska::pst::common
{
  class ApplicationManager <<abstract>>
  class FileSegmentProducer
}

package ska::pst::smrb
{
  class SmrbSegmentProducer
}

class StatApplicationManager {
  + StatApplicationManager()
  + ~StatApplicationManager()
  + perform_configure_beam(config: AsciiHeader)
  + perform_configure_scan(config: AsciiHeader)
  + perform_start_scan()
  + perform_scan()
  + perform_end_scan()
  + perform_deconfigure_scan()
  + perform_deconfigure_beam()
  + set_timeout()
  - processor: std::unique_ptr<StatProcessor>
  - producer: std::shared_ptr<SmrbSegmentProducer>
}

class FileProcessor {
  + FileProcessor(config: AsciiHeader)
  + ~FileProcessor()
  + process()
  - processor: std::shared_ptr<StatProcessor>
  - reader: std::unique_ptr<FileSegmentProducer>
}

class StatProcessor {
  + StatProcessor(data_config: AsciiHeader, weights_config: AsciiHeader)
  + ~StatProcessor()
  + add_publisher(std::unique_ptr<StatPublisher>)
  + bool validate_segment(segment: SegmentProducer::Segment)
  + bool process(segment: SegmentProducer::Segment)
  + interrupt()
  - storage: std::shared_ptr<StatStorage>
  - computer: std::unique_ptr<StatComputer>
  - publishers: std::vector<std::unique_ptr<StatPublisher>>
  - config: AsciiHeader
}

class StatComputer {
  + StatComputer(data_config: AsciiHeader, weights_config: AsciiHeader, storage: StatStorage)
  + ~StatComputer()
  + bool compute(segment: SegmentProducer::Segment)
  + initialise()
  + interrupt()
  - storage: std::shared_ptr<StatStorage>
  - config: AsciiHeader
}

class StatStorage {
  + StatStorage(config: AsciiHeader)
  + resize(uint32_t ntime_bins, uint32_t nfreq_bins)
  + reset()
  + uint32_t get_npol() const
  + uint32_t get_ndim() const
  + uint32_t get_nchan() const
  + uint32_t get_nbin() const
  + uint32_t get_nrebin() const
  + uint32_t get_ntime_bins() const
  + uint32_t get_nfreq_bins() const
  + uint32_t get_ntime_vals() const
  + double get_total_sample_time() const
  + void set_total_sample_time(double)
  + bool is_storage_resized() const
  + bool is_storage_reset() const
  + mean_frequency_avg: std::vector<std::vector<float>>
  + mean_frequency_avg_rfi_excised: std::vector<std::vector<float>>
  + variance_frequency_avg: std::vector<std::vector<float>>
  + variance_frequency_avg_rfi_excised: std::vector<std::vector<float>>
  + mean_spectrum: std::vector<std::vector<std::vector<float>>>
  + variance_spectrum: std::vector<std::vector<std::vector<float>>>
  + mean_spectral_power: std::vector<std::vector<float>>
  + maximum_spectral_power: std::vector<std::vector<float>>
  + histogram_1d_freq_avg: std::vector<std::vector<std::vector<uint32_t>>>
  + histogram_1d_freq_avg_rfi_excised: std::vector<std::vector<std::vector<uint32_t>>>
  + rebinned_histogram_2d_freq_avg: std::vector<std::vector<std::vector<uint32_t>>>
  + rebinned_histogram_2d_freq_avg_rfi_excised: std::vector<std::vector<std::vector<uint32_t>>>
  + rebinned_histogram_1d_freq_avg: std::vector<std::vector<std::vector<uint32_t>>>
  + rebinned_histogram_1d_freq_avg_rfi_excised: std::vector<std::vector<std::vector<uint32_t>>>
  + num_clipped_samples_spectrum: std::vector<std::vector<std::vector<uint32_t>>>
  + num_clipped_samples: std::vector<std::vector<uint32_t>>
  + num_clipped_samples_rfi_excised: std::vector<std::vector<uint32_t>>
  + spectrogram: std::vector<std::vector<std::vector<float>>>
  + timeseries: std::vector<std::vector<std::vector<float>>>
  + timeseries_rfi_excised: std::vector<std::vector<std::vector<float>>>
  + rfi_mask_lut: std::vector<bool>
  + scalar_stats_t : struct
  + min_weights: std::vector<float>
  + max_weights: std::vector<float>
  + mean_weights: std::vector<float>
}

class StatPublisher <<abstract>> {
  # config: AsciiHeader
  + StatPublisher(config: AsciiHeader)
  + ~StatPublisher()
  {abstract} + publish(std::shared_ptr<StatStorage>)
}

class ScalarStatPublisher implements StatPublisher {
  + ScalarStatPublisher(config: AsciiHeader)
  + ~ScalarStatPublisher()
  + publish(std::shared_ptr<StatStorage>)
  + reset()
  + scalar_stats_t get_scalar_stats();
}

class StatHdf5FileWriter implements StatPublisher {
  + StatHdf5FileWriter(config: AsciiHeader)
  + ~StatHdf5FileWriter()
  + publish(std::shared_ptr<StatStorage>)
}

StatProcessor *-- StatComputer
StatProcessor *-- StatPublisher
StatProcessor o-- StatStorage
StatComputer o-- StatStorage
StatPublisher o-- StatStorage

ApplicationManager <|-- StatApplicationManager
StatApplicationManager o-- SmrbSegmentProducer
StatApplicationManager *-- StatProcessor

FileProcessor *-- StatProcessor
FileProcessor o- FileSegmentProducer

component ska_pst_stat_core
ska_pst_stat_core -- "uses" StatApplicationManager
component ska_pst_stat_file_processor
ska_pst_stat_file_processor -- "uses" FileProcessor

@enduml

Class diagram showing main classes involved

@startuml
abstract StatPublisher {
  + {abstract} publish(stat_storage: StatStorage)
}

abstract class StatKafkaProducer {
  + send(topic: string, payload: string)
}

class CppKafkaProducer {
  + send(topic: string, payload: string)
}

class StatKafkaProducerFactory {
  + create_producer(KafkaConnectionDetails) : StatKafkaProducer
  + register_producer_scheme(scheme: string, StatKafkaProducer_FuncType)
  - producers: map<string, StatKafkaProducer>
  - producer_func_ptrs: map<string, StatKafkaProducer_FuncType>
}

class StatApplicationManager {
  - producer_factory: StatKafkaProducerFactory
  # perform_scan()
}

class StatProcessor {
  + add_publisher(StatPublisher)
  - publishers: vector<StatPublisher>
}

StatApplicationManager o-- StatProcessor

class StatKafkaPublisher {
  + StatKafkaPublisher(config: AsciiHeader, producer_factory: StatKafkaProducerFactory)
  + publish(stat_storage: StatStorage)
  + get_msg_senders() : map<string, StatKafkaMessageSender>
  - msg_senders : map<string, StatKafkaMessageSender>
  - producer_factory: StatKafkaProducerFactory
}

class StatKafkaMessageSender {
}

StatPublisher <|-- StatKafkaPublisher
StatKafkaProducer <|-- CppKafkaProducer
StatKafkaPublisher o-- StatKafkaMessageSender
StatProcessor --> StatKafkaPublisher
StatApplicationManager o-- StatKafkaProducerFactory
StatKafkaProducerFactory ..> StatKafkaProducer : constructs
StatApplicationManager ..> StatKafkaPublisher : constructs
StatKafkaPublisher --> StatKafkaProducerFactory

note top of StatKafkaProducerFactory : Used to construct and cache Kafka producers
note "Existing" as N1
note "Publisher for all Kafka related stats" as N2

StatApplicationManager .. N1
N1 .. StatPublisher
N1 .. StatProcessor
N2 .. StatKafkaPublisher

@enduml

Class diagram showing StatApplicationManager and relationship with StatKafkaPublisher

@startuml

abstract class StatKafkaMsgGenerator {
  + {abstract} generate_msg(StatStorage) : string
}

class StatMsgPackGenerator {
  + StatMsgPackGenerator(config: AsciiHeader, strategy: StatMsgPackStrategy, dashboard_type: string)
  + generate_msg(StatStorage) : string
  + get_strategy() : StatMsgPackStrategy
  + get_dashboard_type() : string
  + {static} create_msg_pack_generator(config: AsciiHeader, dashboard_type: string) : StatMsgPackGenerator
  - config: AsciiHeader
  - strategy: StatMsgPackStrategy
  - dashboard_type: string
}

abstract class StatMsgPackStrategy {
  + pack_data_type(Packer, StatStorage, AsciiHeader);
  + pack_metadata(Packer, StatStorage, AsciiHeader);
  + pack_data(Packer, StatStorage, AsciiHeader);
}

class StatTimeseriesMsgPackStrategy {
  + pack_data_type(Packer, StatStorage, AsciiHeader);
  + pack_metadata(Packer, StatStorage, AsciiHeader);
  + pack_data(Packer, StatStorage, AsciiHeader);
}

class StatBandpassMsgPackStrategy {
  + pack_data_type(Packer, StatStorage, AsciiHeader);
  + pack_metadata(Packer, StatStorage, AsciiHeader);
  + pack_data(Packer, StatStorage, AsciiHeader);
}

class StatHistogramMsgPackStrategy {
  + pack_data_type(Packer, StatStorage, AsciiHeader);
  + pack_metadata(Packer, StatStorage, AsciiHeader);
  + pack_data(Packer, StatStorage, AsciiHeader);
}

class StatKafkaMessageSender {
   + StatKafkaMessageSender(topic: string, producer: StatKafkaProducer, generator: StatKafkaMsgGenerator)
   + send_stats(StatStorage)
   + get_topic() : string
   + get_generator() : StatKafkaMsgGenerator
   - topic : string
   - producer: StatKafkaProducer
   - msg_generator: StatKafkaMsgGenerator
}

StatMsgPackGenerator o-- StatMsgPackStrategy
StatMsgPackStrategy <|-- StatTimeseriesMsgPackStrategy
StatMsgPackStrategy <|-- StatBandpassMsgPackStrategy
StatMsgPackStrategy <|-- StatHistogramMsgPackStrategy
StatKafkaMessageSender o-- StatKafkaMsgGenerator
StatKafkaMsgGenerator <|-- StatMsgPackGenerator

note left of StatMsgPackGenerator : Used to map StatStorage to MsgPack encoded messages
note left of StatMsgPackStrategy : Used to pack the data_type, metadata, and data values of messages
note "Used for a particular message type and topic" as N2

N2 .. StatKafkaMessageSender

@enduml

More detailed class diagram showing StatKafkaMessageSender and associated classes

StatProcessor

This is the core class that handles the processing of voltage data. It has been designed to work on data that comes either from shared memory ring buffers during a scan or from memory-mapped (mmap) files.

Applications, such as ska_pst_stat_core or ska_pst_stat_file_processor, that perform statistical calculations will use this class directly rather than performing their own orchestration.

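During instantiation, this class will create a StatStorage instance with the correct sizes based on configuration. It also creates a StatComputer instance, passing along a shared pointer to the StatStorage instance. Publishers (such as the StatHdf5FileWriter) are added via add_publisher and are passed the same shared StatStorage each time they publish.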

This class is not thread-safe; callers must ensure that calls to the process method are not made concurrently.

The StatProcessor asserts that there is at least RESOLUTION bytes of data, where RESOLUTION = NPOL * NDIM * NBIT * NCHAN * UDP_NSAMP / 8. If the amount of data is not an integer multiple of RESOLUTION, statistics are only calculated for the largest integer multiple of RESOLUTION bytes.

StatComputer

This class is the main class for performing the statistical computations.

This class is designed to be re-used across different blocks of data: for each block it performs the calculation and updates the StatStorage instance.

See the StatHdf5FileWriter section for the output statistics that are calculated.

StatHdf5FileWriter

A utility class used for writing the computed statistics out to a file system. Instances of this class are passed a shared StatStorage and the output path to which statistics are written. Every call to publish will serialise the StatStorage to a new HDF5 file.

HDF5 was chosen because it is an open standard, rather than creating a new structured file format. Detailed information about the structure of these files is available at Statistics HDF5 File Format.

StatStorage

This class provides an abstraction to all of the storage required to hold the statistics products computed by the StatComputer. The class will be constructed with configuration parameters stored in a ska::pst::common::AsciiHeader with the following required parameters:

  • NPOL Number of polarisations in the input data stream (will always be 2).

  • NDIM Number of dimensions of each time sample (will always be 2).

  • NCHAN Number of channels in the input data stream.

  • NBIT Number of bits per sample in the input data stream.

  • NREBIN Number of bins in the re-binned input state histograms.

The class provides public methods to resize the storage and to reset all the values of the storage to zero. As documented in the StatStorage Class API, the class exposes all of the storage fields as 1-, 2- or 3-dimensional std::vector attributes of the appropriate types.

StatApplicationManager

This class is an implementation of the ska::pst::common::ApplicationManager class and is used by the ska_pst_stat_core process to manage the lifecycle of configuring the system and performing a scan.

When the application is in a ScanConfigured state, this class will have created an instance of the StatProcessor class, which will be used during a scan to perform the actual calculation of the statistics and to write the outputs to a file.

FileProcessor

This class is used by the ska_pst_stat_file_processor application to process a specific set of data and weights files. When the application runs it will read a config file into a ska::pst::common::AsciiHeader that is passed into the constructor of this class. When an instance of this class is created it will create an instance of a ska::pst::common::FileSegmentProducer, a StatProcessor, and a StatPublisher (specifically the StatHdf5FileWriter).

Kafka Dashboard Classes

The following classes relate specifically to generating MsgPack messages and publishing them to Kafka topics.

StatKafkaPublisher

This is a specific implementation of the existing StatPublisher class. The design uses a single StatKafkaPublisher, which creates a separate StatKafkaMessageSender instance for each dashboard type. This means the StatApplicationManager doesn't need to know about all the different types of dashboards; it only needs to know that stats may need to be sent to Kafka. In doing so, we can evolve this publisher and the sender class independently of the application manager, and add new data types and MsgPack generators.

StatKafkaProducerFactory

This uses the factory pattern to create Kafka producers. In Kafka terminology, a producer is effectively a connection that sends messages to the Kafka server.

This factory is used to cache the connection to a Kafka instance so that it is reused when the same bootstrap server is requested, which would be the case for all the topics of a given scan.

StatKafkaMessageSender

This class is similar to a publisher but has all the configuration and connections to Kafka it needs. The StatKafkaPublisher will create instances of this class per dashboard type and provide the topic, Kafka producer and the MsgPack generator. This class is designed to be generic in the sense that it doesn't know what type of message or topic it is using. The StatKafkaPublisher will loop over the instances of this class every time its publish method is called, and each instance will use its generator to generate message strings and then forward the messages to the Kafka producer.

StatKafkaProducer

This abstract class represents the producer of whichever Kafka client library is ultimately chosen. Following the bridge design pattern, it allows cppkafka to be swapped out for a different library.

CppKafkaProducer

This class is a concrete implementation of the StatKafkaProducer class that abstracts over the cppkafka::Producer class.

StatMsgPackGenerator

This class encodes the correct structure of the dashboard messages. The generate_msg method takes the current StatStorage and encodes common metadata, delegating the packing of the data_type, metadata, and data values to a StatMsgPackStrategy instance.

This class and the associated strategy classes have nothing to do with Kafka. This also makes it possible to unit test that the messages are encoded correctly.

StatMsgPackStrategy

This is a purely abstract class that is used as an interface by instances of StatMsgPackGenerator to populate the data_type, metadata, and data values within the MsgPack message.

StatBandpassMsgPackStrategy

An implementation of the StatMsgPackStrategy that encodes the Bandpass data.

StatHistogramMsgPackStrategy

An implementation of the StatMsgPackStrategy that encodes the Histogram data.

StatTimeseriesMsgPackStrategy

An implementation of the StatMsgPackStrategy that encodes the Timeseries data.

Sequences

Processing of a block of data

@startuml PST STAT Processor
actor Client
Client -> StatProcessor: processor = StatProcessor(config)
activate StatProcessor
activate StatComputer
StatProcessor -> StatStorage: storage = StatStorage()
activate StatStorage
StatProcessor -> StatComputer: computer = StatComputer(config, storage)
Client <-- StatProcessor
Client -> StatPublisher : publisher = StatPublisher(config)
activate StatPublisher
Client <-- StatPublisher
Client -> StatProcessor: add_publisher(publisher)
loop while has data
  Client -> StatProcessor: process(segment)
  StatProcessor -> StatStorage: reset()
  StatProcessor -> StatComputer: initialise()
  StatProcessor -> StatComputer: compute(segment)
  StatComputer --> StatStorage: updates
  StatProcessor <-- StatComputer
  StatProcessor -> StatPublisher: publish(storage)
  StatProcessor <-- StatPublisher
  Client <-- StatProcessor
end
Client --> StatProcessor: drop
deactivate StatProcessor
deactivate StatComputer
deactivate StatPublisher
deactivate StatStorage
@enduml

Sequence diagram for processing statistics with the StatProcessor class, common to both StatApplicationManager and FileProcessor sequences

Processing data during a scan

@startuml PST STAT Client using SmrbSegmentProducer sequence
actor Client
Client -> StatApplicationManager: configure beam
activate StatApplicationManager
StatApplicationManager -> SmrbSegmentProducer: SmrbSegmentProducer(data_key, weights_key, viewer)
activate SmrbSegmentProducer
Client -> StatApplicationManager: configure scan
StatApplicationManager -> SmrbSegmentProducer: connect
StatApplicationManager <-- SmrbSegmentProducer: connected
StatApplicationManager -> StatProcessor: StatProcessor(config)
activate StatProcessor
StatApplicationManager <-- StatProcessor: configured
Client <-- StatApplicationManager: scan configured
group scan
  Client -> StatApplicationManager: start scan
  StatApplicationManager -> StatApplicationManager: start background processing
  loop in background while scanning
      StatApplicationManager -> SmrbSegmentProducer: next_segment()
      StatApplicationManager <-- SmrbSegmentProducer: return segment
      StatApplicationManager -> StatProcessor: process(segment)
      StatApplicationManager <-- StatProcessor
      StatApplicationManager -> StatApplicationManager: wait stat interval
  end
  Client -> StatApplicationManager: end scan
  StatApplicationManager -> StatProcessor: interrupt
  StatApplicationManager -> StatApplicationManager: stop background processing
end
Client -> StatApplicationManager: deconfigure scan
StatApplicationManager --> StatProcessor: drop
deactivate StatProcessor
Client <-- StatApplicationManager: scan deconfigured
Client -> StatApplicationManager: deconfigure beam
StatApplicationManager -> SmrbSegmentProducer: disconnect
deactivate SmrbSegmentProducer
Client <-- StatApplicationManager: beam deconfigured
@enduml

Sequence diagram for processing statistics during a scan with the StatApplicationManager class

Processing files after a scan

@startuml PST STAT DADA file processor
actor Client
Client -> FileProcessor: create(config)
activate FileProcessor
FileProcessor -> StatProcessor: StatProcessor(config)
activate StatProcessor
Client -> FileProcessor: process(data_file_path, weights_file_path)
FileProcessor -> FileSegmentProducer: FileSegmentProducer(file)
activate FileSegmentProducer
FileSegmentProducer -> "Data File": mmap
activate "Data File"
FileSegmentProducer -> "Weights File": mmap
activate "Weights File"

loop for configured number of blocks or EOF
  FileProcessor -> FileSegmentProducer: next_segment()
  FileSegmentProducer --> "Data File": next_block
  FileSegmentProducer --> "Weights File": next_block
  FileProcessor <-- FileSegmentProducer: return segment
  FileProcessor -> StatProcessor: process(segment)
  FileProcessor <-- StatProcessor
end

FileProcessor --> FileSegmentProducer: drop
FileSegmentProducer --> "Data File": release
deactivate "Data File"
FileSegmentProducer --> "Weights File": release
deactivate "Weights File"
deactivate FileSegmentProducer

' FileProcessor -> FileSegmentProducer: create
' activate FileSegmentProducer
' FileSegmentProducer -> File: mmap
' activate File
' loop while data present
'   FileProcessor -> FileSegmentProducer: read next
'   FileProcessor <-- FileSegmentProducer: return next block ptr or null
'   alt data present
'     FileProcessor -> StatProcessor: process(segment)
'   else no data present
'     Client <-- FileProcessor: process complete
'   end
' end

@enduml

Sequence diagram for processing statistics from a file using the FileProcessor class

Publishing Kafka dashboard messages

@startuml
actor LMC
participant StatApplicationManager as SAM
participant StatKafkaPublisher as SKP
participant StatKafkaMessageSender as SKMS
participant StatMsgPackGenerator as SMPG
participant StatKafkaProducerFactory as SKFF
participant StatKafkaProducer as SKFProd
queue Kafka
autoactivate on

LMC -> SAM : start scan
SAM -> SKP : create
loop per msg type
SKP -> SKFF : get producer
SKFF -> SKFProd : create
SKFProd -> Kafka : connect
return connected
return instance
return producer
SKP -> SMPG : construct specific generator
return instance
SKP -> SKMS : construct (with topic, producer, and msg generator)
return instance
end
return instance
group while scanning
SAM -> SKP : publish
loop per msg type
SKP -> SKMS : send stats
SKMS -> SMPG : generate msg
return encoded msg
SKMS -> SKFProd : send msg
SKFProd -> Kafka : send msg
return sent
return sent
return sent
end
return published
end
LMC -> SAM : stop scan
return scan ended
@enduml

Sequence diagram for publishing Kafka dashboard messages