Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
SpsTask.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/sps/detail/SpsTask.h"
25 #include "panda/TupleUtilities.h"
26 #include <limits>
27 
28 
29 namespace ska {
30 namespace cheetah {
31 namespace sps {
32 
33 
34 template<typename DmHandler, typename SpHandler, typename CommonTraits>
35 SpsTask<DmHandler, SpHandler, CommonTraits>::SpsTask(DmHandler dm_handler, SpHandler sp_handler)
36  : _dm_handler(std::move(dm_handler))
37  , _sp_handler(std::move(sp_handler))
38 {
39 }
40 
41 template<typename DmHandler, typename SpHandler, typename CommonTraits>
42 SpsTask<DmHandler, SpHandler, CommonTraits>::~SpsTask()
43 {
44 }
45 
46 template<typename DmHandler, typename SpHandler, typename CommonTraits>
47 template<typename DataType>
49 {
50  return _dm_handler(std::forward<DataType>(d));
51 }
52 
53 template<typename DmHandler, typename SpHandler, typename CommonTraits>
54 template<typename DataType>
56 {
57  return _sp_handler(std::forward<DataType>(d));
58 }
59 
60 template<typename DmHandler, typename SpHandler, typename CommonTraits>
61 std::shared_ptr<panda::ResourceJob> SpsTask<DmHandler, SpHandler, CommonTraits>::submit(BufferType)
62 {
63  PANDA_LOG_WARN << "no sps algo defined";
64  return std::make_shared<panda::ResourceJob>();
65 }
66 
67 
68 template<typename DmHandler, typename SpHandler, typename CommonTraits
69  ,typename PoolType, typename... Algos>
71  DmHandler dm_handler
72  , SpHandler sp_handler
73  , PoolType& pool
74  , Algos&&... algos)
75  : BaseT(dm_handler, sp_handler)
76  , _algos(std::forward<Algos>(algos)...)
77  , _pool(&pool)
78 {
79 }
80 
81 template<typename DmHandler, typename SpHandler, typename CommonTraits
82  ,typename PoolType, typename... Algos>
83 std::shared_ptr<panda::ResourceJob> SpecificSpsTask<DmHandler, SpHandler, CommonTraits, PoolType, Algos...>::submit(BufferType buffer)
84 {
85  std::shared_ptr<SpecificSpsTask> self = this->shared_from_this();
86  return _pool->submit(self, std::move(buffer));
87 }
88 
89 template<typename DmHandler, typename SpHandler, typename CommonTraits
90  ,typename PoolType, typename... Algos>
91 template<typename Arch>
92 void SpecificSpsTask<DmHandler, SpHandler, CommonTraits, PoolType, Algos...>::operator()(panda::PoolResource<Arch>& resource, BufferType buffer)
93 {
94  auto& algo = _algos.template get<Arch>();
95  algo(resource, buffer, this->_dm_handler, this->_sp_handler);
96 }
97 
98 namespace {
99  template<typename Arch>
100  struct BufferOverlap
101  {
102  template<typename AlgoTuple>
103  inline void operator()(AlgoTuple const& algos, std::size_t& max_size)
104  {
105  std::size_t bs = algos.template get<Arch>().buffer_overlap();
106  if(bs > max_size) {
107  max_size = bs;
108  }
109  }
110  };
111 
112  template<typename Arch>
113  struct SetDedispersionStrategy
114  {
115  template<typename AlgoTuple, typename Data>
116  inline void operator()(AlgoTuple& algos, std::size_t& memory, std::size_t memory_limit, Data const& data )
117  {
118  std::size_t mem = algos.template get<Arch>().set_dedispersion_strategy(memory_limit, data);
119  if(mem < memory) memory = mem;
120  }
121  };
122 } // namespace
123 
124 template<typename DmHandler, typename SpHandler, typename CommonTraits>
126 {
127  return 0;
128 }
129 
130 template<typename DmHandler, typename SpHandler, typename CommonTraits
131  ,typename PoolType, typename... Algos>
133 {
134  std::size_t buffer_overlap=0;
135  panda::ForEach<typename ImplementationsType::Architectures, BufferOverlap>::exec(_algos, buffer_overlap);
136  return buffer_overlap;
137 }
138 
139 template<typename DmHandler, typename SpHandler, typename CommonTraits>
141  std::size_t memory_min
142  ,TimeFrequencyType const&)
143 {
144  return memory_min;
145 }
146 
147 template<typename DmHandler, typename SpHandler, typename CommonTraits
148  ,typename PoolType, typename... Algos>
150  std::size_t memory_limit
151  ,TimeFrequencyType const& data)
152 {
153  std::size_t memory=(memory_limit==0)?std::numeric_limits<std::size_t>::max():memory_limit;
154  panda::ForEach<typename ImplementationsType::Architectures, SetDedispersionStrategy>::exec(_algos, memory, memory_limit, data);
155  return memory;
156 }
157 
158 template<typename DmHandler, typename SpHandler, typename CommonTraits>
159 void SpsTask<DmHandler, SpHandler, CommonTraits>::finish(BufferFillerType& agg_buf_filler)
160 {
161  panda::FinishTask<SpsTask> finish(*this);
162  agg_buf_filler.full_buffer_handler( [&](typename BufferFillerType::AggregationBufferType)
163  {
164  PANDA_LOG_WARN << "Sps has no implementation active";
165  }
166  );
167  if(agg_buf_filler.flush())
168  {
169  finish.wait();
170  }
171 }
172 
173 template<typename DmHandler, typename SpHandler, typename CommonTraits
174  ,typename PoolType, typename... Algos>
176 {
177  panda::FinishTask<SpecificSpsTask> finish(*this);
178  agg_buf_filler.full_buffer_handler( [&](typename BufferFillerType::AggregationBufferType buffer)
179  {
180  auto job = _pool->submit(finish, std::move(buffer));
181  job->wait();
182  if(job->status() != ska::panda::ResourceJob::JobStatus::Finished )
183  {
184  finish.finished();
185  }
186  }
187  );
188  if(agg_buf_filler.flush())
189  {
190  finish.wait();
191  }
192 }
193 
194 
195 } // namespace sps
196 } // namespace cheetah
197 } // namespace ska
void finish(BufferFillerType &buffer_filler) override
block until all buffers are flushed and processed
Definition: SpsTask.cpp:175
void call_dm_handler(DataType &&d) const
call the dm handler directly with the provided data
Definition: SpsTask.cpp:48
Single pulse search asynchronous task.
Definition: SpsTask.h:47
std::size_t buffer_overlap() const override
the number of samples to be copied from the end of one buffer in tot the beginning of the next ...
Definition: SpsTask.cpp:132
Some limits and constants for FLDO.
Definition: Brdz.h:35
std::size_t set_dedispersion_strategy(std::size_t memory_min, TimeFrequencyType const &data) override
specify parameters to generate a suitable dedispersion compute strategy
Definition: SpsTask.cpp:149
void call_sp_handler(DataType &&d) const
call the sp handler directly with the provided data
Definition: SpsTask.cpp:55
void operator()(panda::PoolResource< Arch > &, BufferType buffer)
execute Sps task on a given accelerator
Definition: SpsTask.cpp:92