Pioneer
Loading...
Searching...
No Matches
TaskGraph.h
Go to the documentation of this file.
1// Copyright © 2008-2023 Pioneer Developers. See AUTHORS.txt for details
2// Licensed under the terms of the GPL v3. See licenses/GPL-3.txt
3
4#pragma once
5
#include <atomic>
#include <cstdint>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

#include "core/Semaphore.h"
11
// Numeric range of elements handed to a Task (see Task::OnExecute).
// Both bounds default to zero so a default-constructed TaskRange never
// carries indeterminate values. How `end` is interpreted (inclusive or
// exclusive) is up to the task implementation and is not fixed here.
struct TaskRange {
	uint32_t begin = 0;
	uint32_t end = 0;
};
16
19
// Forward declarations for types that are only used by pointer or as
// friends in this header.
// NOTE(review): the three *Impl declarations were missing from this listing
// and are restored from their use as TaskGraph members; they are presumably
// defined in TaskGraph.cpp - confirm the class-key (class vs struct) there.
class AsyncJobQueueImpl;
class AsyncTaskQueueImpl;
class JobQueue;
class TaskGraphJobQueueImpl;
class TaskGraph;
23
// CompleteNotifier is a simple atomic helper class to track how many tasks
// are running vs complete.
class CompleteNotifier {
public:
	// Starts with no outstanding dependants, i.e. already complete.
	CompleteNotifier() :
		m_dependants(0) {}

	// Count of outstanding dependants. This class never modifies it after
	// construction; presumably the TaskGraph adjusts it as tasks are queued
	// and finished - confirm in TaskGraph.cpp.
	std::atomic<uint32_t> m_dependants;

