Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
DataExportConfig.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/exporters/DataExportConfig.h"
25 #include "cheetah/exporters/OcldFileStreamerConfig.h"
26 #include "cheetah/exporters/SclFileStreamerConfig.h"
27 #include "cheetah/exporters/SpCclFileStreamerConfig.h"
28 #include "cheetah/exporters/SpCandidateDataStreamerConfig.h"
29 #ifdef ENABLE_SPEAD
30 #include "cheetah/exporters/SpCclSpeadStreamerConfig.h"
31 #endif // ENABLE_SPEAD
32 #include "cheetah/exporters/SpCclSigProcConfig.h"
33 #include "cheetah/sigproc/WriterConfig.h"
34 #include "panda/ChannelInfo.h"
35 #include "panda/ChannelId.h"
36 
37 
38 namespace ska {
39 namespace cheetah {
40 namespace exporters {
41 
42 DataExportConfig::SinkConfigs::SinkConfigs()
43  : panda::ConfigModule("sink_configs")
44 {
45 }
46 
47 void DataExportConfig::SinkConfigs::add_options(OptionsDescriptionEasyInit& )
48 {
49 }
50 
51 DataExportConfig::DataExportConfig( std::string const& tag)
52  : utils::Config(tag)
53  , _exporters_init(false)
54 {
55  number_of_threads(0); // syncronous mode by default
56  add(_channel_config);
57  add(_sink_configs);
58  _sink_configs.add_factory("sigproc", []() { return new sigproc::WriterConfig(); });
59  _sink_configs.add_factory("spccl_files", []() { return new SpCclFileStreamerConfig(); });
60  _sink_configs.add_factory("sp_candidate_data", []() { return new SpCandidateDataStreamerConfig(); });
61  _sink_configs.add_factory("ocld_files", []() { return new OcldFileStreamerConfig(); });
62  _sink_configs.add_factory("scl_files", []() { return new SclFileStreamerConfig(); });
63  _sink_configs.add_factory("spccl_sigproc_files", []() { return new SpCclSigProcConfig(); });
64 #ifdef ENABLE_SPEAD
65  _sink_configs.add_factory("spccl_spead", []() { return new SpCclSpeadStreamerConfig(); });
66 #endif // ENABLE_SPEAD
67  _sink_configs.add_default_function([](std::string const& tag) {
68  PANDA_LOG_WARN << "unknown sink type: " << tag;
69  });
70 }
71 
72 DataExportConfig::~DataExportConfig()
73 {
74 }
75 
76 void DataExportConfig::number_of_threads(unsigned const& number_of_threads)
77 {
78  panda::ProcessingEngineConfig config(number_of_threads);
79  config.number_of_threads(number_of_threads);
80  _switch_config.set_engine_config(config);
81 }
82 
83 void DataExportConfig::activate_streams(std::vector<std::string> const& streams)
84 {
85  for(auto const& stream : streams) {
86  _switch_config.activate_channel(panda::ChannelId(stream));
87  }
88 }
89 
90 void DataExportConfig::activate(panda::ChannelId const& stream)
91 {
92  _switch_config.activate_channel(stream);
93 }
94 
96 {
97  _sinks.emplace_back(exporter_config);
98 }
99 
100 panda::Engine& DataExportConfig::engine(panda::ChannelId const& id)
101 {
102  return _switch_config.engine(id);
103 }
104 
105 std::vector<DataExportStreamConfig> const& DataExportConfig::exporters() const
106 {
107  if( ! _exporters_init ) {
108  _exporters_init = true;
109  for( panda::ChannelInfo const* channel : _channel_config.channels() ) {
110  if(channel->active()) {
111  _switch_config.activate_channel(channel->id());
112  } else {
113  _switch_config.deactivate_channel(channel->id());
114  }
115  for( auto const& sink_id : channel->sinks() ) {
116  if(sink_id != "") {
117  bool id_found=false;
118  for( auto const& section : _sink_configs.subsections() )
119  {
120  auto it=_sink_configs.subsection(section);
121  while(it != _sink_configs.subsection_end()) {
122  if(it->id() == sink_id) {
123  id_found = true;
124  _sinks.emplace_back(DataExportStreamConfig(channel->id()
125  , _switch_config.engine(channel->id())
126  , ExporterType(section)
127  , static_cast<utils::Config const&>(*it)));
128  }
129  ++it;
130  }
131  }
132  if(!id_found) {
133  PANDA_LOG_ERROR << "Channel error (" << channel->id().to_string() << "): sink with id=" << sink_id << " not found";
134  }
135  }
136  }
137  }
138  }
139  return _sinks;
140 }
141 
142 void DataExportConfig::add_options(OptionsDescriptionEasyInit& add_options)
143 {
144  add_options
145  ("threads", boost::program_options::value<unsigned>()->default_value(0u)->notifier(std::bind(&DataExportConfig::number_of_threads, this, std::placeholders::_1)), "set the number of dedicated threads to run data export services");
146 }
147 
148 panda::DataSwitchConfig& DataExportConfig::switch_config() const
149 {
150  return _switch_config;
151 }
152 
153 void DataExportConfig::parse_property_tree(boost::property_tree::ptree const& pt, boost::program_options::variables_map& vm)
154 {
155  utils::Config::parse_property_tree(pt, vm); // call the base method
156 }
157 
158 panda::ProcessingEngineConfig const& DataExportConfig::engine_config(panda::ChannelId const& channel_id) const
159 {
160  return _switch_config.engine_config(channel_id);
161 }
162 
163 void DataExportConfig::set_engine_config(panda::ChannelId const& channel_id, panda::ProcessingEngineConfig const& config)
164 {
165  _switch_config.set_engine_config(channel_id, config);
166 }
167 
168 void DataExportConfig::set_engine_config(panda::ProcessingEngineConfig const& config)
169 {
170  _switch_config.set_engine_config(config);
171 }
172 
173 void DataExportConfig::channel(panda::ChannelInfo const& channel)
174 {
175  _channel_config.channel(channel);
176 }
177 
178 void DataExportConfig::add_sink(std::string const& tag, std::function<panda::ConfigModule*()> const& cm)
179 {
180  _sink_configs.add_factory(tag, cm);
181 }
182 
183 void DataExportConfig::add_sink(panda::ConfigModule& cm)
184 {
185  _sink_configs.add(cm);
186 }
187 
188 } // namespace exporters
189 } // namespace cheetah
190 } // namespace ska
void channel(panda::ChannelInfo const &)
explicitly add a channel configuration
std::vector< DataExportStreamConfig > const & exporters() const
return a list of configured export streamer configurations
void activate_streams(std::vector< std::string > const &streams)
mark the provided streams as active
Base class for module configuration.
Definition: Config.h:42
panda::Engine & engine(panda::ChannelId const &)
return the engine allocated to the specified channel
Defines the mapping of a sinks configuration block with the sink type, and the channel to associate i...
void set_engine_config(panda::ChannelId const &channel_id, panda::ProcessingEngineConfig const &config)
set the engine configuration for a specified channel
void add_exporter(DataExportStreamConfig)
add an export streamer configuration
Some limits and constants for FLDO.
Definition: Brdz.h:35
string-based tag for referring to the type of Exporter, based on a std::string
Definition: ExporterType.h:36
panda::ProcessingEngineConfig const & engine_config(panda::ChannelId const &channel_id) const
return the processing engine configuration associated with the specified channel
panda::DataSwitchConfig & switch_config() const
return the configuration suitable for sending to a panda::DataSwitch
void number_of_threads(unsigned const &number_of_threads)
set the number of dedicated threads to service exporters export (default 0)