Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
SigProcWriter.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/sigproc/SigProcWriter.h"
25 #include <iterator>
26 #include <boost/units/cmath.hpp>
27 
namespace {

    /**
     * @brief Generic writer: emits every element of [begin, end) to the stream as raw bytes.
     * @details Used for any iterator category without a dedicated specialisation.
     *          Each element is copied into a local of the iterator's value_type and its
     *          object representation is written with std::ostream::write, so the bytes
     *          produced match those produced by the random access specialisation.
     *          (Formatted operator<< would emit text for any element type that is not
     *          a character type.)
     * @tparam IteratorType the iterator type (std::iterator_traits must be defined for it)
     * @tparam IteratorTag  the iterator category used to select a specialisation
     */
    template<typename IteratorType, typename IteratorTag>
    struct do_write {
        static void exec(IteratorType begin, IteratorType const end, std::ostream& os) {
            typedef typename std::iterator_traits<typename std::remove_reference<IteratorType>::type>::value_type ValueType;
            for(; begin != end; ++begin) {
                ValueType const value = *begin;
                os.write(reinterpret_cast<const char*>(&value), sizeof(ValueType));
            }
        }
    };

    /**
     * @brief Fast path for random access iterators: a single bulk write of the whole range.
     * @details The range is written as distance(begin, end) * sizeof(value_type) bytes
     *          starting at the address of the first element.
     *          An empty range writes nothing; the end iterator is never dereferenced.
     *          NOTE(review): a random access iterator does not by itself guarantee
     *          contiguous storage (e.g. std::deque); this path relies on callers only
     *          passing iterators over contiguous memory - confirm for all data types used.
     */
    template<typename IteratorType>
    struct do_write<IteratorType, std::random_access_iterator_tag>
    {
        static void exec(IteratorType begin, IteratorType const end, std::ostream& os) {
            typedef typename std::iterator_traits<typename std::remove_reference<IteratorType>::type>::value_type ValueType;
            if(begin == end) {
                return; // nothing to write, and *begin would be invalid
            }
            os.write(reinterpret_cast<const char*>(&(*begin))
                   , static_cast<std::streamsize>(std::distance(begin, end) * sizeof(ValueType)));
        }
    };

