Leviathan  0.8.0.0
Leviathan game engine
Leviathan::ThreadingManager Class Reference

Manages delayed execution of functions through use of QueuedTask and subclasses. More...

#include <ThreadingManager.h>

+ Inheritance diagram for Leviathan::ThreadingManager:

Public Member Functions

DLLEXPORT ThreadingManager (int basethreadspercore=DEFAULT_THREADS_PER_CORE)
 
virtual DLLEXPORT ~ThreadingManager ()
 
virtual DLLEXPORT bool Init ()
 Sets up the work queue. More...
 
virtual DLLEXPORT bool CheckInit ()
 Checks has Init worked. More...
 
virtual DLLEXPORT void Release ()
 
DLLEXPORT void QueueTask (std::shared_ptr< QueuedTask > task)
 Adds a task to the queue. More...
 
DLLEXPORT bool RemoveFromQueue (std::shared_ptr< QueuedTask > task)
 Removes a task from the queue. More...
 
DLLEXPORT void RemoveTasksFromQueue (std::vector< std::shared_ptr< QueuedTask >> &tasklist)
 Removes specific tasks from the queue. More...
 
DLLEXPORT FORCE_INLINE void QueueTask (QueuedTask *newdtask)
 Adds a task to the queue. More...
 
DLLEXPORT void FlushActiveThreads ()
 This function waits for all tasks to complete. More...
 
DLLEXPORT void WaitForAllTasksToFinish ()
 Blocks until all queued tasks are finished. More...
 
DLLEXPORT void WaitForWorkersToEmpty (Lock &guard)
 Blocks until all threads are empty. More...
 
DLLEXPORT void NotifyQueuerThread ()
 Notifies the queuer thread to check task setting. More...
 
DLLEXPORT void SetDisallowRepeatingTasks (bool disallow)
 Disallows repeating tasks to occur again. More...
 
DLLEXPORT void SetDiscardConditionalTasks (bool discard)
 Sets the task queuer to discard all conditional tasks. More...
 
DLLEXPORT void NotifyTaskFinished (std::shared_ptr< QueuedTask > task)
 Called by work threads when they are done. More...
 
DLLEXPORT void MakeThreadsWorkWithOgre ()
 Makes the threads work with Ogre. More...
 
DLLEXPORT void UnregisterGraphics ()
 Must be called if MakeThreadsWorkWithOgre has been called, BEFORE releasing graphics. More...
 
- Public Member Functions inherited from Leviathan::ThreadSafeGeneric< MutexType >
DLLEXPORT ThreadSafeGeneric ()
 
DLLEXPORT ~ThreadSafeGeneric ()
 
FORCE_INLINE void VerifyLock (RecursiveLock &guard) const
 
FORCE_INLINE void VerifyLock (Lock &lockit) const
 

Static Public Member Functions

static DLLEXPORT ThreadingManagerGet ()
 

Protected Attributes

bool AllowStartTasksFromQueue
 
bool StopProcessing
 
int WantedThreadCount
 
int TaksMustBeRanBeforeState
 
bool AllowRepeats
 Can tasks be repeated. More...
 
bool AllowConditionalWait
 
std::list< std::shared_ptr< QueuedTask > > WaitingTasks
 List of the tasks queued by the application. More...
 
std::condition_variable_any TaskQueueNotify
 
std::vector< std::shared_ptr< TaskThread > > UsableThreads
 
std::thread WorkQueueHandler
 Thread used to set tasks to threads. More...
 
- Protected Attributes inherited from Leviathan::ThreadSafeGeneric< MutexType >
MutexType ObjectsLock
 

Static Protected Attributes

static ThreadingManagerstaticaccess = NULL
 

Friends

void RunTaskQueuerThread (ThreadingManager *manager)
 

Additional Inherited Members

- Public Types inherited from Leviathan::ThreadSafeGeneric< MutexType >
using LockT = typename LockTypeResolver< MutexType >::LType
 

Detailed Description

Manages delayed execution of functions through use of QueuedTask and subclasses.

