Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
SigProcFileStream.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/sigproc/SigProcFileStream.h"
25 #include "cheetah/data/TimeFrequency.h"
26 
27 #include "panda/Error.h"
28 #include <boost/filesystem.hpp>
29 #include <type_traits>
30 #include <iostream>
31 #include <new>
32 
33 
34 namespace ska {
35 namespace cheetah {
36 namespace sigproc {
37 
38 #define HI4BITS 240
39 #define LO4BITS 15
40 void char2ints (unsigned char c, int *i, int *j)
41 {
42  *i = c & LO4BITS;
43  *j = (c & HI4BITS) >> 4;
44 }
45 
46 void char4ints (unsigned char c, int *i, int *j, int *k, int *l)
47 {
48  *i = c & 3;
49  *j = (c & 12) >> 2;
50  *k = (c & 48) >> 4;
51  *l = (c & 192) >> 6;
52 }
53 
54 // @returns the number of samples read
55 static
56 std::size_t read_block(std::istream& input, int nbits, float *block, unsigned long nread)
57 {
58  int j, k, s1, s2, s3, s4;
59  std::size_t i;
60  short *shortblock=nullptr;
61  char *charblock=nullptr;
62  std::size_t iread;
63 
64  /* decide how to read the data based on the number of bits per sample */
65  try{
66  switch(nbits) {
67 
68  case 1:
69  // read n/8 bytes into character block containing n 1-bit pairs
70  charblock = (char *) malloc(nread / 8);
71  try {
72  input.read(charblock, nread / 8);
73  } catch (...)
74  {
75  if(!input.eof()) {
76  throw;
77  }
78  }
79 
80  k = 0;
81 
82  // unpack 1-bit pairs into the datablock
83  iread = input.gcount();
84  for (std::size_t i = 0; i < iread; ++i)
85  for (j = 0; j < 8; ++j) {
86  block[k++] = charblock[i] & 1;
87  charblock[i] >>= 1;
88  }
89 
90  iread = k; //number of samples read in
91  free(charblock);
92  break;
93 
94  case 2: // NOTE: Handles Parkes Hitrun survey data
95  // read n/4 bytes into character block containing n 2-bit pairs
96  charblock = (char *) malloc(nread / 4);
97  try {
98  input.read(charblock, nread / 4);
99  } catch (...)
100  {
101  if(!input.eof()) {
102  throw;
103  }
104  }
105 
106  j = 0;
107 
108  iread = input.gcount();
109  for(i = 0; i < iread; i++) {
110  char4ints(charblock[i], &s1, &s2, &s3, &s4);
111  block[j++] = (float) s1;
112  block[j++] = (float) s2;
113  block[j++] = (float) s3;
114  block[j++] = (float) s4;
115  }
116 
117  iread *= 4;
118  free(charblock);
119  break;
120 
121  case 4:
122  // read n/2 bytes into character block containing n 4-bit pairs
123  charblock = (char *) malloc(nread / 2);
124  try {
125  input.read(charblock, nread / 2);
126  } catch (...)
127  {
128  if(!input.eof()) {
129  throw;
130  }
131  }
132 
133  j = 0;
134 
135  /* unpack 4-bit pairs into the datablock */
136  iread = input.gcount();
137  for (i = 0; i < iread; i++) {
138  char2ints(charblock[i], &s1, &s2);
139  block[j++] = (float) s1;
140  block[j++] = (float) s2;
141  }
142 
143  iread *= 2; /* this is the number of samples read in */
144  free(charblock);
145  break;
146 
147  case 8:
148  /* read n bytes into character block containing n 1-byte numbers */
149  charblock = (char *) malloc(nread);
150  try {
151  input.read(charblock, nread);
152  } catch (...)
153  {
154  if(!input.eof()) {
155  throw;
156  }
157  }
158  /* copy numbers into datablock */
159  iread = input.gcount();
160  for (i = 0; i < iread; i++)
161  block[i] = (float) charblock[i];
162 
163  free(charblock);
164  break;
165 
166  case 16:
167  //NOTE: should condsider raising warning if this is used, as
168  //the data::TimeFrequencyDataType is uint_8.
169  /* read 2*n bytes into short block containing n 2-byte numbers */
170  shortblock = (short *) malloc(2 * nread);
171  try {
172  input.read(reinterpret_cast<char*>(shortblock), 2 * nread);
173  } catch (...)
174  {
175  if(!input.eof()) {
176  throw;
177  }
178  }
179 
180  /* copy numbers into datablock */
181  iread = input.gcount() / 2;
182  for (i = 0; i < iread; i++) {
183  block[i] = (float) shortblock[i];
184  }
185 
186  free(shortblock);
187  break;
188 
189  case 32:
190  /* read 4*n bytes into floating block containing n 4-byte numbers */
191  try {
192  input.read(reinterpret_cast<char*>(block), 4 * nread);
193  } catch (...)
194  {
195  if(!input.eof()) {
196  throw;
197  }
198  }
199 
200  iread = input.gcount() / 4;
201  break;
202 
203  default:
204  throw panda::Error("SigProcStream : read_block - nbits can only be 4, 8, 16 or 32!");
205  iread = 0;
206  }
207  } catch (...) {
208  if(shortblock) free(shortblock);
209  if(charblock) free(charblock);
210  iread = input.gcount() * 8/nbits;
211  if(!input.eof()) {
212  throw;
213  }
214  }
215 
216  /* return number of samples read */
217  return iread;
218 }
219 
220 SigProcFileStream::SigProcFileStream(Config const& config)
221  : _config(config)
222  , _nsamples(0)
223  , _filenames(config.sigproc_files())
224  , _file_it(_filenames.cbegin())
225 {
226  _header.number_of_bits(config.nbits());
227 }
228 
229 SigProcFileStream::~SigProcFileStream()
230 {
231 }
232 
233 void SigProcFileStream::transfer_header_info(data::TimeFrequency<Cpu, uint8_t>& chunk)
234 {
235  if( chunk.number_of_spectra() != 0 ){
236  // assume the chunk wants a set number of samples
237  chunk.resize( data::DimensionSize<data::Time>(chunk.number_of_spectra())
238  , data::DimensionSize<data::Frequency>(_header.number_of_channels()));
239  }
240  else if(number_of_data_samples() > 0 ) {
241  // we take as many samples os defined in the header
242  chunk.resize( data::DimensionSize<data::Time>(number_of_data_samples()/ _header.number_of_channels())
243  , data::DimensionSize<data::Frequency>(_header.number_of_channels()));
244  }
245  else {
246  // make a guess at how many samples to take
247  if( _header.number_of_bits() > 0 ) {
248  chunk.resize(
249  data::DimensionSize<data::Time>(sizeof(float)*8/_header.number_of_bits()),
250  data::DimensionSize<data::Frequency>(_header.number_of_channels()));
251  }
252  else {
253  throw panda::Error("unable to determine number of sigproc samples required");
254  }
255  }
256  chunk.sample_interval(_header.sample_interval());
257  if(chunk.channel_frequencies().size() > 1) {
258  chunk.set_channel_frequencies(_header.frequency_channels().begin(), _header.frequency_channels().end());
259  }
260  else {
261  chunk.set_channel_frequencies_const_width(static_cast<typename data::TimeFrequency<Cpu, uint8_t>::FrequencyType>(*_header.fch1())
262  ,static_cast<typename data::TimeFrequency<Cpu, uint8_t>::FrequencyType>(*_header.foff()));
263  }
264 }
265 
267 {
268  if( _nsamples == 0) {
269  _nsamples = (boost::filesystem::file_size(*(_file_it -1)) - _header.size() ) * (8/_header.number_of_bits());
270  }
271  return _nsamples;
272 }
273 
275 {
276  if(!_file_stream.is_open() || _file_stream.eof() || _file_stream.peek() == decltype(_file_stream)::traits_type::eof()) {
277  if(!next_file()) return true; // mark end of data
278  }
279 
280  typedef data::TimeFrequency<Cpu, uint8_t> TimeFrequencyType;
281  //typedef std::decay<decltype(std::declval<TimeFrequencyType>().data())>::type TimeFrequencyDataType;
282 
283  auto chunk = get_chunk<TimeFrequencyType>(data::DimensionSize<data::Time>(_config.chunk_samples()), data::DimensionSize<data::Frequency>(0));
284  assert(chunk);
285 
286  transfer_header_info(*chunk);
287  chunk->start_time(_start_time);
288  auto it = chunk->begin();
289  return fill_chunk(*chunk, 0, it);
290 }
291 
292 bool SigProcFileStream::fill_chunk(data::TimeFrequency<Cpu, uint8_t>& chunk, std::size_t offset, data::TimeFrequency<Cpu, uint8_t>::Iterator& it )
293 {
294  std::size_t data_to_read = chunk.number_of_spectra() * chunk.number_of_channels() - offset;
295  // of the read_block function. This is not ideal, but fine for the moment.
296 
298  std::size_t data_read;
299  {
300  // TODO - read in directly to the iterator instead of expanding to floats and back to the type via read_block
301  float* tmp = (float*)malloc(data_to_read * sizeof(float));
302  if(!tmp) throw std::bad_alloc();
303 
304  try {
305  data_read = read_block(_file_stream, _header.number_of_bits(), tmp, data_to_read);
306  }
307  catch(...) {
308  free(tmp);
309  throw;
310  }
311  assert(data_read <= data_to_read);
312 
313  // Convert float values back to data::TimeFrequencyDataType
314  // Note: This will corrupt data from 16 and 32 bit filterbank
315  // files. We should consider disabling the 16 and 32 bit switch
316  // cases in read_block so that this does not happen.
317  std::copy(tmp,tmp+data_read,it);
318  free(tmp);
319  }
320 
321  // This branch handles the case that we do not get a full chunks
322  // worth of data from the read (e.g. when we hit EOF)
323  std::size_t valid_samples = data_read / chunk.number_of_channels(); //implicit floor.
324  _start_time += std::chrono::duration<double>((double)valid_samples * _header.sample_interval().value());
325  if(data_read != data_to_read)
326  {
327  if( _file_stream.eof()) {
328  auto expected_header = _header;
329  // try to get missing chunks from the next file
330  bool rv=next_file();
331  if(!rv || expected_header.tstart() != _start_time || expected_header != _header) {
332  // Calculate the number of valid time samples (i.e. full spectra) and
333  // resize the chuck appropriately.
334  PANDA_LOG << "resizing to " << valid_samples;
335  chunk.resize( data::DimensionSize<data::Time>(valid_samples) );
336  return !rv;
337  }
338  else {
339  fill_chunk(chunk, offset + data_read, it);
340  }
341  }
342  }
343 
344  // Indicate that stream is still producing
345  return false;
346 }
347 
348 bool SigProcFileStream::next_file()
349 {
350  _nsamples = 0U;
351 
352  if(_file_it == _filenames.cend()) return false;
353 
354  // open file for input
355  _file_stream.exceptions();
356  _file_stream.clear();
357  _file_stream.open(*_file_it, std::ios::binary);
358 
359  if(!_file_stream.is_open()) {
360  std::cerr << "could not open file '" << *_file_it << "'" << std::endl; // TODO use logging framework
361  return false;
362  }
363  _file_stream.exceptions(std::ifstream::failbit | std::ifstream::badbit);
364 
365  // parse the header
366  try {
367  // try catch block to append filename to any message
368  try {
369  // process header
370  _header.read(_file_stream);
371  } catch(std::exception& e) {
372  panda::Error err(e);
373  err << " in file " << *_file_it;
374  throw err;
375  }
376  }
377  catch(...) {
378  // recover from a bad parse, moving on to the next file
379  ++_file_it;
380  _file_stream.close();
381  throw;
382  }
383  _start_time = *_header.tstart();
384 
385  // increment file iterator for next call
386  ++_file_it;
387 
388  return true;
389 }
390 
391 
392 } // namespace sigproc
393 } // namespace cheetah
394 } // namespace ska
std::size_t number_of_data_samples() const
return the number of samples (time * channels) in the currently opened file
Some limits and constants for FLDO.
Definition: Brdz.h:35
NumericalT DataType
the underlying data storage type for the amplitude of the signal
Definition: TimeFrequency.h:96
std::size_t number_of_channels() const
std::size_t number_of_spectra() const