SourceXtractorPlusPlus 0.21
SourceXtractor++, the next generation SExtractor
SourceXtractor::Prefetcher Class Reference

#include <Prefetcher.h>


Classes

struct  EventType
 

Public Member Functions

 Prefetcher (const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
 
virtual ~Prefetcher ()
 
void receiveSource (std::unique_ptr< SourceInterface > source) override
 
void receiveProcessSignal (const ProcessSourcesEvent &event) override
 
template<typename Container >
void requestProperties (const Container &properties)
 
void wait ()
 
void synchronize ()
 
- Public Member Functions inherited from SourceXtractor::PipelineReceiver< SourceInterface >
virtual ~PipelineReceiver ()=default
 
- Public Member Functions inherited from SourceXtractor::PipelineEmitter< SourceInterface >
 ~PipelineEmitter () override=default
 
void setNextStage (std::shared_ptr< PipelineReceiver< SourceInterface > > next)
 
- Public Member Functions inherited from SourceXtractor::Observable< T >
virtual ~Observable ()=default
 Destructor.
 
virtual void addObserver (std::shared_ptr< Observer< T > > observer)
 Adds an Observer that will be notified when notify Observers is called.
 
virtual void removeObserver (std::shared_ptr< Observer< T > > observer)
 Removes a previously added Observer from the list of Observers to notify.
 

Private Member Functions

void requestProperty (const PropertyId &property_id)
 
void outputLoop ()
 

Private Attributes

std::shared_ptr< Euclid::ThreadPool > m_thread_pool
 Pointer to the pool of worker threads.
 
std::set< PropertyId > m_prefetch_set
 Properties to prefetch.
 
std::unique_ptr< std::thread > m_output_thread
 Orchestration thread.
 
std::condition_variable m_new_output
 Notifies there is a new source done processing.
 
std::map< intptr_t, std::unique_ptr< SourceInterface > > m_finished_sources
 Finished sources.
 
std::deque< ProcessSourcesEvent > m_event_queue
 Queue of received ProcessSourcesEvent, order preserved.
 
std::deque< EventType > m_received
 Queue of type of received events. Used to pass downstream events respecting the received order.
 
std::mutex m_queue_mutex
 
std::atomic_bool m_stop
 Termination condition for the output loop.
 
Euclid::Semaphore m_semaphore
 Keep the queue under control.
 

Additional Inherited Members

- Protected Member Functions inherited from SourceXtractor::PipelineEmitter< SourceInterface >
void sendSource (std::unique_ptr< SourceInterface > source) const
 
void sendProcessSignal (const ProcessSourcesEvent &event) const
 
- Protected Member Functions inherited from SourceXtractor::Observable< T >
void notifyObservers (const T &message) const
 

Detailed Description

The prefetcher lets later stages, such as grouping or cleaning, request compute-intensive properties in advance, so that they can be computed on multiple threads before the source reaches those stages.

The prefetcher must also handle ProcessSourcesEvent, since these events are synchronization points. When one is received, only sources detected before the event are passed along. Sources received after it are held until no earlier source is still being processed; they are then released and sent along.
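
The release rule described above can be illustrated with a small single-threaded model. This is a sketch under stated assumptions, not the code in Prefetcher.cpp: `Received`, `releaseReady`, and the string output are hypothetical names introduced for illustration. Only the idea of a queue kept in received order, combined with a lookup of finished sources, is taken from the members documented on this page.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for an entry of the received-order queue.
struct Received {
  enum class Kind { Source, ProcessEvent } kind;
  std::intptr_t source_id;  // only meaningful when kind == Kind::Source
};

// Release everything at the front of `received` that is ready, in arrival order.
// A process event is released as soon as it reaches the front, because every
// entry that arrived before it has already been released. A source is released
// only if its id is in `finished`; otherwise the loop stops, so nothing that
// arrived later can overtake it.
std::vector<std::string> releaseReady(std::deque<Received>& received,
                                      std::set<std::intptr_t>& finished) {
  std::vector<std::string> emitted;
  while (!received.empty()) {
    const Received next = received.front();
    if (next.kind == Received::Kind::ProcessEvent) {
      emitted.push_back("event");
    } else {
      auto it = finished.find(next.source_id);
      if (it == finished.end()) {
        break;  // front source still being processed: hold everything behind it
      }
      emitted.push_back("source " + std::to_string(next.source_id));
      finished.erase(it);
    }
    received.pop_front();
  }
  return emitted;
}
```

In this model, a source that finishes early simply stays in `finished` until every entry ahead of it in `received` has been released, which preserves the ordering around each event.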

Definition at line 40 of file Prefetcher.h.

Constructor & Destructor Documentation

◆ Prefetcher()

SourceXtractor::Prefetcher::Prefetcher ( const std::shared_ptr< Euclid::ThreadPool > & thread_pool,
unsigned max_queue_size )

Constructor

Parameters
thread_pool    Alexandria thread pool

Definition at line 44 of file Prefetcher.cpp.

References m_output_thread, and outputLoop().


◆ ~Prefetcher()

SourceXtractor::Prefetcher::~Prefetcher ( )
virtual

Destructor

Definition at line 49 of file Prefetcher.cpp.

References std::thread::joinable(), m_output_thread, and wait().


Member Function Documentation

◆ outputLoop()

void SourceXtractor::Prefetcher::outputLoop ( )
private

◆ receiveProcessSignal()

void SourceXtractor::Prefetcher::receiveProcessSignal ( const ProcessSourcesEvent & event)
override virtual

Handle ProcessSourcesEvent. All sources received prior to this message need to be processed before sources coming after are passed along.

Parameters
event    The ProcessSourcesEvent to handle

Implements SourceXtractor::PipelineReceiver< SourceInterface >.

Definition at line 136 of file Prefetcher.cpp.

References Elements::Logging::debug(), Euclid::Configuration::logger, m_event_queue, m_new_output, m_queue_mutex, m_received, std::condition_variable::notify_one(), and SourceXtractor::Prefetcher::EventType::PROCESS_SOURCE.


◆ receiveSource()

void SourceXtractor::Prefetcher::receiveSource ( std::unique_ptr< SourceInterface > source)
override virtual

Trigger multi-threaded measurements on the source interface. Once they are done, the source will be passed along.

Parameters
source    The source to measure

Implements SourceXtractor::PipelineReceiver< SourceInterface >.

Definition at line 54 of file Prefetcher.cpp.

References Euclid::Semaphore::acquire(), std::lock(), m_finished_sources, m_new_output, m_prefetch_set, m_queue_mutex, m_received, m_semaphore, m_thread_pool, std::move(), std::condition_variable::notify_one(), SourceXtractor::Prefetcher::EventType::SOURCE, and Euclid::ThreadPool::submit().
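
The reference to Euclid::Semaphore::acquire() suggests that receiving a source first takes a slot, so the number of sources in flight stays bounded (presumably by max_queue_size). The following is a minimal sketch of that pattern using only the standard library. `SlotCounter` and its `free()` accessor are hypothetical, and the matching `release()` call on the output side is an assumption; this does not reproduce the Euclid::Semaphore interface.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical counting semaphore built on a mutex and a condition variable
// (std::counting_semaphore would need C++20).
class SlotCounter {
public:
  explicit SlotCounter(unsigned slots) : m_free(slots) {
    assert(slots > 0);  // with zero slots, acquire() would block forever
  }

  // Block until a slot is free, then take it. A producer would call this
  // before handing a new source to the worker pool.
  void acquire() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait(lock, [this] { return m_free > 0; });
    --m_free;
  }

  // Give a slot back. In this sketch the consumer calls it once a finished
  // source has been sent downstream.
  void release() {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      ++m_free;
    }
    m_cv.notify_one();
  }

  // Number of slots currently free (for inspection only).
  unsigned free() const {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_free;
  }

private:
  mutable std::mutex m_mutex;
  std::condition_variable m_cv;
  unsigned m_free;
};
```

Blocking in `acquire()` applies back-pressure to the upstream stage: when downstream is slow, the producer stalls instead of letting the set of pending sources grow without limit.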


◆ requestProperties()

template<typename Container >
void SourceXtractor::Prefetcher::requestProperties ( const Container & properties)
inline

Tell the prefetcher to compute these properties

Template Parameters
Container    Any iterable container with a set/list of properties
Parameters
properties    PropertyId instances

Definition at line 77 of file Prefetcher.h.

References requestProperty().
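
A container-generic front end that forwards to a single-item private helper could look roughly like the sketch below. It is an illustration only: `PropertyRequests`, `size()`, and `contains()` are hypothetical, and `std::string` stands in for PropertyId, whose definition is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Hypothetical holder of the set of properties to prefetch.
class PropertyRequests {
public:
  // Accept any iterable container whose elements convert to the id type.
  template <typename Container>
  void requestProperties(const Container& properties) {
    for (const auto& property : properties) {
      requestProperty(property);
    }
  }

  std::size_t size() const { return m_prefetch_set.size(); }

  bool contains(const std::string& id) const {
    return m_prefetch_set.count(id) > 0;
  }

private:
  // Single-item helper; std::set silently ignores duplicates.
  void requestProperty(const std::string& id) { m_prefetch_set.insert(id); }

  std::set<std::string> m_prefetch_set;  // std::string stands in for PropertyId
};
```

Keeping the template in the header and the per-item work in a non-template helper means that only the thin loop is instantiated per container type.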


◆ requestProperty()

void SourceXtractor::Prefetcher::requestProperty ( const PropertyId & property_id)
private

Definition at line 80 of file Prefetcher.cpp.

References Elements::Logging::debug(), Euclid::Configuration::logger, and m_prefetch_set.

Referenced by requestProperties().


◆ synchronize()

void SourceXtractor::Prefetcher::synchronize ( )

Wait until the queue is empty, but do not stop the output thread.

Definition at line 151 of file Prefetcher.cpp.

References Euclid::ThreadPool::activeThreads(), Euclid::ThreadPool::checkForException(), Elements::Logging::fatal(), Euclid::Configuration::logger, m_queue_mutex, m_received, m_thread_pool, and std::this_thread::sleep_for().
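
The references to m_queue_mutex, m_received, and std::this_thread::sleep_for() are consistent with a polling wait. The sketch below shows that general pattern with standard-library types only. `waitUntilEmpty` and its poll interval are hypothetical, and the thread-pool checks (activeThreads(), checkForException()) are deliberately left out because their exact use is not described here.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <mutex>
#include <thread>

// Block the caller until `queue` is empty, checking it under `mutex` and
// sleeping between checks. Returns the number of checks performed.
// The worker that drains the queue keeps running; nothing is stopped here.
template <typename T>
int waitUntilEmpty(std::mutex& mutex, const std::deque<T>& queue,
                   std::chrono::milliseconds poll_interval =
                       std::chrono::milliseconds(1)) {
  assert(poll_interval.count() > 0);
  int polls = 0;
  while (true) {
    ++polls;
    {
      // Hold the lock only for the check, so the consumer can make progress.
      std::lock_guard<std::mutex> lock(mutex);
      if (queue.empty()) {
        return polls;
      }
    }
    std::this_thread::sleep_for(poll_interval);
  }
}
```

Polling is simple and avoids adding a second condition variable, at the cost of up to one poll interval of extra latency before the caller resumes.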


◆ wait()

void SourceXtractor::Prefetcher::wait ( )

Wait for the multi-threaded computation to finish. This is required because the segmentation may be completely finished and the measurement queue empty while some sources are still held here, waiting on a compute-heavy property.

Definition at line 146 of file Prefetcher.cpp.

References std::thread::join(), m_output_thread, and m_stop.

Referenced by ~Prefetcher().
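
The members referenced by the constructor, wait(), and the destructor (m_output_thread, m_stop, std::thread::join(), std::thread::joinable()) match a common ownership pattern for a background loop. The sketch below illustrates that pattern only. `LoopOwner`, `loop()`, and `iterations()` are hypothetical, and the loop body is a placeholder rather than the real output loop.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <thread>

// Hypothetical owner of a background loop with explicit and implicit shutdown.
class LoopOwner {
public:
  LoopOwner() : m_stop(false), m_iterations(0) {
    // Start the loop only after the flags it reads are initialised.
    m_thread = std::make_unique<std::thread>(&LoopOwner::loop, this);
  }

  ~LoopOwner() {
    // Join only if wait() has not been called already.
    if (m_thread->joinable()) {
      wait();
    }
  }

  // Ask the loop to stop and block until it has exited.
  void wait() {
    assert(m_thread);
    m_stop = true;
    m_thread->join();
  }

  int iterations() const { return m_iterations; }

private:
  void loop() {
    while (!m_stop) {
      ++m_iterations;  // placeholder for real work
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }

  std::atomic_bool m_stop;
  std::atomic_int m_iterations;
  std::unique_ptr<std::thread> m_thread;
};
```

Checking `joinable()` in the destructor makes an explicit `wait()` followed by destruction safe, since joining a thread twice is an error.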


Member Data Documentation

◆ m_event_queue

std::deque<ProcessSourcesEvent> SourceXtractor::Prefetcher::m_event_queue
private

Queue of received ProcessSourcesEvent, order preserved.

Definition at line 119 of file Prefetcher.h.

Referenced by outputLoop(), and receiveProcessSignal().

◆ m_finished_sources

std::map<intptr_t, std::unique_ptr<SourceInterface> > SourceXtractor::Prefetcher::m_finished_sources
private

Finished sources.

Definition at line 117 of file Prefetcher.h.

Referenced by outputLoop(), and receiveSource().

◆ m_new_output

std::condition_variable SourceXtractor::Prefetcher::m_new_output
private

Notifies there is a new source done processing.

Definition at line 115 of file Prefetcher.h.

Referenced by outputLoop(), receiveProcessSignal(), and receiveSource().

◆ m_output_thread

std::unique_ptr<std::thread> SourceXtractor::Prefetcher::m_output_thread
private

Orchestration thread.

Definition at line 113 of file Prefetcher.h.

Referenced by Prefetcher(), wait(), and ~Prefetcher().

◆ m_prefetch_set

std::set<PropertyId> SourceXtractor::Prefetcher::m_prefetch_set
private

Properties to prefetch.

Definition at line 111 of file Prefetcher.h.

Referenced by receiveSource(), and requestProperty().

◆ m_queue_mutex

std::mutex SourceXtractor::Prefetcher::m_queue_mutex
private

Definition at line 123 of file Prefetcher.h.

Referenced by outputLoop(), receiveProcessSignal(), receiveSource(), and synchronize().

◆ m_received

std::deque<EventType> SourceXtractor::Prefetcher::m_received
private

Queue of type of received events. Used to pass downstream events respecting the received order.

Definition at line 121 of file Prefetcher.h.

Referenced by outputLoop(), receiveProcessSignal(), receiveSource(), and synchronize().

◆ m_semaphore

Euclid::Semaphore SourceXtractor::Prefetcher::m_semaphore
private

Keep the queue under control.

Definition at line 129 of file Prefetcher.h.

Referenced by outputLoop(), and receiveSource().

◆ m_stop

std::atomic_bool SourceXtractor::Prefetcher::m_stop
private

Termination condition for the output loop.

Definition at line 126 of file Prefetcher.h.

Referenced by outputLoop(), and wait().

◆ m_thread_pool

std::shared_ptr<Euclid::ThreadPool> SourceXtractor::Prefetcher::m_thread_pool
private

Pointer to the pool of worker threads.

Definition at line 109 of file Prefetcher.h.

Referenced by outputLoop(), receiveSource(), and synchronize().


The documentation for this class was generated from the following files: