24 #include "cheetah/rfim/ampp/Rfim.h" 25 #include "cheetah/data/Units.h" 26 #include "panda/Error.h" 27 #include "panda/Log.h" 30 #include <type_traits> 38 template<
typename RfimTraits>
39 typename Rfim<RfimTraits>::ReturnType
Rfim<RfimTraits>::operator()(panda::PoolResource<Cpu>&, std::shared_ptr<cheetah::data::TimeFrequency<Cpu, NumericalRep>> data)
44 template<
typename RfimTraits>
45 Rfim<RfimTraits>::Rfim(Config
const& c,
typename RfimTraits::BandPassHandler
const& bp_handler)
47 , _zero_dm(c.zero_dm()?0:1)
49 , _fraction_bad_channels(0.0)
51 , _max_history(c.max_history())
52 , _use_mean_over_rms(0)
53 , _bandpass_data_ptr(
std::make_shared<Spectrum<NumericalRep>>())
54 , _bandpass_data(*_bandpass_data_ptr)
55 , _bandpass_handler(bp_handler)
57 assert(_max_history != 0);
58 _mean_buffer.set_capacity(_max_history);
59 _rms_buffer.set_capacity(_max_history);
62 template<
typename RfimTraits>
63 Rfim<RfimTraits>::~Rfim()
67 template<
typename RfimTraits>
68 Rfim<RfimTraits>::Rfim(Rfim&& c)
70 , _zero_dm(c._zero_dm)
71 , _bad_spectra(c._bad_spectra)
72 , _mean_buffer(
std::move(c._mean_buffer))
73 , _rms_buffer(
std::move(c._rms_buffer))
74 , _fraction_bad_channels(c._fraction_bad_channels)
75 , _sample_counter(c._sample_counter)
76 , _max_history(c._max_history)
77 , _use_mean_over_rms(c._use_mean_over_rms)
78 , _bandpass_data_ptr(c._bandpass_data_ptr)
79 , _bandpass_data(*_bandpass_data_ptr)
80 , _bandpass_handler(c._bandpass_handler)
87 MiniBand(std::size_t number_of_chans, data::DimensionSize<data::Frequency> next_band_index, BandPassConfig
const& config)
89 , band_mean(config.mean())
92 , number_of_channels(number_of_chans)
93 , next_band_index(next_band_index)
101 band_min = std::numeric_limits<float>::max();
109 std::size_t number_of_channels;
110 data::DimensionSize<data::Frequency> next_band_index;
111 BandPassConfig
const& _config;
116 template<
typename RfimTraits>
117 void Rfim<RfimTraits>::run_impl(data::TimeFrequency<Cpu, NumericalRep>
const& data, DataAdapter& adapter)
119 typedef typename data::TimeFrequency<Cpu, NumericalRep>::ConstSpectra Spectra;
120 data::DimensionSize<data::Frequency> nchannels(data.number_of_channels());
121 if((std::size_t)nchannels == 0)
return;
123 double sqr_nchannels = std::sqrt((
unsigned)nchannels);
124 unsigned nsamples = data.number_of_spectra();
126 float spectrum_rms = 0.;
131 unsigned const channels_per_band = nchannels / nbands;
132 std::vector<MiniBand> mini_bands;
135 mini_bands.reserve(nbands);
136 data::DimensionSize<data::Frequency> next_index(0);
137 for(
unsigned i = 0; i < nbands; ++i) {
138 next_index += data::DimensionSize<data::Frequency>(channels_per_band);
139 mini_bands.emplace_back(channels_per_band, next_index, _config.bandpass());
141 unsigned channels_last_band = nchannels - channels_per_band * nbands;
142 if(channels_last_band != 0) {
143 mini_bands.emplace_back(channels_last_band, next_index + channels_last_band, _config.bandpass());
148 std::unique_lock<std::mutex> bandpass_lock(_mutex);
150 if(_replacement_sample.size() != nchannels) {
151 _replacement_sample.resize(nchannels, 0);
152 _bandpass_data.resize(nchannels, _config.bandpass().mean());
155 for(
unsigned sample_number = 0U; sample_number < nsamples; ++sample_number) {
157 float spectrum_sum = 0.0;
158 float spectrum_sum_sq = 0.0;
159 std::vector<float> good_channels(nbands, 0.0);
162 for(
auto& band : mini_bands) {
168 Spectra sample = data.spectrum(sample_number);
169 if (!_rms_buffer.full())
171 adapter.mark_good(sample);
172 data::DimensionIndex<data::Frequency> current_channel(0);
173 for(
auto& mini_band : mini_bands) {
175 auto const& channel_value = sample[current_channel];
177 if (channel_value < mini_band.band_min) mini_band.band_min = channel_value;
179 mini_band.band_mean += channel_value;
180 mini_band.band_mean_sq += std::pow(channel_value,2);
182 while(++current_channel < mini_band.next_band_index);
184 current_channel -= mini_band.number_of_channels;
188 _bandpass_data[current_channel] = (_bandpass_data[current_channel] * _rms_buffer.size() + mini_band.band_min) / (_rms_buffer.size()+1);
189 _replacement_sample[current_channel] = _bandpass_data[current_channel];
191 while(++current_channel < mini_band.next_band_index);
196 spectrum_rms=std::numeric_limits<float>::max();
197 float mini_band_sum = 0.0;
198 for (MiniBand& mini_band : mini_bands) {
200 mini_band.band_mean /= mini_band.number_of_channels;
201 mini_band.band_sigma = sqrt(mini_band.band_mean_sq/mini_band.number_of_channels
202 - std::pow(mini_band.band_mean,2));
205 if(mini_band.band_sigma < spectrum_rms) spectrum_rms = mini_band.band_sigma;
207 mini_band_sum += mini_band.band_mean;
210 _rms_buffer.push_back(spectrum_rms);
211 _rms_run_ave = std::accumulate(_rms_buffer.begin(), _rms_buffer.end(),
212 0.0)/_rms_buffer.size();
218 if (_rms_buffer.full()) {
219 _mean_over_rms = 96.0 / _rms_run_ave;
244 _bandpass_data.start_time(data.start_time());
245 _bandpass_data.calculate_mean();
246 _bandpass_rms=_rms_run_ave;
247 _bandpass_handler(_bandpass_data);
251 _replacement_sample_rms = spectrum_rms;
253 _replacement_sample_mean = mini_band_sum / nbands;
268 _rms_run_ave -= (_rms_buffer.front()/_rms_buffer.size());
269 _rms_run_ave += (_rms_buffer.back()/_rms_buffer.capacity());
277 if(_use_mean_over_rms) {
279 spectrum_rms = std::abs(_mean_run_ave / _mean_over_rms);
280 _rms_buffer.push_back(spectrum_rms);
283 _rms_buffer.push_back(_replacement_sample_rms);
288 float margin = _config.channel_rejection_rms() * _rms_run_ave;
292 std::size_t total_good_channels = 0;
293 data::DimensionIndex<data::Frequency> current_channel(0);
294 for(
auto & channel_value : sample) {
295 if (channel_value - _bandpass_data[current_channel] > margin) {
298 adapter.mark_bad(channel_value, current_channel);
303 if(channels_per_band) {
304 band = (int)current_channel / channels_per_band;
309 ++good_channels[band];
310 ++total_good_channels;
311 spectrum_sum += channel_value;
317 _fraction_bad_channels += (float)(nchannels - total_good_channels)/nchannels;
318 spectrum_sum /= total_good_channels;
324 unsigned bad_bands = 0;
325 float good_channel_fraction = 0.8;
326 for (
unsigned b = 0; b < nbands; ++b){
327 if (good_channels[b] < good_channel_fraction * mini_bands[b].number_of_channels) {
336 if (total_good_channels < good_channel_fraction * nchannels)
337 bad_bands += nbands/2;
344 if (!_mean_buffer.full()) {
345 _mean_buffer.push_back(spectrum_sum);
346 _mean_run_ave = std::accumulate(_mean_buffer.begin(), _mean_buffer.end(),
347 0.0)/_mean_buffer.size();
358 _mean_run_ave -= _mean_buffer.front()/_mean_buffer.size();
359 _mean_run_ave += _mean_buffer.back()/_mean_buffer.size();
360 _mean_buffer.push_back(spectrum_sum);
369 float spectrum_rms_tolerance = _config.spectrum_rejection_rms() *
370 _bandpass_rms/sqr_nchannels;
387 if (spectrum_sum - _mean_run_ave > spectrum_rms_tolerance ||
388 bad_bands >= nbands / 2 ) {
390 adapter.mark_bad(sample);
398 _mean_buffer.pop_back();
399 _rms_buffer.pop_back();
400 _mean_buffer.push_back(_replacement_sample_mean);
401 _rms_buffer.push_back(_replacement_sample_rms);
408 spectrum_sum_sq = 0.0;
409 data::DimensionIndex<data::Frequency> current_channel(0);
410 for(
auto const& channel_value : sample) {
411 if (channel_value - _bandpass_data[current_channel]
412 < 3.0 * _rms_run_ave) {
413 _replacement_sample[current_channel] = channel_value;
414 adapter.mark_good(channel_value, current_channel);
416 spectrum_sum += _replacement_sample[current_channel];
417 spectrum_sum_sq += std::pow(_replacement_sample[current_channel],2);
421 _replacement_sample_mean = spectrum_sum / nchannels;
422 _replacement_sample_rms = sqrt(spectrum_sum_sq/nchannels -
423 std::pow(_replacement_sample_mean,2));
469 unsigned const report_stats_every = 1 * _max_history;
470 if (_sample_counter == 0) {
472 float fraction_bad_spectra = 100.0 *
473 (float)_bad_spectra / (
float)report_stats_every;
477 if (fraction_bad_spectra > 99.0) {
478 _rms_buffer.resize(0);
479 _mean_buffer.resize(0);
480 PANDA_LOG <<
"Lost track of the RFI model, retraining.";
483 float fraction_bad_channels = 100.0 *
484 _fraction_bad_channels / (float)report_stats_every ;
485 PANDA_LOG_DEBUG <<
"RFIstats: mean=" << _mean_run_ave <<
" rms=" << _rms_run_ave <<
" bad_spec=" 486 << fraction_bad_spectra <<
" bad_chan" 487 << fraction_bad_channels ;
490 _fraction_bad_channels = 0.0;
496 _bandpass_data.adjust_mean(_mean_run_ave);
497 _bandpass_rms=_rms_run_ave;
499 _sample_counter = _sample_counter % report_stats_every;
503 template<
typename RfimTraits>
508 this->run_impl(data, adapter);
Some limits and constants for FLDO.
ReturnType operator()(panda::PoolResource< Cpu > &thread, std::shared_ptr< data::TimeFrequency< Cpu, NumericalRep >> data)
task interface warpper around run
ReturnType run(data::TimeFrequency< Cpu, NumericalRep > &data)
perform rfi clipping on the data