Definition at line 39 of file ThreadingManager.h.

Constructor & Destructor Documentation

◆ ThreadingManager()

DLLEXPORT Leviathan::ThreadingManager::ThreadingManager ( int  basethreadspercore = DEFAULT_THREADS_PER_CORE)

Definition at line 20 of file ThreadingManager.cpp.

21  :
25 {
26  WantedThreadCount = std::thread::hardware_concurrency() * basethreadspercore;
27 
28  staticaccess = this;
29 }
bool AllowRepeats
Can tasks be repeated.
static ThreadingManager * staticaccess
#define TASK_MUSTBERAN_BEFORE_EXIT
Default value to pass for ignoring this setting //.
Definition: QueuedTask.h:9

◆ ~ThreadingManager()

DLLEXPORT Leviathan::ThreadingManager::~ThreadingManager ( )
virtual

Definition at line 31 of file ThreadingManager.cpp.

32 {
33 
34  // Joins all threads before quitting //
35  UsableThreads.clear();
36  staticaccess = nullptr;
37 }
std::vector< std::shared_ptr< TaskThread > > UsableThreads
static ThreadingManager * staticaccess

Member Function Documentation

◆ CheckInit()

DLLEXPORT bool Leviathan::ThreadingManager::CheckInit ( )
virtual

Checks has Init worked.

Definition at line 66 of file ThreadingManager.cpp.

67 {
68  // Check are there running threads //
69  GUARD_LOCK();
70 
71 
72  bool started = false;
73  int loopcount = 0;
74 
75  std::this_thread::yield();
76 
77  // This might need to be repeated for a while //
78  while(!started) {
79 
80  // Check that at least one thread is running //
81  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
82  // Check is this thread running //
83  if((*iter)->HasStarted()) {
84  // Something has started //
85  started = true;
86 
87  // Set the thread names //
88  int threadnumber = 0;
89  for(auto iter2 = UsableThreads.begin(); iter2 != UsableThreads.end();
90  ++iter2) {
91 
92  // Set the name //
94  (*iter2).get(), "Lev_Task_" + Convert::ToString(threadnumber));
95 
96  threadnumber++;
97  }
98 
99  return true;
100  }
101  }
102 
103  if(loopcount > 100) {
104  // Sleep a bit
105  std::this_thread::sleep_for(std::chrono::milliseconds(1));
106  }
107 
108  if(++loopcount > 1000) {
109 #ifndef LEVIATHAN_UE_PLUGIN
110  Logger::Get()->Error(
111  "ThreadingManager: CheckInit: no threads have started, after 1000 loops");
112 #endif // LEVIATHAN_UE_PLUGIN
113 
114  // No threads running //
115  return false;
116  }
117 
118  std::this_thread::yield();
119  }
120 
121  LEVIATHAN_ASSERT(0, "Shouldn't get out of that loop");
122  return false;
123 }
void SetThreadName(TaskThread *thread, const std::string &name)
std::vector< std::shared_ptr< TaskThread > > UsableThreads
#define LEVIATHAN_ASSERT(x, msg)
Definition: Define.h:102
static std::string ToString(const T &val)
Definition: Convert.h:72
static DLLEXPORT Logger * Get()
Definition: Logger.cpp:106
DLLEXPORT void Error(const std::string &data) override
Definition: Logger.cpp:177
#define GUARD_LOCK()
Definition: ThreadSafe.h:111

◆ FlushActiveThreads()

DLLEXPORT void Leviathan::ThreadingManager::FlushActiveThreads ( )

This function waits for all tasks to complete.

Definition at line 219 of file ThreadingManager.cpp.

