Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
Rfim.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/rfim/ampp/Rfim.h"
25 #include "cheetah/data/Units.h"
26 #include "panda/Error.h"
27 #include "panda/Log.h"
28 #include <cmath>
29 #include <numeric>
30 #include <type_traits>
31 
32 namespace ska {
33 namespace cheetah {
34 namespace rfim {
35 namespace ampp {
36 
37 
38 template<typename RfimTraits>
39 typename Rfim<RfimTraits>::ReturnType Rfim<RfimTraits>::operator()(panda::PoolResource<Cpu>&, std::shared_ptr<cheetah::data::TimeFrequency<Cpu, NumericalRep>> data)
40 {
41  return run(*data);
42 }
43 
44 template<typename RfimTraits>
45 Rfim<RfimTraits>::Rfim(Config const& c, typename RfimTraits::BandPassHandler const& bp_handler)
46  : _config(c)
47  , _zero_dm(c.zero_dm()?0:1)
48  , _bad_spectra(0)
49  , _fraction_bad_channels(0.0)
50  , _sample_counter(0)
51  , _max_history(c.max_history())
52  , _use_mean_over_rms(0)
53  , _bandpass_data_ptr(std::make_shared<Spectrum<NumericalRep>>())
54  , _bandpass_data(*_bandpass_data_ptr)
55  , _bandpass_handler(bp_handler)
56 {
57  assert(_max_history != 0);
58  _mean_buffer.set_capacity(_max_history);
59  _rms_buffer.set_capacity(_max_history);
60 }
61 
62 template<typename RfimTraits>
63 Rfim<RfimTraits>::~Rfim()
64 {
65 }
66 
67 template<typename RfimTraits>
68 Rfim<RfimTraits>::Rfim(Rfim&& c)
69  : _config(c._config)
70  , _zero_dm(c._zero_dm)
71  , _bad_spectra(c._bad_spectra)
72  , _mean_buffer(std::move(c._mean_buffer))
73  , _rms_buffer(std::move(c._rms_buffer))
74  , _fraction_bad_channels(c._fraction_bad_channels)
75  , _sample_counter(c._sample_counter)
76  , _max_history(c._max_history)
77  , _use_mean_over_rms(c._use_mean_over_rms)
78  , _bandpass_data_ptr(c._bandpass_data_ptr)
79  , _bandpass_data(*_bandpass_data_ptr)
80  , _bandpass_handler(c._bandpass_handler)
81 {
82 }
83 
84 namespace {
85 struct MiniBand {
86  public:
87  MiniBand(std::size_t number_of_chans, data::DimensionSize<data::Frequency> next_band_index, BandPassConfig const& config)
88  : band_sigma(0)
89  , band_mean(config.mean())
90  , band_mean_sq(0.0)
91  , band_min(0)
92  , number_of_channels(number_of_chans)
93  , next_band_index(next_band_index)
94  , _config(config)
95  {}
96 
97  void reset() {
98  band_sigma = 0.0;
99  band_mean = 0.0;
100  band_mean_sq = 0.0;
101  band_min = std::numeric_limits<float>::max();
102  }
103 
104  public:
105  float band_sigma;
106  float band_mean;
107  float band_mean_sq;
108  float band_min;
109  std::size_t number_of_channels;
110  data::DimensionSize<data::Frequency> next_band_index;
111  BandPassConfig const& _config;
112 };
113 
114 }
115 
116 template<typename RfimTraits>
117 void Rfim<RfimTraits>::run_impl(data::TimeFrequency<Cpu, NumericalRep> const& data, DataAdapter& adapter)
118 {
119  typedef typename data::TimeFrequency<Cpu, NumericalRep>::ConstSpectra Spectra;
120  data::DimensionSize<data::Frequency> nchannels(data.number_of_channels());
121  if((std::size_t)nchannels == 0) return;
122 
123  double sqr_nchannels = std::sqrt((unsigned)nchannels);
124  unsigned nsamples = data.number_of_spectra();
125 
126  float spectrum_rms = 0.;
127 
128  // Split the spectrum into nbands bands for the purpose of matching
129  // the spectrum to the model
130  unsigned nbands = 8;
131  unsigned const channels_per_band = nchannels / nbands;
132  std::vector<MiniBand> mini_bands;
133  {
134  // TODO we could store this setup once for each value of nchannels
135  mini_bands.reserve(nbands);
136  data::DimensionSize<data::Frequency> next_index(0);
137  for(unsigned i = 0; i < nbands; ++i) {
138  next_index += data::DimensionSize<data::Frequency>(channels_per_band);
139  mini_bands.emplace_back(channels_per_band, next_index, _config.bandpass());
140  }
141  unsigned channels_last_band = nchannels - channels_per_band * nbands;
142  if(channels_last_band != 0) {
143  mini_bands.emplace_back(channels_last_band, next_index + channels_last_band, _config.bandpass());
144  ++nbands; // add a band for any left over channels
145  }
146  }
147 
148  std::unique_lock<std::mutex> bandpass_lock(_mutex);
149  // setuup _replacement_sample the first time round
150  if(_replacement_sample.size() != nchannels) {
151  _replacement_sample.resize(nchannels, 0);
152  _bandpass_data.resize(nchannels, _config.bandpass().mean());
153  }
154 
155  for(unsigned sample_number = 0U; sample_number < nsamples; ++sample_number) {
156  //auto const& bandpass = _bandpass.current_set();
157  float spectrum_sum = 0.0;
158  float spectrum_sum_sq = 0.0;
159  std::vector<float> good_channels(nbands, 0.0);
160 
161  // reset mini bands for each new spectrum
162  for(auto& band : mini_bands) {
163  band.reset();
164  }
165 
166  // Training phase, perform the following tasks:
167  //
168  Spectra sample = data.spectrum(sample_number);
169  if (!_rms_buffer.full())
170  {
171  adapter.mark_good(sample); // assume its good until we know better
172  data::DimensionIndex<data::Frequency> current_channel(0);
173  for(auto& mini_band : mini_bands) {
174  do {
175  auto const& channel_value = sample[current_channel];
176  // determine the miniband minimum as a good proxy for the level
177  if (channel_value < mini_band.band_min) mini_band.band_min = channel_value;
178  // compute the mean and rms per mini band
179  mini_band.band_mean += channel_value;
180  mini_band.band_mean_sq += std::pow(channel_value,2);
181  }
182  while(++current_channel < mini_band.next_band_index);
183  // rewind to begining of miniband and update _bandpass_data
184  current_channel -= mini_band.number_of_channels;
185  do {
186  // bandpass is taken to be the weighted average value of the
187  // current minimum and the existing bandpass value for this channel
188  _bandpass_data[current_channel] = (_bandpass_data[current_channel] * _rms_buffer.size() + mini_band.band_min) / (_rms_buffer.size()+1);
189  _replacement_sample[current_channel] = _bandpass_data[current_channel];
190  }
191  while(++current_channel < mini_band.next_band_index);
192  }
193 
194  // Now find the distances between data and model and the RMSs in
195  // each band COULD PROBABLY BE TUCKED INTO THE FOR LOOP ABOVE
196  spectrum_rms=std::numeric_limits<float>::max();
197  float mini_band_sum = 0.0;
198  for (MiniBand& mini_band : mini_bands) {
199  //odata_minus_model[b] = mini_data[b] - mini_model[b];
200  mini_band.band_mean /= mini_band.number_of_channels;
201  mini_band.band_sigma = sqrt(mini_band.band_mean_sq/mini_band.number_of_channels
202  - std::pow(mini_band.band_mean,2)); // TODO adjust for final band size
203  // Assume the minimum bandSigma to be the best estimate of this
204  // spectrum RMS
205  if(mini_band.band_sigma < spectrum_rms) spectrum_rms = mini_band.band_sigma;
206  // Get the bandpass average across the minibands
207  mini_band_sum += mini_band.band_mean;
208  }
209  // Push back the spectrum rms into a ring buffer and work out the average
210  _rms_buffer.push_back(spectrum_rms);
211  _rms_run_ave = std::accumulate(_rms_buffer.begin(), _rms_buffer.end(),
212  0.0)/_rms_buffer.size();
213  //std::cout << "RFIstats: training rms buffer average=" << _rms_run_ave
214  // << " " << _rms_buffer.size() << " " << spectrum_rms << std::endl;
215  // The very last time this is done, store a reference value of
216  // the mean over the rms; this works as I have just added on the
217  // last value two lines above
218  if (_rms_buffer.full()) {
219  _mean_over_rms = 96.0 / _rms_run_ave;
220 /*
221  // smooth out the bandpass model within 3 sigma
222  data::DimensionIndex<data::Frequency> i(0);
223  auto last_value = mini_bands[0].band_mean;
224  auto three_sigma = spectrum_rms * 3.0;
225  for(auto& value : _bandpass_data)
226  {
227  if( value > last_value) {
228  if( value - last_value > three_sigma )
229  {
230  value = last_value + spectrum_rms;
231  }
232  }
233  else {
234  if( last_value - value > three_sigma )
235  {
236  value = last_value - spectrum_rms;
237  }
238  }
239  last_value = value;
240  _replacement_sample[i] = value;
241  ++i;
242  }
243 */
244  _bandpass_data.start_time(data.start_time());
245  _bandpass_data.calculate_mean();
246  _bandpass_rms=_rms_run_ave;
247  _bandpass_handler(_bandpass_data);
248  }
249 
250  // update replacement sample stats
251  _replacement_sample_rms = spectrum_rms;
252  //_replacement_sample_mean = _mean_run_ave;
253  _replacement_sample_mean = mini_band_sum / nbands;
254 
255  //std::cout << "RFIstats: replacement_sample_rms=" << _replacement_sample_rms
256  // << " replacement_sample_mean=" << _replacement_sample_mean << std::endl;
257  continue;
258  }
259  else {
260  // Now out of the training phase
261  // Update the running average with the current last value,
262  // and take the oldest off the end. Then add the new
263  // value onto the buffer. The idea here is that the new
264  // value is not used for the current spectrum, but rather
265  // for the one after. The current spectrum is evaluated
266  // based on the rms values in the buffer up to that
267  // point.
268  _rms_run_ave -= (_rms_buffer.front()/_rms_buffer.size());
269  _rms_run_ave += (_rms_buffer.back()/_rms_buffer.capacity());
270 
271  // In extreme RFI cases, the measured RMS may start growing
272  // due to particular RFI signals. The mean over rms ratio
273  // should remain approximately constant over the course of the
274  // observation. Use this as a safety check, after the mean has
275  // been reasonably determined, to set the RMS to a more
276  // reliable value :
277  if(_use_mean_over_rms) {
278  //recover the rms running average
279  spectrum_rms = std::abs(_mean_run_ave / _mean_over_rms);
280  _rms_buffer.push_back(spectrum_rms);
281  }
282  else {
283  _rms_buffer.push_back(_replacement_sample_rms);
284  }
285  }
286  // now use this rms to define a margin of tolerance for bright
287  // channels
288  float margin = _config.channel_rejection_rms() * _rms_run_ave;
289 
290  // Now loop around all the channels: if you find a channel where
291  // (I - bandpass) > margin, then mark as bad
292  std::size_t total_good_channels = 0;
293  data::DimensionIndex<data::Frequency> current_channel(0);
294  for(auto & channel_value : sample) {
295  if (channel_value - _bandpass_data[current_channel] > margin) {
296  // clipping this channel to values from the last good
297  // spectrum
298  adapter.mark_bad(channel_value, current_channel);
299  //std::cout << "The rms_run_ave at clipping: " << _rms_run_ave << std::endl;
300  }
301  else{
302  unsigned band;
303  if(channels_per_band) {
304  band = (int)current_channel / channels_per_band;
305  }
306  else {
307  band = 0;
308  }
309  ++good_channels[band];
310  ++total_good_channels;
311  spectrum_sum += channel_value;
312  }
313  ++current_channel;
314  }
315  // So now we have the mean of the incoming data, in a reliable
316  // form after channel clipping
317  _fraction_bad_channels += (float)(nchannels - total_good_channels)/nchannels;
318  spectrum_sum /= total_good_channels;
319 
320  // Check if more than 20% of the channels in each band were
321  // bad. If so in more than half of the bands, 4 in this case,
322  // keep record. Also, if one band is completely gone, or less
323  // than 80% of the total survive, keep record.
324  unsigned bad_bands = 0;
325  float good_channel_fraction = 0.8;
326  for (unsigned b = 0; b < nbands; ++b){
327  if (good_channels[b] < good_channel_fraction * mini_bands[b].number_of_channels) {
328  ++bad_bands;
329  }
330  /*
331  if (good_channels[b] == 0) {
332  bad_bands += nbands / 2;
333  }
334  */
335  }
336  if (total_good_channels < good_channel_fraction * nchannels)
337  bad_bands += nbands/2;
338 
339  // Let us now build up the running average of spectrumSum values
340  // (_maxHistory of them) if the buffer is not full, compute the
341  // new meanRunAve like this
342 
343 
344  if (!_mean_buffer.full()) {
345  _mean_buffer.push_back(spectrum_sum);
346  _mean_run_ave = std::accumulate(_mean_buffer.begin(), _mean_buffer.end(),
347  0.0)/_mean_buffer.size();
348  }
349  else {
350  // just update the running average with the new value, and
351  // take the oldest off the end, using the same principle as
352  // with the rms buffer, i.e. do not use the current
353  // measurement for the current spectrum.
354 
355  // Note there is a tiny descrepance at the point when the
356  // buffer is first full
357 
358  _mean_run_ave -= _mean_buffer.front()/_mean_buffer.size();
359  _mean_run_ave += _mean_buffer.back()/_mean_buffer.size();
360  _mean_buffer.push_back(spectrum_sum);
361  }
362 
363  // Now we can check if this spectrum has an abnormally high mean
364  // compared to the running average
365 
366  // Let us define the tolerance first, and remember, we are
367  // summing across nBins, hence sqrt(nchannels), strictly only valid
368  // for Gaussian stats
369  float spectrum_rms_tolerance = _config.spectrum_rejection_rms() *
370  _bandpass_rms/sqr_nchannels;
371 
372  //Now check, if spectrum_sum - model > tolerance, declare this
373  //time sample useless, replace its data and take care of the
374  //running averages, also cut the first_spectra worth of the
375  //first spectra, also cut spectra where badBands >= 4, see
376  //above
377 
378  /*
379  unsigned const first_spectra = 1000;
380  if (_mean_buffer.size() < first_spectra) {
381  // clip the sample, but continue to build the stats; this
382  // helps the stats converge
383  RfiPolicy::replace_sample(sample, _replacement_sample.begin());
384  }
385  else
386  */
387  if (spectrum_sum - _mean_run_ave > spectrum_rms_tolerance ||
388  bad_bands >= nbands / 2 ) {
389  // we need to remove this entire spectrum
390  adapter.mark_bad(sample);
391  //std::cout << " rms_run_ave at bad spectrum: " << _rms_run_ave << std::endl;
392  // keep a record of bad spectra
393  ++_bad_spectra;
394 
395  // now remove the last samples from the running average
396  // buffers and replace them with the values of the
397  // lastgoodspectrum
398  _mean_buffer.pop_back();
399  _rms_buffer.pop_back();
400  _mean_buffer.push_back(_replacement_sample_mean);
401  _rms_buffer.push_back(_replacement_sample_rms);
402  }
403  // else keep a copy of the original spectrum, as it is good, but
404  // clip everything down to 3 sigma
405 
406  else {
407  spectrum_sum = 0.0; //
408  spectrum_sum_sq = 0.0; //
409  data::DimensionIndex<data::Frequency> current_channel(0);
410  for(auto const& channel_value : sample) {
411  if (channel_value - _bandpass_data[current_channel]
412  < 3.0 * _rms_run_ave) {
413  _replacement_sample[current_channel] = channel_value;
414  adapter.mark_good(channel_value, current_channel);
415  }
416  spectrum_sum += _replacement_sample[current_channel];
417  spectrum_sum_sq += std::pow(_replacement_sample[current_channel],2);
418  ++current_channel;
419  }
420  // and keep the mean and rms values as computed
421  _replacement_sample_mean = spectrum_sum / nchannels;
422  _replacement_sample_rms = sqrt(spectrum_sum_sq/nchannels -
423  std::pow(_replacement_sample_mean,2));
424  }
425  // Now we have a valid spectrum, either the original or
426  // replaced; this spectrum is good, so let us do the final
427  // bits of post processing reset the spectrumSum, and SumSq,
428  // and flatten subtract the bandpass from the data.
429  //
430  /*
431  spectrum_sum = 0.0;
432  spectrum_sum_sq = 0.0;
433  current_channel = data::DimensionIndex<data::Frequency>(0);
434  for(auto & channel_value : sample) {
435  if (_bandpass_data[current_channel] > channel_value)
436  {
437  channel_value=0;
438  ++current_channel;
439  continue;
440  }
441  channel_value -= _bandpass_data[current_channel];
442  spectrum_sum += channel_value;
443  spectrum_sum_sq += std::pow(channel_value,2);
444  ++current_channel;
445  }
446 
447  // and normalize: bring to zero mean if zerodm is specified or
448  // use the running mean if not
449  spectrum_sum /= nchannels; // New meaning of these two variables
450  spectrum_rms = sqrt(spectrum_sum_sq/nchannels - std::pow(spectrum_sum,2));
451 
452  // Avoid nastiness in those first spectra by avoiding divisions
453  // by zero
454  if (spectrum_rms == 0.0) spectrum_rms = 1.0;
455  for(auto & channel_value : sample) {
456  channel_value -= (float)_zero_dm * spectrum_sum;
457  // it may be better to normalize by the running average RMS,
458  // given this is a sensitive operation. For example, an
459  // artificially low rms may scale things up
460  channel_value /= spectrum_rms;
461  // make sure this division is not introducing signals that
462  // you would have clipped
463  if (channel_value > _config.channel_rejection_rms())
464  channel_value = 0.0;
465  }
466  */
467  // The bandpass is flat and the spectrum clean and normalized,
468  // so move to the next spectrum and write out some stats:
469  unsigned const report_stats_every = 1 * _max_history;
470  if (_sample_counter == 0) {
471  // calculate fractions
472  float fraction_bad_spectra = 100.0 *
473  (float)_bad_spectra / (float)report_stats_every;
474 
475  // if the fraction of bad spectra becomes >99%, then empty the
476  // circular buffers and go into learning mode again
477  if (fraction_bad_spectra > 99.0) {
478  _rms_buffer.resize(0);
479  _mean_buffer.resize(0);
480  PANDA_LOG << "Lost track of the RFI model, retraining.";
481  }
482 
483  float fraction_bad_channels = 100.0 *
484  _fraction_bad_channels / (float)report_stats_every ;
485  PANDA_LOG_DEBUG << "RFIstats: mean=" << _mean_run_ave << " rms=" << _rms_run_ave << " bad_spec="
486  << fraction_bad_spectra << " bad_chan"
487  << fraction_bad_channels ;
488  // Reset _bad
489  _bad_spectra = 0;
490  _fraction_bad_channels = 0.0;
491 
492  }
493  // and update the model
494  //_bandpass.set_mean(_mean_run_ave);
495  //std::cout << margin << " " << spectrum_rms_tolerance << " " << _mean_run_ave << std::endl;
496  _bandpass_data.adjust_mean(_mean_run_ave);
497  _bandpass_rms=_rms_run_ave;
498  ++_sample_counter;
499  _sample_counter = _sample_counter % report_stats_every;
500  }
501 }
502 
503 template<typename RfimTraits>
504 typename Rfim<RfimTraits>::ReturnType Rfim<RfimTraits>::run(cheetah::data::TimeFrequency<Cpu, NumericalRep>& data)
505 {
506  return _policy.template exec([this](cheetah::data::TimeFrequency<Cpu, NumericalRep>& data, DataAdapter& adapter) mutable
507  {
508  this->run_impl(data, adapter);
509  }, data);
510 }
511 
512 } // namespace ampp
513 } // namespace rfim
514 } // namespace cheetah
515 } // namespace ska
516 
Some limits and constants for FLDO.
Definition: Brdz.h:35
ReturnType operator()(panda::PoolResource< Cpu > &thread, std::shared_ptr< data::TimeFrequency< Cpu, NumericalRep >> data)
task interface warpper around run
ReturnType run(data::TimeFrequency< Cpu, NumericalRep > &data)
perform rfi clipping on the data
Definition: Rfim.cpp:504