	// Returns true when the dependant count is zero.
	// The load is relaxed, so on its own it does not order any other memory
	// accesses relative to the counter.
	bool IsComplete() const { return m_dependants.load(std::memory_order_relaxed) == 0; }
};
35
36// A Task is the building block of the TaskGraph system.
37// It represents a single unit of work to be done in parallel fashion.
38// Tasks are associated with a numeric range of elements to operate on;
39// if you have a large number of items to process, create multiple tasks
40// responsible for a subrange of the overall workload.
41//
42// Tasks are managed by raw pointers and should not be deleted by
43// user code - the owning TaskSet or TaskGraph will delete the task
44// when it is complete.
45//
46// Subclass and implement these methods:
47// OnExecute: responsible for carrying out the actual work done by the task,
48// runs on a worker thread.
49//
50// OnComplete: used to synchronize reporting of task results, called by the
51// task owner at a synchronization point on the task owner's thread.
52class Task {
53public:
54 Task(TaskRange range = {}) :
55 m_owner(nullptr),
56 m_range(range) {}
57 virtual ~Task() = default;
58
59 // Runs on a worker thread. Do all work in the task here.
60 virtual void OnExecute(TaskRange range) = 0;
61
62 // Run by the task owner on the owning thread when the task has finished.
63 // If the task has no owner, this function will not be called.
64 virtual void OnComplete(){};
65
66 // Sets the task owner (responsible for calling task completion callbacks)
68
69private:
70 friend class TaskGraph;
71 CompleteNotifier *m_owner;
72 TaskRange m_range;
73};
74
75// Helper task define to enable easy creation with lambdas.
76template <typename Function>
77class LambdaTask : public Task {
78public:
79 LambdaTask(TaskRange r, Function &&lambda) :
80 Task(r),
81 m_lambda(lambda) {}
82
83 void OnExecute(TaskRange range) override { m_lambda(range); }
84 void OnComplete() override {}
85
86private:
87 Function m_lambda;
88};
89
90// Represents a group of related tasks and is responsible for handling
91// post-task operations on the main thread.
92class TaskSet : public CompleteNotifier {
93public:
94 // A Handle is the runtime interface to a currently executing TaskSet.
95 // If you do not call the (blocking) TaskGraph->WaitForTaskSet(handle);
96 // you will need to check for the handle to be complete and manually
97 // trigger execution of the task completion callbacks via
98 // TaskGraph->CompleteTaskSet();
99 struct Handle {
100 Handle(const Handle &) = delete;
101 Handle &operator=(const Handle &) = delete;
102 Handle(Handle &&) = default;
103 Handle &operator=(Handle &&) = default;
104 bool IsComplete() { return !m_set || m_set->IsComplete(); }
105
106 private:
107 friend class TaskGraph;
108 Handle(TaskSet *set) :
109 m_set(set) {}
110
111 TaskSet *m_set;
112 };
113
115 m_tasks{},
116 m_executing(false) {}
117
118 // Add an individual task to this TaskSet.
119 // This operation will fail if the task set is currently executing.
120 bool AddTask(Task *task);
121
122 // Adds a lambda task to this TaskSet.
123 template <typename Function>
124 bool AddTaskLambda(TaskRange range, Function &&fn)
125 {
126 return AddTask(new LambdaTask<Function>(range, std::move(fn)));
127 }
128
129 bool IsExecuting() { return m_executing; }
130
131private:
132 friend class TaskGraph;
133 std::vector<Task *> m_tasks;
134 bool m_executing;
135};
136
137// The global task orchestrator.
138// Responsible for managing threads and executing tasks
140public:
141 TaskGraph();
142 ~TaskGraph();
143
144 // Set the total number of worker threads
145 void SetWorkerThreads(uint32_t numThreads);
146 uint32_t GetNumWorkerThreads() const;
147
148 // Queues all tasks in a TaskSet for execution. The TaskGraph now owns
149 // the underlying TaskSet and is responsible for deletion.
151 // Queues a single task without a TaskSet for execution
152 void QueueTask(Task *task);
153
154 // Queues all tasks in a task set to be executed on the main thread only
156 // Queues a single task to be executed on the main thread only
157 void QueueTaskPinned(Task *task);
158
159 // Wait for a queued task set to complete and run task completion callbacks.
160 // This will execute tasks on the calling thread until the given TaskSet
161 // handle has completed all queued tasks.
163
164 // Runs completion callbacks for a TaskSet and destroys the underlying
165 // TaskSet object when completed.
166 // Returns false if the task set was not completed.
168
169 // Run currently available tasks pinned to the main thread
170 void RunPinnedTasks();
171
172 // Return the JobQueue interface object for this task graph.
174
175 static uint32_t GetThreadNum();
176
177private:
179 struct ThreadData {
180 std::thread *threadHandle;
181 uint32_t threadNum;
182 TaskGraph *graph;
183 bool isJobThread;
184
185 void RunThread();
186 void WaitForTasks();
187 };
188
189 static thread_local ThreadData *tl_threadData;
190
191 bool TryRunTask(ThreadData *thread, bool allowJobs = true);
192 void ExecTask(Task *task);
193 bool HasTasks(ThreadData *thread);
194 static ThreadData *GetThreadData();
195
196 void WakeForNewTasks();
197 void WakeForFinishedTasks();
198
199 void WaitForFinishedTask();
200
201 std::vector<ThreadData *> m_threads;
202
203 // queue for short-lived high-priority tasks
204 AsyncTaskQueueImpl *m_taskQueue;
205 // queue of tasks to run on the main thread
206 AsyncTaskQueueImpl *m_pinnedTasks;
207
208 // implementation of the JobQueue interface for backwards compat
209 // with the old JobQueue system
210 TaskGraphJobQueueImpl *m_jobHandlerImpl;
211
212 // queue for long-lived low-priority background jobs
213 AsyncJobQueueImpl *m_jobQueue;
214 AsyncJobQueueImpl *m_jobFinishedQueue;
215
216 std::atomic<bool> m_isRunning;
217 std::atomic<uint32_t> m_numAliveThreads;
218
219 Semaphore m_newTasksSemaphore;
220 Semaphore m_finishedTasksSemaphore;
221};
Definition TaskGraph.cpp:21
Definition TaskGraph.cpp:20
Definition TaskGraph.h:26
std::atomic< uint32_t > m_dependants
Definition TaskGraph.h:31
bool IsComplete()
Definition TaskGraph.h:33
CompleteNotifier()
Definition TaskGraph.h:28
Definition JobQueue.h:108
Definition TaskGraph.h:77
LambdaTask(TaskRange r, Function &&lambda)
Definition TaskGraph.h:79
void OnComplete() override
Definition TaskGraph.h:84
void OnExecute(TaskRange range) override
Definition TaskGraph.h:83
Definition Semaphore.h:16
Definition TaskGraph.cpp:26
Definition TaskGraph.h:139
TaskSet::Handle QueueTaskSetPinned(TaskSet *set)
Definition TaskGraph.cpp:220
TaskGraph()
Definition TaskGraph.cpp:100
JobQueue * GetJobQueue()
Definition TaskGraph.cpp:315
static uint32_t GetThreadNum()
Definition TaskGraph.cpp:320
void QueueTaskPinned(Task *task)
Definition TaskGraph.cpp:244
void QueueTask(Task *task)
Definition TaskGraph.cpp:212
bool CompleteTaskSet(TaskSet::Handle &set)
Definition TaskGraph.cpp:281
TaskSet::Handle QueueTaskSet(TaskSet *set)
Definition TaskGraph.cpp:200
void RunPinnedTasks()
Definition TaskGraph.cpp:299
uint32_t GetNumWorkerThreads() const
Definition TaskGraph.cpp:168
void WaitForTaskSet(TaskSet::Handle &set)
Definition TaskGraph.cpp:252
~TaskGraph()
Definition TaskGraph.cpp:114
void SetWorkerThreads(uint32_t numThreads)
Definition TaskGraph.cpp:174
Definition TaskGraph.h:92
bool AddTask(Task *task)
Definition TaskGraph.cpp:88
bool AddTaskLambda(TaskRange range, Function &&fn)
Definition TaskGraph.h:124
TaskSet()
Definition TaskGraph.h:114
bool IsExecuting()
Definition TaskGraph.h:129
Definition TaskGraph.h:52
virtual void OnComplete()
Definition TaskGraph.h:64
void SetOwner(CompleteNotifier *)
Definition TaskGraph.cpp:78
virtual ~Task()=default
virtual void OnExecute(TaskRange range)=0
Task(TaskRange range={})
Definition TaskGraph.h:54
Definition TaskGraph.h:12
uint32_t begin
Definition TaskGraph.h:13
uint32_t end
Definition TaskGraph.h:14
Definition TaskGraph.h:99
Handle & operator=(Handle &&)=default
Handle & operator=(const Handle &)=delete
Handle(Handle &&)=default
Handle(const Handle &)=delete
bool IsComplete()
Definition TaskGraph.h:104