220 {
221  // Disallow new tasks //
222  GUARD_LOCK_NAME(lockit);
223  AllowStartTasksFromQueue = false;
224 
225 
226  bool allavailable = false;
227 
228  // We want to skip wait on loop //
229  goto skipfirstwaitforthreadslabel;
230 
231  while(!allavailable) {
232 
233  // Wait for tasks to update //
234  TaskQueueNotify.wait_for(lockit, std::chrono::milliseconds(10));
235 
236  skipfirstwaitforthreadslabel:
237 
238 
239  // Set to true until a thread is busy //
240  allavailable = true;
241 
242  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
243  if((*iter)->HasRunningTask()) {
244  allavailable = false;
245  break;
246  }
247  }
248  }
249 
250  // Now free //
251 }
std::condition_variable_any TaskQueueNotify
#define GUARD_LOCK_NAME(y)
Definition: ThreadSafe.h:116
std::vector< std::shared_ptr< TaskThread > > UsableThreads

◆ Get()

DLLEXPORT ThreadingManager * Leviathan::ThreadingManager::Get ( )
static

Definition at line 39 of file ThreadingManager.cpp.

40 {
41  return staticaccess;
42 }
static ThreadingManager * staticaccess

◆ Init()

DLLEXPORT bool Leviathan::ThreadingManager::Init ( )
virtual

Sets up the work queue.

Definition at line 46 of file ThreadingManager.cpp.

47 {
48 
49  GUARD_LOCK();
50 
51  // Start the queuer //
52  WorkQueueHandler = std::thread(RunTaskQueuerThread, this);
53 
54 
55  // Start appropriate amount of threads //
56 
57  for(int i = 0; i < WantedThreadCount; i++) {
58 
59 
60  UsableThreads.push_back(shared_ptr<TaskThread>(new TaskThread()));
61  }
62 
63  return true;
64 }
Object used by ThreadingManager to easily create properly initialized threads.
Definition: TaskThread.h:22
std::vector< std::shared_ptr< TaskThread > > UsableThreads
friend void RunTaskQueuerThread(ThreadingManager *manager)
std::thread WorkQueueHandler
Thread used to set tasks to threads.
#define GUARD_LOCK()
Definition: ThreadSafe.h:111

◆ MakeThreadsWorkWithOgre()

DLLEXPORT void Leviathan::ThreadingManager::MakeThreadsWorkWithOgre ( )

Makes the threads work with Ogre.

Definition at line 324 of file ThreadingManager.cpp.

325 {
326 
328 
329  // Disallow new tasks //
330  {
331  GUARD_LOCK();
332  AllowStartTasksFromQueue = false;
333  }
334 
335  // Set our main thread's name //
336  // SetThreadNameImpl(-1, "LeviathanMain");
337 
338  // Wait for tasks to finish //
340 
341  // All threads are now available //
342 
343  // TODO: if BSF thread setup is needed it should be here
344 
345  // Wait for threads to finish //
347 
348  // End registering functions //
349 
350  // Allow new tasks to run //
351  {
352  GUARD_LOCK();
354  }
355 }
DLLEXPORT void FlushActiveThreads()
This function waits for all tasks to complete.
#define QUICKTIME_THISSCOPE
Definition: TimingMonitor.h:20
#define GUARD_LOCK()
Definition: ThreadSafe.h:111

◆ NotifyQueuerThread()

DLLEXPORT void Leviathan::ThreadingManager::NotifyQueuerThread ( )

Notifies the queuer thread to check task setting.

Definition at line 393 of file ThreadingManager.cpp.

394 {
395 
396  TaskQueueNotify.notify_all();
397 }
std::condition_variable_any TaskQueueNotify

◆ NotifyTaskFinished()

DLLEXPORT void Leviathan::ThreadingManager::NotifyTaskFinished ( std::shared_ptr< QueuedTask task)

Called by work threads when they are done.

Definition at line 307 of file ThreadingManager.cpp.

308 {
309  // We need locking for re-adding it //
310  if(task->IsRepeating()) {
311  // Add back to queue //
312  GUARD_LOCK();
313 
314  // Or not if we should be quitting soon //
315  if(AllowRepeats)
316  WaitingTasks.push_back(task);
317  }
318 
319 
320  // We probably don't need to acquire a lock for this //
321  TaskQueueNotify.notify_all();
322 }
std::condition_variable_any TaskQueueNotify
virtual DLLEXPORT bool IsRepeating()
Function called by ThreadingManager AFTER running the task //.
Definition: QueuedTask.cpp:36
std::list< std::shared_ptr< QueuedTask > > WaitingTasks
List of the tasks queued by the application.
bool AllowRepeats
Can tasks be repeated.
#define GUARD_LOCK()
Definition: ThreadSafe.h:111

