SourceXtractorPlusPlus
0.21
SourceXtractor++, the next generation SExtractor
Loading...
Searching...
No Matches
SEImplementation
src
lib
Prefetcher
Prefetcher.cpp
Go to the documentation of this file.
1
18
#include <ElementsKernel/Logging.h>
19
#include "AlexandriaKernel/memory_tools.h"
20
#include "
SEImplementation/Prefetcher/Prefetcher.h
"
21
22
// File-local logger; every message emitted from this file goes through the "Prefetcher" logger.
static Elements::Logging logger(Elements::Logging::getLogger("Prefetcher"));
23
24
25
namespace
SourceXtractor
{
26
30
/**
 * Scoped guard that inverts a lock for the duration of a scope: the wrapped
 * lock is released on construction and acquired again on destruction.
 *
 * The guard only keeps a reference to the lock, so the lock must outlive it.
 * The lock is expected to be held when the guard is created.
 *
 * @tparam Lock
 *  Any type exposing `unlock()` and `lock()`, e.g. `std::unique_lock<std::mutex>`
 */
template <typename Lock>
struct ReverseLock {
  /// Releases `lock` right away; it is re-acquired when the guard is destroyed.
  explicit ReverseLock(Lock& lock) : m_lock(lock) {
    m_lock.unlock();
  }

  /// Re-acquires the lock that was released by the constructor.
  ~ReverseLock() {
    m_lock.lock();
  }

  // A copy would call lock() a second time on the same lock when destroyed,
  // so the guard must not be copyable.
  ReverseLock(const ReverseLock&) = delete;
  ReverseLock& operator=(const ReverseLock&) = delete;

private:
  Lock& m_lock;
};
43
44
/**
 * Builds the prefetcher and immediately starts its output thread.
 *
 * @param thread_pool
 *  Pool to which the prefetch tasks are submitted
 * @param max_queue_size
 *  Initial value of the semaphore that is acquired for every received source
 */
Prefetcher::Prefetcher(const std::shared_ptr<Euclid::ThreadPool>& thread_pool, unsigned max_queue_size)
    : m_thread_pool(thread_pool), m_stop(false), m_semaphore(max_queue_size) {
  // The output thread runs outputLoop() on this instance
  m_output_thread = Euclid::make_unique<std::thread>([this]() { outputLoop(); });
}
48
49
/**
 * Stops and joins the output thread, unless it has already been joined.
 */
Prefetcher::~Prefetcher() {
  if (!m_output_thread->joinable()) {
    // Nothing left to stop
    return;
  }
  wait();
}
53
54
void
Prefetcher::receiveSource
(
std::unique_ptr<SourceInterface>
message) {
55
m_semaphore
.
acquire
();
56
57
intptr_t
source_addr
=
reinterpret_cast<
intptr_t
>
(message.get());
58
{
59
std::lock_guard<std::mutex>
queue_lock
(
m_queue_mutex
);
60
m_received
.emplace_back(
EventType::SOURCE
,
source_addr
);
61
}
62
63
// Pre-fetch in separate threads
64
auto
lambda = [
this
,
source_addr
, message =
std::move
(message)]()
mutable
{
65
for
(
auto
&
prop
:
m_prefetch_set
) {
66
message->getProperty(
prop
);
67
}
68
{
69
std::lock_guard<std::mutex>
lock
(
m_queue_mutex
);
70
m_finished_sources
.emplace(
source_addr
,
std::move
(message));
71
}
72
m_new_output
.
notify_one
();
73
};
74
auto
lambda_copyable
= [lambda =
std::make_shared<decltype(lambda)>
(
std::move
(lambda))](){
75
(*lambda)();
76
};
77
m_thread_pool
->
submit
(
lambda_copyable
);
78
}
79
80
void
Prefetcher::requestProperty
(
const
PropertyId
&
property_id
) {
81
m_prefetch_set
.emplace(
property_id
);
82
logger
.
debug
() <<
"Requesting prefetch of "
<<
property_id
.getString();
83
}
84
85
void
Prefetcher::outputLoop
() {
86
logger
.
debug
() <<
"Starting prefetcher output loop"
;
87
88
while
(
m_thread_pool
->
activeThreads
() > 0) {
89
std::unique_lock<std::mutex>
output_lock
(
m_queue_mutex
);
90
91
// Wait for something new
92
m_new_output
.
wait_for
(
output_lock
,
std::chrono::milliseconds
(1000));
93
94
// Process the output queue
95
// This is, release sources when the front of the received has been processed
96
while
(!
m_received
.empty()) {
97
auto
next
=
m_received
.front();
98
// If the front is a ProcessSourceEvent, everything received before is done,
99
// so pass downstream
100
if
(
next
.m_event_type ==
EventType::PROCESS_SOURCE
) {
101
auto
event
=
m_event_queue
.front();
102
m_event_queue
.pop_front();
103
logger
.
debug
() <<
"ProcessSourceEvent released"
;
104
{
105
ReverseLock
<
decltype
(
output_lock
)>
release_lock
(
output_lock
);
106
sendProcessSignal
(
event
);
107
}
108
m_received
.pop_front();
109
continue
;
110
}
111
// Find if the matching source is done
112
auto
processed
=
m_finished_sources
.find(
next
.m_source_addr);
113
// If not, we can't keep going, so exit here
114
if
(
processed
==
m_finished_sources
.end()) {
115
logger
.
debug
() <<
"Next source "
<<
next
.m_source_addr <<
" not done yet"
;
116
break
;
117
}
118
// If it is, send it downstream
119
logger
.
debug
() <<
"Source "
<<
next
.m_source_addr <<
" sent downstream"
;
120
{
121
ReverseLock
<
decltype
(
output_lock
)>
release_lock
(
output_lock
);
122
sendSource
(
std::move
(
processed
->second));
123
}
124
m_finished_sources
.erase(
processed
);
125
m_received
.pop_front();
126
m_semaphore
.
release
();
127
}
128
129
if
(
m_stop
&&
m_received
.empty()) {
130
break
;
131
}
132
}
133
logger
.
debug
() <<
"Stopping prefetcher output loop"
;
134
}
135
136
void
Prefetcher::receiveProcessSignal
(
const
ProcessSourcesEvent
& message) {
137
{
138
std::lock_guard<std::mutex>
output_lock
(
m_queue_mutex
);
139
m_received
.emplace_back(
EventType::PROCESS_SOURCE
);
140
m_event_queue
.emplace_back(message);
141
}
142
m_new_output
.
notify_one
();
143
logger
.
debug
() <<
"ProcessSourceEvent received"
;
144
}
145
146
void
Prefetcher::wait
() {
147
m_stop
=
true
;
148
m_output_thread
->
join
();
149
}
150
151
void
Prefetcher::synchronize
() {
152
// Wait until the output queue is empty
153
while
(
true
) {
154
{
155
std::unique_lock<std::mutex>
output_lock
(
m_queue_mutex
);
156
if
(
m_received
.empty()) {
157
break
;
158
}
159
else
if
(
m_thread_pool
->
checkForException
(
false
)) {
160
logger
.
fatal
() <<
"An exception was thrown from a worker thread"
;
161
m_thread_pool
->
checkForException
(
true
);
162
}
163
else
if
(
m_thread_pool
->
activeThreads
() == 0) {
164
throw
Elements::Exception
() <<
"No active threads and the queue is not empty! Please, report this as a bug"
;
165
}
166
}
167
std::this_thread::sleep_for
(
std::chrono::milliseconds
(100));
168
}
169
}
170
171
}
// end of namespace SourceXtractor
logger
static Elements::Logging logger
Definition
Prefetcher.cpp:22
Prefetcher.h
Elements::Exception
Elements::Logging
Elements::Logging::debug
void debug(const std::string &logMessage)
Elements::Logging::getLogger
static Logging getLogger(const std::string &name="")
Elements::Logging::fatal
void fatal(const std::string &logMessage)
Euclid::Semaphore::acquire
void acquire()
Euclid::Semaphore::release
void release()
Euclid::ThreadPool::submit
void submit(Task task)
Euclid::ThreadPool::checkForException
bool checkForException(bool rethrow=false)
Euclid::ThreadPool::activeThreads
size_t activeThreads() const
SourceXtractor::PipelineEmitter< SourceInterface >::sendProcessSignal
void sendProcessSignal(const ProcessSourcesEvent &event) const
Definition
PipelineStage.h:92
SourceXtractor::PipelineEmitter< SourceInterface >::sendSource
void sendSource(std::unique_ptr< SourceInterface > source) const
Definition
PipelineStage.h:85
SourceXtractor::Prefetcher::requestProperty
void requestProperty(const PropertyId &property_id)
Definition
Prefetcher.cpp:80
SourceXtractor::Prefetcher::receiveProcessSignal
void receiveProcessSignal(const ProcessSourcesEvent &event) override
Definition
Prefetcher.cpp:136
SourceXtractor::Prefetcher::m_event_queue
std::deque< ProcessSourcesEvent > m_event_queue
Queue of received ProcessSourceEvent, order preserved.
Definition
Prefetcher.h:119
SourceXtractor::Prefetcher::m_thread_pool
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Pointer to the pool of worker threads.
Definition
Prefetcher.h:109
SourceXtractor::Prefetcher::m_queue_mutex
std::mutex m_queue_mutex
Definition
Prefetcher.h:123
SourceXtractor::Prefetcher::outputLoop
void outputLoop()
Definition
Prefetcher.cpp:85
SourceXtractor::Prefetcher::Prefetcher
Prefetcher(const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
Definition
Prefetcher.cpp:44
SourceXtractor::Prefetcher::m_new_output
std::condition_variable m_new_output
Notifies there is a new source done processing.
Definition
Prefetcher.h:115
SourceXtractor::Prefetcher::m_received
std::deque< EventType > m_received
Queue of the types of the received events. Used to pass events downstream respecting the order in which they were received.
Definition
Prefetcher.h:121
SourceXtractor::Prefetcher::wait
void wait()
Definition
Prefetcher.cpp:146
SourceXtractor::Prefetcher::m_semaphore
Euclid::Semaphore m_semaphore
Keep the queue under control.
Definition
Prefetcher.h:129
SourceXtractor::Prefetcher::m_prefetch_set
std::set< PropertyId > m_prefetch_set
Properties to prefetch.
Definition
Prefetcher.h:111
SourceXtractor::Prefetcher::m_finished_sources
std::map< intptr_t, std::unique_ptr< SourceInterface > > m_finished_sources
Finished sources.
Definition
Prefetcher.h:117
SourceXtractor::Prefetcher::m_output_thread
std::unique_ptr< std::thread > m_output_thread
Orchestration thread.
Definition
Prefetcher.h:113
SourceXtractor::Prefetcher::receiveSource
void receiveSource(std::unique_ptr< SourceInterface > source) override
Definition
Prefetcher.cpp:54
SourceXtractor::Prefetcher::synchronize
void synchronize()
Definition
Prefetcher.cpp:151
SourceXtractor::Prefetcher::~Prefetcher
virtual ~Prefetcher()
Definition
Prefetcher.cpp:49
SourceXtractor::Prefetcher::m_stop
std::atomic_bool m_stop
Termination condition for the output loop.
Definition
Prefetcher.h:126
SourceXtractor::PropertyId
Identifier used to set and retrieve properties.
Definition
PropertyId.h:40
std::chrono::milliseconds
std::function
std::intptr_t
std::thread::join
T join(T... args)
std::thread::joinable
T joinable(T... args)
std::lock
T lock(T... args)
std::move
T move(T... args)
Euclid::Configuration::logger
static Elements::Logging logger
SourceXtractor
Definition
Aperture.h:30
std::next
T next(T... args)
std::condition_variable::notify_one
T notify_one(T... args)
std::this_thread::sleep_for
T sleep_for(T... args)
SourceXtractor::Prefetcher::EventType::PROCESS_SOURCE
@ PROCESS_SOURCE
Definition
Prefetcher.h:100
SourceXtractor::Prefetcher::EventType::SOURCE
@ SOURCE
Definition
Prefetcher.h:100
SourceXtractor::ProcessSourcesEvent
Event received by SourceGrouping to request the processing of some of the stored Sources.
Definition
PipelineStage.h:33
SourceXtractor::ReverseLock
Definition
Prefetcher.cpp:31
SourceXtractor::ReverseLock::~ReverseLock
~ReverseLock()
Definition
Prefetcher.cpp:36
SourceXtractor::ReverseLock::m_lock
Lock & m_lock
Definition
Prefetcher.cpp:41
SourceXtractor::ReverseLock::ReverseLock
ReverseLock(Lock &lock)
Definition
Prefetcher.cpp:32
std::condition_variable::wait_for
T wait_for(T... args)
Generated by
1.10.0