DP3
MSWriter.h
Go to the documentation of this file.
1 // MSWriter.h: DP3 step writing to an MS
2 // Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
3 // SPDX-License-Identifier: GPL-3.0-or-later
4 
5 #ifndef DP3_STEPS_MSWRITER_H_
6 #define DP3_STEPS_MSWRITER_H_
7 
8 #include "OutputStep.h"
9 
10 #include <memory>
11 #include <thread>
12 
13 #include <aocommon/lane.h>
14 
15 #include <casacore/tables/Tables/ColumnDesc.h>
16 #include <casacore/tables/Tables/ScalarColumn.h>
17 #include <casacore/tables/Tables/ArrayColumn.h>
18 
19 #include "base/StManParsetKeys.h"
20 #include "common/Timer.h"
21 
22 namespace casacore {
23 class Table;
24 }
25 
26 namespace dp3 {
27 namespace common {
28 class ParameterSet;
29 }
30 
31 namespace steps {
32 class InputStep;
33 
34 std::unique_ptr<casacore::DataManager> MakeStMan(
35  const std::string& type_name, const std::string& instance_name,
36  const casacore::Record& record = casacore::Record());
37 
39 
55 
56 class MSWriter : public OutputStep {
57  public:
58  explicit MSWriter(const std::string& out_name, const common::ParameterSet&,
59  const std::string& prefix);
60 
61  ~MSWriter() override;
62 
63  common::Fields getRequiredFields() const override {
65  }
66 
69  bool process(std::unique_ptr<base::DPBuffer> buffer) override;
70 
72  void finish() override;
73 
75  void show(std::ostream&) const override;
76 
78  void updateInfo(const base::DPInfo&) override;
79 
81  void showTimings(std::ostream&, double duration) const override;
82 
84  static void WriteHistory(casacore::Table& ms,
85  const common::ParameterSet& parset);
86 
87  static void UpdateBeam(casacore::Table& main_table,
88  const std::string& out_col_name,
89  const base::DPInfo& info);
90 
92  static void UpdatePhaseCentre(const std::string& out_name,
93  const casacore::MDirection& new_phase_dir);
94 
96  static void UpdateSpw(const std::string& out_name, const base::DPInfo& info);
97 
99  static void UpdateObs(const std::string& out_name, const base::DPInfo& info);
100 
101  static std::string InsertNumberInFilename(const std::string& name,
102  size_t number);
103 
104  private:
105  void StartNewMs();
106  void FinishMs();
107 
111  void CreateMs(const std::string& out_name, unsigned int tile_size,
112  unsigned int tile_n_chan);
113 
115  void CopySubTables(casacore::Table& original_table);
116 
120  void ProcessBuffer(base::DPBuffer& buffer);
121 
125  void WriteData(casacore::Table& out, base::DPBuffer& buf);
126 
130  void WriteMeta(casacore::Table& out, const base::DPBuffer& buf);
131 
134  void CopyMeta(const casacore::Table& in, casacore::Table& out,
135  bool copy_time_info);
136 
138  template <typename T>
139  void FillSca(const T& value, casacore::Table& out,
140  const casacore::String& column_name) {
141  casacore::ScalarColumn<T> out_col(out, column_name);
142  out_col.fillColumn(value);
143  }
144 
146  template <typename T>
147  void FillArr(const casacore::Array<T>& value, casacore::Table& out,
148  const casacore::String& column_name) {
149  casacore::ArrayColumn<T> out_col(out, column_name);
150  out_col.fillColumn(value);
151  }
152 
154  template <typename T>
155  void CopySca(const casacore::Table& in, casacore::Table& out,
156  const casacore::String& column_name) {
157  casacore::ROScalarColumn<T> in_col(in, column_name);
158  casacore::ScalarColumn<T> out_col(out, column_name);
159  out_col.putColumn(in_col.getColumn());
160  }
161 
163  template <typename T>
164  void CopyArr(const casacore::Table& in, casacore::Table& out,
165  const casacore::String& column_name) {
166  casacore::ROArrayColumn<T> in_col(in, column_name);
167  casacore::ArrayColumn<T> out_col(out, column_name);
168  out_col.putColumn(in_col.getColumn());
169  }
170 
172  std::string name_;
174  std::string out_name_;
176  std::string chunk_name_;
177  casacore::Table ms_;
178  common::ParameterSet parset_;
179  casacore::String data_col_name_;
180  casacore::String flag_col_name_;
181  casacore::String weight_col_name_;
182  bool overwrite_;
183  bool copy_corr_data_;
184  bool copy_model_data_;
185  unsigned int tile_size_;
186  unsigned int tile_n_chan_;
187  unsigned int nr_times_flush_;
188  unsigned int nr_done_;
191  double chunk_duration_ = 0.0;
193  double chunk_start_time_ = 0.0;
194  size_t current_chunk_index_ = 0;
195 
196  std::string vds_dir_;
197  std::string cluster_desc_;
198  base::StManParsetKeys st_man_keys_;
199  // For now, metadata compression is turned on only when explicitly turned on
200  // in compilation. Once we're confident enough about this feature, we can
201  // enable it by default.
202  bool scalar_flags_ = METADATA_COMPRESSION_DEFAULT;
203  bool uvw_compression_ = METADATA_COMPRESSION_DEFAULT;
204  bool antenna_compression_ = METADATA_COMPRESSION_DEFAULT;
205 
207  common::NSTimer timer_;
208 
214  common::NSTimer writer_timer_;
215 
220  common::NSTimer create_task_timer_;
221 
234  aocommon::Lane<std::unique_ptr<base::DPBuffer>> write_queue_{3};
235 
237  void CreateTask(std::unique_ptr<base::DPBuffer> buffer);
238 
243  std::thread write_queue_thread_;
244 
246  void WriteQueueProcess();
247 
251  bool use_write_thread_{false};
252 
260  bool is_write_thread_active_{false};
261 
263  void StopWriteThread();
264 };
265 
266 } // namespace steps
267 } // namespace dp3
268 
269 #endif
Buffer holding the data of a timeslot/band.
Definition: DPBuffer.h:92
General info about DP3 data processing attributes like averaging.
Definition: DPInfo.h:35
Definition: Fields.h:16
Implements a map of Key-Value pairs.
Definition: ParameterSet.h:31
DP3 step writing to an MS.
Definition: MSWriter.h:56
static void UpdateSpw(const std::string &out_name, const base::DPInfo &info)
Update the SPECTRAL_WINDOW table for averaged channels.
void finish() override
Finish the processing of this step and subsequent steps.
MSWriter(const std::string &out_name, const common::ParameterSet &, const std::string &prefix)
static void UpdateBeam(casacore::Table &main_table, const std::string &out_col_name, const base::DPInfo &info)
static void UpdateObs(const std::string &out_name, const base::DPInfo &info)
Update the OBSERVATION table with the correct start and end time.
void updateInfo(const base::DPInfo &) override
Update the general info.
static std::string InsertNumberInFilename(const std::string &name, size_t number)
void show(std::ostream &) const override
Show the step parameters.
static void WriteHistory(casacore::Table &ms, const common::ParameterSet &parset)
Write the parset info into the HISTORY table of the MS.
void showTimings(std::ostream &, double duration) const override
Show the timings.
common::Fields getRequiredFields() const override
Get the fields required by the current step.
Definition: MSWriter.h:63
static void UpdatePhaseCentre(const std::string &out_name, const casacore::MDirection &new_phase_dir)
Update the FIELD table with the new phase center.
bool process(std::unique_ptr< base::DPBuffer > buffer) override
Base class for output steps.
Definition: OutputStep.h:15
static constexpr dp3::common::Fields kWeightsField
Definition: Step.h:64
static constexpr dp3::common::Fields kUvwField
Definition: Step.h:66
static constexpr dp3::common::Fields kDataField
Definition: Step.h:60
static constexpr dp3::common::Fields kFlagsField
Definition: Step.h:62
Definition: InputStep.h:21
BaseTimer< std::chrono::steady_clock > NSTimer
Definition: Timer.h:129
std::unique_ptr< casacore::DataManager > MakeStMan(const std::string &type_name, const std::string &instance_name, const casacore::Record &record=casacore::Record())
This file has generic helper routines for testing steps.
Definition: AntennaConfig.h:53