◆ QueueTask() [1/2]

DLLEXPORT void Leviathan::ThreadingManager::QueueTask ( std::shared_ptr< QueuedTask task)

Adds a task to the queue.

Definition at line 152 of file ThreadingManager.cpp.

153 {
154  {
155  GUARD_LOCK();
156 
157  WaitingTasks.push_back(task);
158  }
159  // Notify the thread //
160  TaskQueueNotify.notify_all();
161 }
std::condition_variable_any TaskQueueNotify
std::list< std::shared_ptr< QueuedTask > > WaitingTasks
List of the tasks queued by the application.
#define GUARD_LOCK()
Definition: ThreadSafe.h:111

◆ QueueTask() [2/2]

DLLEXPORT FORCE_INLINE void Leviathan::ThreadingManager::QueueTask ( QueuedTask newdtask)
inline

Adds a task to the queue.

Parameters
newdtaskThe task to queue, the pointer will be deleted by this

Definition at line 73 of file ThreadingManager.h.

74  {
75  QueueTask(std::shared_ptr<QueuedTask>(newdtask));
76  }
DLLEXPORT void QueueTask(std::shared_ptr< QueuedTask > task)
Adds a task to the queue.

◆ Release()

DLLEXPORT void Leviathan::ThreadingManager::Release ( )
virtual

This will take a long time, since it will wait until all tasks are done

Todo:
Do something about unfinished tasks here

Definition at line 127 of file ThreadingManager.cpp.

128 {
129  // Disallow new tasks //
130  {
131  GUARD_LOCK();
132  AllowStartTasksFromQueue = false;
133  }
134 
135  // Wait for all to finish //
136  // WaitForAllTasksToFinish();
137 
138  {
139  GUARD_LOCK();
140  StopProcessing = true;
141  }
142  // Wait for the queuer to exit //
143  TaskQueueNotify.notify_all();
144  WorkQueueHandler.join();
145 
146  // Tell all threads to quit //
147  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
148  (*iter)->NotifyKill();
149  }
150 }
std::condition_variable_any TaskQueueNotify
std::vector< std::shared_ptr< TaskThread > > UsableThreads
std::thread WorkQueueHandler
Thread used to set tasks to threads.
#define GUARD_LOCK()
Definition: ThreadSafe.h:111

◆ RemoveFromQueue()

DLLEXPORT bool Leviathan::ThreadingManager::RemoveFromQueue ( std::shared_ptr< QueuedTask task)

Removes a task from the queue.

Precondition
The task is added with QueueTask
Note
If the task is currectly being executed current thread spinlocsk untill it is done

Definition at line 163 of file ThreadingManager.cpp.

164 {
165  // Best case scenario is finding it in the wait queue //
166  {
167  GUARD_LOCK();
168 
169  auto end = WaitingTasks.end();
170  for(auto iter = WaitingTasks.begin(); iter != end; ++iter) {
171 
172  if((*iter).get() == task.get()) {
173 
174  WaitingTasks.erase(iter);
175  return true;
176  }
177  }
178  }
179 
180  // The worse case is it having finished already //
181  // And the worst case is it being currently executed //
182  bool wasrunning = false;
183  bool isrunning = false;
184 
185  do {
186 
187  isrunning = false;
188 
189  GUARD_LOCK();
190 
191  auto end = UsableThreads.end();
192  for(auto iter = UsableThreads.begin(); iter != end; ++iter) {
193 
194  if((*iter)->IsRunningTask(task)) {
195 
196  isrunning = true;
197  wasrunning = true;
198  break;
199  }
200  }
201 
202  } while(isrunning);
203 
204  return wasrunning;
205 }
std::vector< std::shared_ptr< TaskThread > > UsableThreads
std::list< std::shared_ptr< QueuedTask > > WaitingTasks
List of the tasks queued by the application.
#define GUARD_LOCK()
Definition: ThreadSafe.h:111

