12 template <
typename Func>
13 double measure_sec(Func f)
15 auto tic = std::chrono::high_resolution_clock::now();
19 auto toc = std::chrono::high_resolution_clock::now();
21 return std::chrono::duration_cast<std::chrono::nanoseconds>(toc-tic).count()*1e-9;
39 void flush_thread_main();
42 int flush_available_data_all();
74 _flush_thread_wake_up(false),
75 _flush_thread_run(false)
92 const int NOTIFY_THRESHOLD_BYTES = 30e6;
93 const double NOTIFY_THRESHOLD_SPACE_AVAILABLE = 0.5;
112 MatAppender::MatAppender()
114 _impl = std::make_unique<Impl>();
123 fprintf(stderr,
"Error in %s: null pointer provided as argument\n", __PRETTY_FUNCTION__);
131 auto predicate = [logger](
const auto& elem)
134 auto sp = elem.lock();
136 if(sp && sp.get() == logger.get())
146 auto it = std::find_if(impl().
_loggers.begin(),
147 impl()._loggers.end(),
150 if(it != impl()._loggers.end())
152 fprintf(stderr,
"Error in %s: trying to add same logger twice\n", __PRETTY_FUNCTION__);
156 namespace pl = std::placeholders;
163 std::weak_ptr<MatAppender>
self = shared_from_this();
167 logger->set_on_data_available_callback(
171 auto self_shared_ptr =
self.lock();
177 impl().on_block_available(buf_info);
183 impl()._loggers.emplace_back(logger);
191 impl()._flush_thread_run =
true;
207 auto process_or_remove = [&bytes](
auto& logger_weak)
217 printf(
"MatAppender: removing expired logger..\n");
221 bytes += logger->flush_available_data();
227 _loggers.remove_if(process_or_remove);
235 uint64_t total_bytes = 0;
236 double work_time_total = 0;
237 double sleep_time_total = 0;
244 double work_time = measure_sec([
this, &bytes, &total_bytes](){
246 total_bytes += bytes;
249 printf(
"Worked for %.2f sec (%.1f MB flushed)..",
250 work_time, bytes*1e-6);
251 printf(
"..average load is %.2f \n", 1.0/(1.0+sleep_time_total/work_time_total));
254 double sleep_time = measure_sec([
this, &lock](){
262 work_time_total += work_time;
263 sleep_time_total += sleep_time;
267 printf(
"Flusher thread exiting.. Written %.1f MB\n", total_bytes*1e-6);
273 printf(
"%s\n", __PRETTY_FUNCTION__);
282 std::lock_guard<MutexType> lock(impl().
_cond_mutex);
283 impl()._flush_thread_run =
false;
284 impl()._flush_thread_wake_up =
true;
285 impl()._cond.notify_one();
289 impl()._flush_thread->join();
void on_block_available(VariableBuffer::BufferInfo buf_info)
~MatAppender()
Destructor will join with the flusher thread if it was spawned by the user.
The MatAppender class allows flushing data to disk from multiple MatLogger2 loggers at once...
int flush_available_data_all()
std::atomic< bool > _flush_thread_run
bool add_logger(std::shared_ptr< MatLogger2 > logger)
Register a MAT-logger with the appender.
static Ptr MakeInstance()
Returns a shared pointer to a new MatAppender object.
std::atomic< bool > _flush_thread_wake_up
std::list< MatLogger2::WeakPtr > _loggers
std::shared_ptr< MatLogger2 > Ptr
std::shared_ptr< MatAppender > Ptr
double variable_free_space
void start_flush_thread()
Spawn a thread that will automatically flush data to disk whenever enough data is available...
std::condition_variable CondVarType
std::unique_ptr< ThreadType > _flush_thread