SourceXtractorPlusPlus 0.21
SourceXtractor++, the next generation SExtractor
Loading...
Searching...
No Matches
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1
17/*
18 * MultithreadedMeasurement.cpp
19 *
20 * Created on: May 23, 2018
21 * Author: mschefer
22 */
23
24#include <chrono>
25#include <ElementsKernel/Logging.h>
26#include <csignal>
27
30
31using namespace SourceXtractor;
32
34
35
41
// NOTE(review): the signature line of this definition is missing from the
// listing, so the enclosing function's name and parameters are not visible.
// Creates a std::thread that runs outputThreadStatic with `this` as its
// argument and stores it in m_output_thread.
43 m_output_thread = Euclid::make_unique<std::thread>(outputThreadStatic, this);
44}
45
// NOTE(review): the signature line and listing lines 48-49 are missing here.
// Visible behavior: sets m_input_done to true, then emits a debug message
// stating that all worker threads are done.
// Presumably the missing lines wait for the threads to finish before the
// message is logged — confirm against the complete source.
47 m_input_done = true;
50 logger.debug() << "All worker threads done!";
51}
52
// NOTE(review): the signature line and listing lines 55, 60, 66 and 72 are
// missing, so the statements they hold are not documented here.
// Visible behavior: loops until m_output_queue is empty, checking on each
// pass for a worker exception and for a pool with no active threads.
54 // Wait until all worker threads are done
56
57 // Wait until the output queue is empty
58 while (true) {
59 {
// Normal exit: nothing is left in the output queue.
61 if (m_output_queue.empty()) {
62 break;
63 }
// checkForException is called with rethrow = false; a true result is only
// logged in the visible lines. Listing line 66 (missing) presumably reacts
// to the failure — TODO confirm.
64 else if (m_thread_pool->checkForException(false)) {
65 logger.fatal() << "An exception was thrown from a worker thread";
67 }
// Queue not empty but no thread is active: treated as an internal error and
// reported by throwing Elements::Exception.
68 else if (m_thread_pool->activeThreads() == 0) {
69 throw Elements::Exception() << "No active threads and the queue is not empty! Please, report this as a bug";
70 }
71 }
73 }
74}
75
// NOTE(review): the signature line is missing from the listing; the body
// uses `source_group`, matching receiveSource(std::unique_ptr<SourceGroupInterface>).
// Listing lines 83, 87, 91-92, 94, 96 and 99-100 are also missing.
// Visible behavior: requests the SourceID property of every source in the
// group, then builds a lambda that takes ownership of the group.
77 // Force computation of SourceID here, where the order is still deterministic
78 for (auto& source : *source_group) {
79 source.getProperty<SourceID>();
80 }
81
82 // Put the new SourceGroup into the input queue
// The lambda captures `this` and order_number, and moves source_group into
// its closure; it is `mutable`, so the captured group can be modified or
// moved from inside the body.
// NOTE(review): order_number is defined on a missing line (presumably 83).
84 auto lambda = [this, order_number, source_group = std::move(source_group)]() mutable {
85 // Trigger measurements
// The per-source statement (listing line 87) is not visible.
86 for (auto& source : *source_group) {
88 }
89 // Pass to the output thread
// The contents of this scope (listing lines 91-92) are not visible.
90 {
93 }
95 };
// Tail of a second callable whose opening (listing line 96) is missing; it
// invokes the lambda through a dereference, so `lambda` here is presumably a
// pointer-like wrapper declared on that line — TODO confirm.
97 (*lambda)();
98 };
101}
102
// NOTE(review): the signature line is missing from the listing; the body
// uses `measurement`, matching
// static void outputThreadStatic(MultithreadedMeasurement *measurement).
// Runs measurement->outputThreadLoop() and logs when the output thread
// starts and stops.
104 logger.debug() << "Starting output thread";
105 try {
106 measurement->outputThreadLoop();
107 }
// Only Elements::Exception is caught in the visible code; it is logged at
// fatal level together with its what() message.
108 catch (const Elements::Exception& e) {
109 logger.fatal() << "Output thread got an exception!";
110 logger.fatal() << e.what();
// exchange(true) returns the previous value, so this branch is entered only
// by the first caller to set m_abort_raised.
// Listing line 113 (missing) presumably performs the abort — TODO confirm.
111 if (!measurement->m_abort_raised.exchange(true)) {
112 logger.fatal() << "Aborting the execution";
114 }
115 }
116 logger.debug() << "Stopping output thread";
117}
118
// NOTE(review): the signature line and listing lines 121, 125 and 134 are
// missing; this is presumably the body of outputThreadLoop() — confirm.
// Visible behavior: while the thread pool reports active threads, forwards
// every queued entry to sendSource and removes it from m_output_queue.
120 while (m_thread_pool->activeThreads() > 0) {
122
123 // Wait for something in the output queue
// The body of this branch (listing line 125) is not visible.
124 if (m_output_queue.empty()) {
126 }
127
128 // Process the output queue
// Each entry is a pair; its second member (the source group) is moved into
// sendSource before the entry is popped from the front of the queue.
129 while (!m_output_queue.empty()) {
130 sendSource(std::move(m_output_queue.front().second));
131 m_output_queue.pop_front();
132 }
133
// Second half of an exit condition whose first half (listing line 134) is
// missing: the loop breaks when that condition holds and the queue is empty.
135 m_output_queue.empty()) {
136 break;
137 }
138 }
139}
140
static Logging getLogger(const std::string &name="")
void submit(Task task)
size_t running() const
void block(bool throw_on_exception=true)
size_t queued() const
bool checkForException(bool rethrow=false)
size_t activeThreads() const
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
void receiveSource(std::unique_ptr< SourceGroupInterface > source_group) override
std::list< std::pair< int, std::unique_ptr< SourceGroupInterface > > > m_output_queue
void receiveProcessSignal(const ProcessSourcesEvent &event) override
static void outputThreadStatic(MultithreadedMeasurement *measurement)
void sendProcessSignal(const ProcessSourcesEvent &event) const
void sendSource(std::unique_ptr< SourceGroupInterface > source) const
T join(T... args)
T joinable(T... args)
T move(T... args)
static Elements::Logging logger
T raise(T... args)
T sleep_for(T... args)
Event received by SourceGrouping to request the processing of some of the Sources stored.