    /**
     * @brief Write the range [begin, end) to the stream as raw bytes.
     * @details Dispatches on the iterator category reported by std::iterator_traits
     *          to the matching do_write implementation.
     */
    template<typename IteratorType>
    void write_out(IteratorType begin, IteratorType const end, std::ostream& os) {
        typedef typename std::iterator_traits<typename std::remove_reference<IteratorType>::type>::iterator_category CategoryType;
        do_write<IteratorType, CategoryType>::exec(begin, end, os);
    }
}
57 
58 namespace ska {
59 namespace cheetah {
60 namespace sigproc {
61 
62 template<typename HeaderType>
63 SigProcWriter<HeaderType>::SigProcWriter(WriterConfig const& config)
64  : _extension(config.extension())
65  , _sample_count(0)
66  , _max_samples_per_file(config.max_count()) // approx only
67 {
68  set_dir(config.dir());
69 }
70 
71 template<typename HeaderType>
72 SigProcWriter<HeaderType>::SigProcWriter(std::string const& filename)
73  : _extension(".fil")
74 {
75  set_dir(filename);
76 }
77 
78 template<typename HeaderType>
79 SigProcWriter<HeaderType>::SigProcWriter(boost::filesystem::path const& filename)
80  : _extension(".fil")
81 {
82  set_dir(filename);
83 }
84 
85 template<typename HeaderType>
86 SigProcWriter<HeaderType>::~SigProcWriter()
87 {
88 }
89 
// NOTE(review): the signature line of this member definition is absent from this
// listing, so its name and return type cannot be confirmed from here.
// The body closes the writer's output file stream.
90 template<typename HeaderType>
92 {
93  _file_stream.close();
94 }
95 
96 template<typename HeaderType>
97 void SigProcWriter<HeaderType>::set_dir(boost::filesystem::path const& path)
98 {
99  _dir=path;
100  if(!boost::filesystem::is_directory(_dir)) {
101  panda::Error e("SigProcWriter: directory does not exist:");
102  e << _dir;
103  throw e;
104  }
105 }
106 
107 template<typename HeaderType>
108 std::string SigProcWriter<HeaderType>::next_file(data::TimeFrequency<Cpu, uint8_t>::TimePointType const& time)
109 {
110  std::time_t ttp = std::chrono::system_clock::to_time_t(time);
111  char stem[20];
112  std::strftime(stem, sizeof(stem), "%Y_%m_%d_%H:%M:%S", std::gmtime(&ttp));
113  boost::filesystem::path file = _dir / boost::filesystem::path(std::string(stem) + _extension);
114  if(boost::filesystem::exists(file)) {
115  unsigned count = 0;
116  do {
117  file = _dir / boost::filesystem::path(std::string(stem) + "_" + std::to_string(++count) + _extension);
118  } while( boost::filesystem::exists(file));
119  }
120  return file.native();
121 }
122 
// NOTE(review): the signature line of this member definition is absent from this
// listing; `tf` is presumably its parameter - confirm against the class declaration.
// The body default-constructs a header, fills it from `tf` with fill_header(),
// hands the header and the data to file_write() and returns *this.
123 template<typename HeaderType>
124 template<typename Arch, typename NumericalRep>
126 {
127  HeaderType h;
128  fill_header(h, tf);
// the data is passed on as a reference to its pss::astrotypes::TimeFrequency base type
129  file_write(h, static_cast<pss::astrotypes::TimeFrequency<NumericalRep> const&>(tf));
130  return *this;
131 }
132 
// NOTE(review): the signature line of this member definition is absent from this
// listing; `spectrum` is presumably its parameter - confirm against the class declaration.
// The body builds a header describing the spectrum, hands header and data to
// file_write() and returns *this.
133 template<typename HeaderType>
134 template<typename DerivedType, typename NumericalRep, typename Alloc>
136 {
137  HeaderType h;
// channel count is taken from the size of the Frequency dimension
138  h.number_of_channels(spectrum.template dimension<data::Frequency>());
// bits per sample follow from the size of the numerical representation
139  h.number_of_bits(sizeof(NumericalRep)*8);
140  h.number_of_ifs(1);
141  h.tstart(spectrum.start_time());
// NOTE(review): no sample interval is set on the header in this body
142  file_write(h, spectrum);
143  return *this;
144 }
145 
146 template<typename HeaderType>
147 void SigProcWriter<HeaderType>::file_header(HeaderType const& h)
148 {
149  if((h.tstart().is_set() && (_header.sample_interval().value() > 0.0) && (abs(*h.tstart() - *_header.tstart()) >= 0.5 * _header.sample_interval())) || _header != h )
150  {
151  if(_file_stream.is_open()) {
152  PANDA_LOG_WARN << "break in data stream: creating new file with updated parameters: expected:\n" << pss::astrotypes::sigproc::Header::Info() << _header
153  << " got:\n" << pss::astrotypes::sigproc::Header::Info() << h;
154  // if the incomming data is incompatible with existing header info
155  // we start a new file
156  _file_stream.close();
157  }
158  }
159  if(!_file_stream.is_open()) {
160  _header = h;
161  _sample_count = 0;
162  _file_stream.open(next_file(*(h.tstart())), std::ios::binary);
163  _header.write(_file_stream);
164  }
165 }
166 
167 template<typename HeaderType>
168 template<typename T>
169 typename std::enable_if<pss::astrotypes::has_exact_dimensions<T, data::Time, data::Frequency>::value, SigProcWriter<HeaderType>&>::type
170 SigProcWriter<HeaderType>::file_write(HeaderType const& h, T const& tf, std::size_t spectrum)
171 {
172  file_header(h);
173 
174  std::size_t n_spectra = tf.template dimension<data::Time>() - spectrum;
175  bool break_file = false;
176  if(_max_samples_per_file != 0 ) {
177  auto remaining_file_space = _max_samples_per_file - _sample_count;
178  if(n_spectra > remaining_file_space) {
179  n_spectra = remaining_file_space;
180  break_file = true;
181  }
182  else if( n_spectra == remaining_file_space)
183  {
184  break_file = true;
185  }
186  }
187  for(std::size_t i=spectrum; i < n_spectra + spectrum; ++i) {
188  //auto sample = tf.spectrum(i);
189  auto sample = tf[data::DimensionIndex<data::Time>(i)];
190  write_out(sample.begin(), sample.end(), _file_stream);
191  }
192 
193  _header.tstart(*_header.tstart() + (_header.sample_interval() * (double)n_spectra));
194  if(break_file) {
195  _file_stream.close();
196  if(tf.template dimension<data::Time>() > n_spectra) {
197  // send the remainder to the next file
198  return file_write(_header, tf, n_spectra);
199  }
200  }
201  else {
202  _sample_count+=n_spectra;
203  }
204 
205  return *this;
206 }
207 
208 template<typename HeaderType>
209 template<typename T>
210 typename std::enable_if<pss::astrotypes::has_exact_dimensions<T, data::Frequency>::value, SigProcWriter<HeaderType>&>::type
211 SigProcWriter<HeaderType>::file_write(HeaderType const& h, T const& sample)
212 {
213  file_header(h);
214  write_out(sample.begin(), sample.end(), _file_stream);
215  if(_max_samples_per_file != 0 && ++_sample_count >= _max_samples_per_file) {
216  _file_stream.close();
217  }
218  return *this;
219 }
220 
// NOTE(review): the signature line of this member definition is absent from this
// listing; `h` and `tf` are presumably its parameters, and this is presumably the
// fill_header() called elsewhere in this file - confirm against the class declaration.
// The body copies the meta data of `tf` into the header `h`.
221 template<typename HeaderType>
222 template<typename Arch, typename NumericalRep>
224 {
225  h.number_of_channels(tf.number_of_channels());
// bits per sample follow from the size of the numerical representation
226  h.number_of_bits(sizeof(NumericalRep)*8);
227  h.number_of_ifs(1);
228  h.tstart(tf.start_time());
229  h.sample_interval(tf.sample_interval());
230  // Only equi-spaced frequency channels supported
231  //h.frequency_channels(tf.channel_frequencies());
// first channel frequency and channel offset are only set when the data provides them;
// the offset is derived from the first two channel frequencies
232  if(tf.channel_frequencies().size() != 0) {
233  h.fch1(tf.channel_frequencies()[0]);
234  if(tf.channel_frequencies().size() > 1) {
235  h.foff(tf.channel_frequencies()[1] - tf.channel_frequencies()[0]);
236  }
237  }
238 }
239 
240 template<typename HeaderType>
241 template<typename T>
242 typename std::enable_if<pss::astrotypes::has_exact_dimensions<T, data::Time, data::Frequency>::value, SigProcWriter<HeaderType>&>::type
243 SigProcWriter<HeaderType>::write(HeaderType const& header, T const& t)
244 {
245  return file_write(header, t);
246 }
247 
248 } // namespace sigproc
249 } // namespace cheetah
250 } // namespace ska
A single dimension representation of a Spectrum.
Definition: Spectrum.h:58
Some limits and constants for FLDO.
Definition: Brdz.h:35
std::size_t number_of_channels() const
TimePointType const & start_time() const
Definition: Spectrum.cpp:49
std::enable_if< pss::astrotypes::has_exact_dimensions< T, data::Time, data::Frequency >::value, SigProcWriter & >::type write(HeaderType const &, T const &t)
write the data out to file using the meta data from the header provided
Writes data types to a sigproc format file.
Definition: SigProcWriter.h:45