◆ RemoveTasksFromQueue()

DLLEXPORT void Leviathan::ThreadingManager::RemoveTasksFromQueue ( std::vector< std::shared_ptr< QueuedTask >> &  tasklist)

Removes specific tasks from the queue.

Precondition
The tasks are added with QueueTask and tasklist has the list of tasks to remove
Postcondition
The tasklist is empty and the tasks won't be ran
Note
If the task is currectly being executed current thread spinlocsk untill it is done

Definition at line 207 of file ThreadingManager.cpp.

209 {
210  // Just go one by one and remove them all //
211  for(size_t i = 0; i < tasklist.size(); i++) {
212 
213  RemoveFromQueue(tasklist[i]);
214  }
215 
216  tasklist.clear();
217 }
DLLEXPORT bool RemoveFromQueue(std::shared_ptr< QueuedTask > task)
Removes a task from the queue.

◆ SetDisallowRepeatingTasks()

DLLEXPORT void Leviathan::ThreadingManager::SetDisallowRepeatingTasks ( bool  disallow)

Disallows repeating tasks to occur again.

Note
This should only be called by the Engine class just before quitting

Definition at line 399 of file ThreadingManager.cpp.

400 {
401  AllowRepeats = disallow;
402 }
bool AllowRepeats
Can tasks be repeated.

◆ SetDiscardConditionalTasks()

DLLEXPORT void Leviathan::ThreadingManager::SetDiscardConditionalTasks ( bool  discard)

Sets the task queuer to discard all conditional tasks.

Note
This should only be called by the Engine

Definition at line 404 of file ThreadingManager.cpp.

405 {
406  AllowConditionalWait = !discard;
407 }

◆ UnregisterGraphics()

DLLEXPORT void Leviathan::ThreadingManager::UnregisterGraphics ( )

Must be called if MakeThreadsWorkWithOgre has been called, BEFORE releasing graphics.

Definition at line 357 of file ThreadingManager.cpp.

358 {
359  // Wait for threads to finish //
361 
362  {
363  GUARD_LOCK_NAME(lockit);
364 
365  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
366 
367  //(*iter)->SetTaskAndNotify(
368  // std::make_shared<QueuedTask>(std::bind(UnregisterOgreOnThread)));
369  // Wait for it to end //
370  // #ifdef __GNUC__
371  // while((*iter)->HasRunningTask()){
372  // try{
373  // TaskQueueNotify.wait_for(lockit, std::chrono::milliseconds(50));
374  // }
375  // catch(...){
376  // LOG_WARNING("ThreadingManager: MakeThreadsWorkWithOgre: "
377  // "linux fix wait interrupted");
378  // }
379  // }
380  // #endif
381  }
382  }
383 
385 
386  // Allow new tasks to run //
387  {
388  GUARD_LOCK();
390  }
391 }
DLLEXPORT void FlushActiveThreads()
This function waits for all tasks to complete.
#define GUARD_LOCK_NAME(y)
Definition: ThreadSafe.h:116
std::vector< std::shared_ptr< TaskThread > > UsableThreads
#define GUARD_LOCK()
Definition: ThreadSafe.h:111

◆ WaitForAllTasksToFinish()

DLLEXPORT void Leviathan::ThreadingManager::WaitForAllTasksToFinish ( )

Blocks until all queued tasks are finished.

Warning
This function will ignore MustBeRanBefore return value by passing TASK_MUSTBERAN_BEFORE_EXIT
Bug:
This doesn't properly handle tasks that are repeating

Definition at line 253 of file ThreadingManager.cpp.

