30template<
typename Lock>
45 : m_thread_pool(thread_pool), m_stop(false), m_semaphore(max_queue_size) {
64 auto lambda = [
this, source_addr, message =
std::move(message)]()
mutable {
66 message->getProperty(prop);
74 auto lambda_copyable = [lambda = std::make_shared<decltype(lambda)>(
std::move(lambda))](){
86 logger.
debug() <<
"Starting prefetcher output loop";
105 ReverseLock<
decltype(output_lock)> release_lock(output_lock);
115 logger.
debug() <<
"Next source " <<
next.m_source_addr <<
" not done yet";
119 logger.
debug() <<
"Source " <<
next.m_source_addr <<
" sent downstream";
121 ReverseLock<
decltype(output_lock)> release_lock(output_lock);
133 logger.
debug() <<
"Stopping prefetcher output loop";
160 logger.
fatal() <<
"An exception was thrown from a worker thread";
164 throw Elements::Exception() <<
"No active threads and the queue is not empty! Please, report this as a bug";
static Elements::Logging logger
void debug(const std::string &logMessage)
static Logging getLogger(const std::string &name="")
void fatal(const std::string &logMessage)
bool checkForException(bool rethrow=false)
size_t activeThreads() const
static Elements::Logging logger