MatLogger2  1.0.0
Library for logging of numeric data to HDF5 MAT-files, which is RT-safe and multithreaded.
mat_appender.cpp
Go to the documentation of this file.
3 
4 #include <algorithm>
5 #include <atomic>
6 #include <list>
7 
8 #include "thread.h"
9 
namespace
{
    /**
     * @brief Invokes a callable once and returns how long the call took.
     *
     * The interval is measured with std::chrono::steady_clock, which is
     * monotonic: the result can never be negative and is not affected by
     * adjustments of the system clock (high_resolution_clock gives no such
     * guarantee).
     *
     * @tparam Func type of a callable that can be invoked with no arguments
     * @param f callable to be timed; it is invoked exactly once
     * @return elapsed time in seconds
     */
    template <typename Func>
    double measure_sec(Func f)
    {
        const auto tic = std::chrono::steady_clock::now();

        f();

        const auto toc = std::chrono::steady_clock::now();

        // duration<double> has a period of one second, hence count() is in seconds
        return std::chrono::duration<double>(toc - tic).count();
    }
}
24 
25 using namespace XBot::matlogger2;
26 
27 namespace XBot
28 {
29 
31 {
    // State and helpers used by the MatAppender methods and by the flusher thread.

32  // weak pointers to all registered loggers; expired entries are dropped
    // by flush_available_data_all()
33  std::list<MatLogger2::WeakPtr> _loggers;
34 
35  // mutex protecting _loggers (locked as _loggers_mutex in add_logger()
    // and flush_available_data_all())
37 
38  // main function of the flusher thread: flushes all loggers, then sleeps
    // until it is woken up, for as long as _flush_thread_run is true
39  void flush_thread_main();
40 
41  // call flush_available_data() on all alive loggers and return the sum
    // of the values they return
42  int flush_available_data_all();
43 
44  // callback invoked when a new block is pushed on a registered logger;
    // it wakes up the flusher thread when enough data is available
45  void on_block_available(VariableBuffer::BufferInfo buf_info);
46 
47  // bytes available on the queue (_available_bytes, reset to zero each
    // time the flusher thread is notified)
49 
50  // pointer to the flusher thread; null until start_flush_thread() is called
51  std::unique_ptr<ThreadType> _flush_thread;
52 
53  // mutex and condition variable used to wake up the flusher thread
    // (_cond_mutex and _cond)
56 
57  // loggers set this flag to wake up the flusher thread; the flusher
    // thread clears it after waking up
58  std::atomic<bool> _flush_thread_wake_up;
59 
60  // true while the flusher thread must keep running; the MatAppender
    // destructor sets it to false to make the thread exit
61  std::atomic<bool> _flush_thread_run;
62 
63  Impl();
64 
65 };
66 
68 {
    // The new object is handed to a Ptr right away; add_logger() calls
    // shared_from_this(), which needs the object to be owned by a shared pointer.
    // NOTE(review): a raw new is presumably used because the constructor is
    // not publicly accessible - TODO confirm against the header.
69  return Ptr(new MatAppender);
70 }
71 
    // Initial state: no pending bytes, no wake-up requested, and the flusher
    // thread not marked as running (start_flush_thread() sets the run flag).
73  _available_bytes(0),
74  _flush_thread_wake_up(false),
75  _flush_thread_run(false)
76 {
77 
78 }
79 
80 
81 MatAppender::Impl& MatAppender::impl()
82 {
83  return *_impl;
84 }
85 
87 {
88  /* This callback is invoked whenever a new block is pushed into the queue
89  * on any registered logger
90  */
91 
    // Amount of pending data above which the flusher thread is woken up.
    // 30e6 is a double literal that is implicitly converted to int here.
92  const int NOTIFY_THRESHOLD_BYTES = 30e6;
    // Free-space level below which the flusher thread is woken up regardless
    // of the pending byte count.
    // NOTE(review): presumably variable_free_space is a fraction in [0, 1] - TODO confirm
93  const double NOTIFY_THRESHOLD_SPACE_AVAILABLE = 0.5;
94 
95  // increase available bytes count
97 
98  // if enough new data is available, or the queue is getting full, notify
99  // the flusher thread
    // NOTE(review): _available_bytes is read here without holding _cond_mutex,
    // while it is reset under that mutex below; its declaration is not visible,
    // so confirm that concurrent callbacks from several loggers are safe.
100  if(_available_bytes > NOTIFY_THRESHOLD_BYTES ||
101  buf_info.variable_free_space < NOTIFY_THRESHOLD_SPACE_AVAILABLE)
102  {
    // the flag is set while holding the mutex that the flusher thread
    // uses for its condition-variable wait
103  std::lock_guard<MutexType> lock(_cond_mutex);
104 
105  _available_bytes = 0;
106  _flush_thread_wake_up = true;
107  _cond.notify_one();
108 
109  }
110 }
111 
112 MatAppender::MatAppender()
113 {
114  _impl = std::make_unique<Impl>();
115 }
116 
117 
118 bool MatAppender::add_logger(std::shared_ptr<MatLogger2> logger)
119 {
120  // check for nullptr
121  if(!logger)
122  {
123  fprintf(stderr, "Error in %s: null pointer provided as argument\n", __PRETTY_FUNCTION__);
124  return false;
125  }
126 
127  // acquire exclusive access to registered loggers list
128  std::lock_guard<MutexType> lock(impl()._loggers_mutex);
129 
130  // predicate that is used to tell if the provided logger is already registered
131  auto predicate = [logger](const auto& elem)
132  {
133  bool ret = false;
134  auto sp = elem.lock();
135 
136  if(sp && sp.get() == logger.get())
137  {
138  ret = true;
139  }
140 
141  return ret;
142 
143  };
144 
145  // check if the provided logger is already registered
146  auto it = std::find_if(impl()._loggers.begin(),
147  impl()._loggers.end(),
148  predicate);
149 
150  if(it != impl()._loggers.end())
151  {
152  fprintf(stderr, "Error in %s: trying to add same logger twice\n", __PRETTY_FUNCTION__);
153  return false;
154  }
155 
156  namespace pl = std::placeholders;
157 
159  // thread !!!
160  // All loggers keep a weak pointer to the logger manager; the on_block_available()
161  // callback is invoked only if the manager is alive; moreover, while the callback
162  // is being invoked, the manager is kept alive by calling weak_ptr<>::lock()
163  std::weak_ptr<MatAppender> self = shared_from_this();
164 
165  // set on_block_available() as callback, taking care of possible death of
166  // the manager while the callback is being processed
167  logger->set_on_data_available_callback(
168  [this, self](VariableBuffer::BufferInfo buf_info)
169  {
170  // lock weak-ptr by creating a shared pointer
171  auto self_shared_ptr = self.lock();
172 
173  // if we managed to lock the manager, it'll be kept alive till
174  // the current scope exit
175  if(self_shared_ptr)
176  {
177  impl().on_block_available(buf_info);
178  }
179  }
180  );
181 
182  // register the logger
183  impl()._loggers.emplace_back(logger);
184 
185  return true;
186 }
187 
189 
190 {
    // Mark the flusher thread as running before spawning it, so that the
    // loop in flush_thread_main() is entered.
191  impl()._flush_thread_run = true;
    // Spawn the thread, running Impl::flush_thread_main() on the implementation object.
    // NOTE(review): there is no guard against a second call; reset() would then
    // destroy the previous thread object while it may still be joinable -
    // TODO confirm that callers invoke this at most once.
192  impl()._flush_thread.reset(new ThreadType(&MatAppender::Impl::flush_thread_main,
193  _impl.get()
194  )
195  );
196 }
197 
199 {
    // Flushes every logger that is still alive, drops expired entries from
    // _loggers, and returns the sum of the values returned by
    // flush_available_data().
200  int bytes = 0;
201 
202  // acquire exclusive access to registered loggers list
    // (held until this function returns, so add_logger() waits meanwhile)
203  std::lock_guard<MutexType> lock(_loggers_mutex);
204 
205  // define function that processes a single registered logger,
206  // and marks it for removal if it is expired
207  auto process_or_remove = [&bytes](auto& logger_weak)
208  {
209  // !!! this is the main synchronization point that prevents loggers
210  // from being destructed while their data is being flushed to disk !!!
211  // We try to lock the current logger by creating a shared pointer
212  MatLogger2::Ptr logger = logger_weak.lock();
213 
214  // If the lock failed, the logger has expired: ask remove_if to erase it.
    // Otherwise the shared pointer keeps the logger alive until scope exit.
215  if(!logger)
216  {
217  printf("MatAppender: removing expired logger..\n");
218  return true; // removed expired logger
219  }
220 
221  bytes += logger->flush_available_data();
222 
223  return false; // don't remove
224  };
225 
226  // process all loggers, remove those that are expired
227  _loggers.remove_if(process_or_remove);
228 
229  return bytes;
230 }
231 
232 
234 {
    // Statistics accumulated over the whole life of the thread.
235  uint64_t total_bytes = 0;
236  double work_time_total = 0;
237  double sleep_time_total = 0;
238 
239  // call flush_available_data() on all alive loggers, then wait for notifications
240  while(_flush_thread_run)
241  {
242 
    // flush all loggers and measure how long it takes
243  int bytes = 0;
244  double work_time = measure_sec([this, &bytes, &total_bytes](){
245  bytes = flush_available_data_all();
246  total_bytes += bytes;
247  });
248 
249  printf("Worked for %.2f sec (%.1f MB flushed)..",
250  work_time, bytes*1e-6);
    // NOTE(review): the totals are updated only at the end of the iteration,
    // so on the first pass this evaluates 0.0/0.0 and the printed load is NaN.
251  printf("..average load is %.2f \n", 1.0/(1.0+sleep_time_total/work_time_total));
252 
    // sleep until a logger callback (or the destructor) sets the wake-up flag
253  std::unique_lock<MutexType> lock(_cond_mutex);
254  double sleep_time = measure_sec([this, &lock](){
255  _cond.wait(lock, [this]{ return _flush_thread_wake_up.load(); });
256  });
257 
258  // reset condition (the mutex is still held here)
259  _flush_thread_wake_up = false;
260 
261 
262  work_time_total += work_time;
263  sleep_time_total += sleep_time;
264 
265  }
266 
    // NOTE(review): when the loop ends because _flush_thread_run was cleared,
    // no further flush is performed here - confirm that data queued after
    // the last flush is written elsewhere.
267  printf("Flusher thread exiting.. Written %.1f MB\n", total_bytes*1e-6);
268 }
269 
270 
272 {
    // Stops and joins the flusher thread, if one was spawned.
273  printf("%s\n", __PRETTY_FUNCTION__);
274 
    // nothing to do if start_flush_thread() was never called
275  if(!impl()._flush_thread)
276  {
277  return;
278  }
279 
280  // force the flusher thread to exit: clear the run flag and wake the
    // thread up while holding the mutex it uses for waiting
281  {
282  std::lock_guard<MutexType> lock(impl()._cond_mutex);
283  impl()._flush_thread_run = false;
284  impl()._flush_thread_wake_up = true;
285  impl()._cond.notify_one();
286  }
287 
288  // join with the flusher thread
289  impl()._flush_thread->join();
290 
291 }
292 
293 
294 
295 
296 }
std::thread ThreadType
Definition: thread.h:21
void on_block_available(VariableBuffer::BufferInfo buf_info)
~MatAppender()
Destructor will join with the flusher thread if it was spawned by the user.
The MatAppender class allows to flush data to disk from multiple MatLogger2 loggers at once...
Definition: mat_appender.h:24
std::atomic< bool > _flush_thread_run
bool add_logger(std::shared_ptr< MatLogger2 > logger)
Register a MAT-logger to the appender.
static Ptr MakeInstance()
Returns a shared pointer to a new MatAppender object.
std::atomic< bool > _flush_thread_wake_up
std::list< MatLogger2::WeakPtr > _loggers
std::shared_ptr< MatLogger2 > Ptr
Definition: matlogger2.h:73
std::shared_ptr< MatAppender > Ptr
Definition: mat_appender.h:30
void start_flush_thread()
Spawn a thread that will automatically flush data to disk whenever enough data is available...
std::condition_variable CondVarType
Definition: thread.h:20
std::mutex MutexType
Definition: thread.h:19
std::unique_ptr< ThreadType > _flush_thread