254 {
255  // Use this lock the entire function //
256  GUARD_LOCK();
257 
258  // See if empty right now and loop until it is //
259  while(true) {
260 
261  WaitForWorkersToEmpty(guard);
262 
263  // Unlock us for a second
264  guard.unlock();
265  // Make the queuer run //
266  TaskQueueNotify.notify_all();
267  guard.lock();
268 
269  WaitForWorkersToEmpty(guard);
270  WaitForWorkersToEmpty(guard);
271 
272  if(WaitingTasks.empty())
273  return;
274  }
275 
276  WaitForWorkersToEmpty(guard);
277 }
DLLEXPORT void WaitForWorkersToEmpty(Lock &guard)
Blocks until all threads are empty.
std::condition_variable_any TaskQueueNotify
std::list< std::shared_ptr< QueuedTask > > WaitingTasks
List of the tasks queued by the application.
#define GUARD_LOCK()
Definition: ThreadSafe.h:111

◆ WaitForWorkersToEmpty()

DLLEXPORT void ThreadingManager::WaitForWorkersToEmpty ( Lock guard)

Blocks until all threads are empty.

Definition at line 279 of file ThreadingManager.cpp.

280 {
281 
282  // Wait for threads to empty up //
283  bool allavailable = false;
284 
285  // We want to skip wait on loop //
286  goto skipfirstwaitforthreadslabel2;
287 
288  while(!allavailable) {
289 
290  // Wait for tasks to update //
291  TaskQueueNotify.wait_for(guard, std::chrono::milliseconds(1));
292 
293  skipfirstwaitforthreadslabel2:
294 
295  // Set to true until a thread is busy //
296  allavailable = true;
297 
298  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
299  if((*iter)->HasRunningTask()) {
300  allavailable = false;
301  break;
302  }
303  }
304  }
305 }
std::condition_variable_any TaskQueueNotify
std::vector< std::shared_ptr< TaskThread > > UsableThreads

Friends And Related Function Documentation

◆ RunTaskQueuerThread

void RunTaskQueuerThread ( ThreadingManager manager)
friend
Todo:
Improve performance

Member Data Documentation

◆ AllowConditionalWait

bool Leviathan::ThreadingManager::AllowConditionalWait
protected

Controls whether tasks can be conditional. Setting this to false will remove all tasks that cannot be ran instantly

Definition at line 131 of file ThreadingManager.h.

◆ AllowRepeats

bool Leviathan::ThreadingManager::AllowRepeats
protected

Can tasks be repeated.

Definition at line 127 of file ThreadingManager.h.

◆ AllowStartTasksFromQueue

bool Leviathan::ThreadingManager::AllowStartTasksFromQueue
protected

Definition at line 117 of file ThreadingManager.h.

◆ staticaccess

ThreadingManager * Leviathan::ThreadingManager::staticaccess = NULL
staticprotected

Definition at line 142 of file ThreadingManager.h.

◆ StopProcessing

bool Leviathan::ThreadingManager::StopProcessing
protected

Definition at line 118 of file ThreadingManager.h.

◆ TaksMustBeRanBeforeState

int Leviathan::ThreadingManager::TaksMustBeRanBeforeState
protected

Used to allow QueuedTask::MustBeRanBefore function to work, shared between staticaccess worker thread and the main object

Definition at line 124 of file ThreadingManager.h.

◆ TaskQueueNotify

std::condition_variable_any Leviathan::ThreadingManager::TaskQueueNotify
protected

Definition at line 136 of file ThreadingManager.h.

◆ UsableThreads

std::vector<std::shared_ptr<TaskThread> > Leviathan::ThreadingManager::UsableThreads
protected

Definition at line 137 of file ThreadingManager.h.

◆ WaitingTasks

std::list<std::shared_ptr<QueuedTask> > Leviathan::ThreadingManager::WaitingTasks
protected

List of the tasks queued by the application.

Definition at line 135 of file ThreadingManager.h.

◆ WantedThreadCount

int Leviathan::ThreadingManager::WantedThreadCount
protected

Definition at line 120 of file ThreadingManager.h.

◆ WorkQueueHandler

std::thread Leviathan::ThreadingManager::WorkQueueHandler
protected

Thread used to set tasks to threads.

Definition at line 140 of file ThreadingManager.h.


The documentation for this class was generated from the following files: