SourceXtractorPlusPlus
0.21
SourceXtractor++, the next generation SExtractor
Loading...
Searching...
No Matches
SEImplementation
src
lib
Measurement
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1
17
/*
18
* MultiThreadedMeasurement.cpp
19
*
20
* Created on: May 23, 2018
21
* Author: mschefer
22
*/
23
24
#include <chrono>
25
#include <ElementsKernel/Logging.h>
26
#include <csignal>
27
28
#include "
SEImplementation/Plugin/SourceIDs/SourceID.h
"
29
#include "
SEImplementation/Measurement/MultithreadedMeasurement.h
"
30
31
using namespace
SourceXtractor
;
32
33
static
Elements::Logging
logger
=
Elements::Logging::getLogger
(
"Multithreading"
);
34
35
36
MultithreadedMeasurement::~MultithreadedMeasurement
() {
37
if
(
m_output_thread
->
joinable
()) {
38
m_output_thread
->
join
();
39
}
40
}
41
42
void
MultithreadedMeasurement::startThreads
() {
43
m_output_thread
= Euclid::make_unique<std::thread>(
outputThreadStatic
,
this
);
44
}
45
46
void
MultithreadedMeasurement::stopThreads
() {
47
m_input_done
=
true
;
48
m_thread_pool
->
block
();
49
m_output_thread
->
join
();
50
logger
.debug() <<
"All worker threads done!"
;
51
}
52
53
void
MultithreadedMeasurement::synchronizeThreads
() {
54
// Wait until all worker threads are done
55
m_thread_pool
->
block
();
56
57
// Wait until the output queue is empty
58
while
(
true
) {
59
{
60
std::unique_lock<std::mutex>
output_lock
(
m_output_queue_mutex
);
61
if
(
m_output_queue
.empty()) {
62
break
;
63
}
64
else
if
(
m_thread_pool
->
checkForException
(
false
)) {
65
logger
.fatal() <<
"An exception was thrown from a worker thread"
;
66
m_thread_pool
->
checkForException
(
true
);
67
}
68
else
if
(
m_thread_pool
->
activeThreads
() == 0) {
69
throw
Elements::Exception
() <<
"No active threads and the queue is not empty! Please, report this as a bug"
;
70
}
71
}
72
std::this_thread::sleep_for
(
std::chrono::milliseconds
(100));
73
}
74
}
75
76
void
MultithreadedMeasurement::receiveSource
(
std::unique_ptr<SourceGroupInterface>
source_group
) {
77
// Force computation of SourceID here, where the order is still deterministic
78
for
(
auto
&
source
: *
source_group
) {
79
source
.getProperty<
SourceID
>();
80
}
81
82
// Put the new SourceGroup into the input queue
83
auto
order_number
=
m_group_counter
;
84
auto
lambda = [
this
,
order_number
,
source_group
=
std::move
(
source_group
)]()
mutable
{
85
// Trigger measurements
86
for
(
auto
&
source
: *
source_group
) {
87
m_source_to_row
(
source
);
88
}
89
// Pass to the output thread
90
{
91
std::unique_lock<std::mutex>
output_lock
(
m_output_queue_mutex
);
92
m_output_queue
.emplace_back(
order_number
,
std::move
(
source_group
));
93
}
94
m_new_output
.
notify_one
();
95
};
96
auto
lambda_copyable
= [lambda =
std::make_shared<decltype(lambda)>
(
std::move
(lambda))](){
97
(*lambda)();
98
};
99
m_thread_pool
->
submit
(
lambda_copyable
);
100
++
m_group_counter
;
101
}
102
103
void
MultithreadedMeasurement::outputThreadStatic
(
MultithreadedMeasurement
*
measurement
) {
104
logger
.debug() <<
"Starting output thread"
;
105
try
{
106
measurement
->outputThreadLoop();
107
}
108
catch
(
const
Elements::Exception
& e) {
109
logger
.fatal() <<
"Output thread got an exception!"
;
110
logger
.fatal() << e.what();
111
if
(!
measurement
->m_abort_raised.exchange(
true
)) {
112
logger
.fatal() <<
"Aborting the execution"
;
113
::raise
(
SIGTERM
);
114
}
115
}
116
logger
.debug() <<
"Stopping output thread"
;
117
}
118
119
void
MultithreadedMeasurement::outputThreadLoop
() {
120
while
(
m_thread_pool
->
activeThreads
() > 0) {
121
std::unique_lock<std::mutex>
output_lock
(
m_output_queue_mutex
);
122
123
// Wait for something in the output queue
124
if
(
m_output_queue
.empty()) {
125
m_new_output
.
wait_for
(
output_lock
,
std::chrono::milliseconds
(100));
126
}
127
128
// Process the output queue
129
while
(!
m_output_queue
.empty()) {
130
sendSource
(
std::move
(
m_output_queue
.front().second));
131
m_output_queue
.pop_front();
132
}
133
134
if
(
m_input_done
&&
m_thread_pool
->
running
() +
m_thread_pool
->
queued
() == 0 &&
135
m_output_queue
.empty()) {
136
break
;
137
}
138
}
139
}
140
141
void
MultithreadedMeasurement::receiveProcessSignal
(
const
ProcessSourcesEvent
&
event
) {
142
sendProcessSignal
(
event
);
143
}
MultithreadedMeasurement.h
SourceID.h
Elements::Exception
Elements::Logging
Elements::Logging::getLogger
static Logging getLogger(const std::string &name="")
Euclid::ThreadPool::submit
void submit(Task task)
Euclid::ThreadPool::running
size_t running() const
Euclid::ThreadPool::block
void block(bool throw_on_exception=true)
Euclid::ThreadPool::queued
size_t queued() const
Euclid::ThreadPool::checkForException
bool checkForException(bool rethrow=false)
Euclid::ThreadPool::activeThreads
size_t activeThreads() const
SourceXtractor::MultithreadedMeasurement
Definition
MultithreadedMeasurement.h:38
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::unique_ptr< std::thread > m_output_thread
Definition
MultithreadedMeasurement.h:64
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition
MultithreadedMeasurement.h:69
SourceXtractor::MultithreadedMeasurement::~MultithreadedMeasurement
~MultithreadedMeasurement() override
Definition
MultithreadedMeasurement.cpp:36
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition
MultithreadedMeasurement.h:67
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition
MultithreadedMeasurement.h:71
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition
MultithreadedMeasurement.cpp:119
SourceXtractor::MultithreadedMeasurement::m_thread_pool
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Definition
MultithreadedMeasurement.h:63
SourceXtractor::MultithreadedMeasurement::synchronizeThreads
void synchronizeThreads() override
Definition
MultithreadedMeasurement.cpp:53
SourceXtractor::MultithreadedMeasurement::receiveSource
void receiveSource(std::unique_ptr< SourceGroupInterface > source_group) override
Definition
MultithreadedMeasurement.cpp:76
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition
MultithreadedMeasurement.h:66
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::unique_ptr< SourceGroupInterface > > > m_output_queue
Definition
MultithreadedMeasurement.h:70
SourceXtractor::MultithreadedMeasurement::stopThreads
void stopThreads() override
Definition
MultithreadedMeasurement.cpp:46
SourceXtractor::MultithreadedMeasurement::receiveProcessSignal
void receiveProcessSignal(const ProcessSourcesEvent &event) override
Definition
MultithreadedMeasurement.cpp:141
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition
MultithreadedMeasurement.h:62
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement)
Definition
MultithreadedMeasurement.cpp:103
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition
MultithreadedMeasurement.cpp:42
SourceXtractor::PipelineEmitter< SourceGroupInterface >::sendProcessSignal
void sendProcessSignal(const ProcessSourcesEvent &event) const
Definition
PipelineStage.h:92
SourceXtractor::PipelineEmitter< SourceGroupInterface >::sendSource
void sendSource(std::unique_ptr< SourceGroupInterface > source) const
Definition
PipelineStage.h:85
SourceXtractor::SourceID
Definition
SourceID.h:33
std::chrono::milliseconds
std::function
std::thread::join
T join(T... args)
std::thread::joinable
T joinable(T... args)
std::move
T move(T... args)
Euclid::Configuration::logger
static Elements::Logging logger
SourceXtractor
Definition
Aperture.h:30
std::condition_variable::notify_one
T notify_one(T... args)
std::raise
T raise(T... args)
std::this_thread::sleep_for
T sleep_for(T... args)
SourceXtractor::ProcessSourcesEvent
Event received by SourceGrouping to request the processing of some of the Sources stored.
Definition
PipelineStage.h:33
std::condition_variable::wait_for
T wait_for(T... args)
Generated by
1.10.0