Leviathan  0.8.0.0
Leviathan game engine
Leviathan::ThreadingManager Class Reference

Manages delayed execution of functions through use of QueuedTask and subclasses.

#include <ThreadingManager.h>

Public Member Functions

DLLEXPORT ThreadingManager (int basethreadspercore=DEFAULT_THREADS_PER_CORE)
 
virtual DLLEXPORT ~ThreadingManager ()
 
virtual DLLEXPORT bool Init ()
 Sets up the work queue.
 
virtual DLLEXPORT bool CheckInit ()
 Checks whether Init succeeded.
 
virtual DLLEXPORT void Release ()
 
DLLEXPORT void QueueTask (std::shared_ptr< QueuedTask > task)
 Adds a task to the queue.
 
DLLEXPORT bool RemoveFromQueue (std::shared_ptr< QueuedTask > task)
 Removes a task from the queue.
 
DLLEXPORT void RemoveTasksFromQueue (std::vector< std::shared_ptr< QueuedTask >> &tasklist)
 Removes specific tasks from the queue.
 
DLLEXPORT FORCE_INLINE void QueueTask (QueuedTask *newdtask)
 Adds a task to the queue.
 
DLLEXPORT void FlushActiveThreads ()
 This function waits for all tasks to complete.
 
DLLEXPORT void WaitForAllTasksToFinish ()
 Blocks until all queued tasks are finished.
 
DLLEXPORT void WaitForWorkersToEmpty (Lock &guard)
 Blocks until all threads are empty.
 
DLLEXPORT void NotifyQueuerThread ()
 Notifies the queuer thread to check task setting.
 
DLLEXPORT void SetDisallowRepeatingTasks (bool disallow)
 Prevents repeating tasks from being queued again.
 
DLLEXPORT void SetDiscardConditionalTasks (bool discard)
 Sets the task queuer to discard all conditional tasks.
 
DLLEXPORT void NotifyTaskFinished (std::shared_ptr< QueuedTask > task)
 Called by work threads when they are done.
 
DLLEXPORT void MakeThreadsWorkWithOgre ()
 Makes the threads work with Ogre.
 
DLLEXPORT void UnregisterGraphics ()
 Must be called if MakeThreadsWorkWithOgre has been called, BEFORE releasing graphics.
 
Public Member Functions inherited from Leviathan::ThreadSafeGeneric< MutexType >
DLLEXPORT ThreadSafeGeneric ()
 
DLLEXPORT ~ThreadSafeGeneric ()
 
FORCE_INLINE void VerifyLock (RecursiveLock &guard) const
 
FORCE_INLINE void VerifyLock (Lock &lockit) const
 

Static Public Member Functions

static DLLEXPORT ThreadingManager * Get ()
 

Protected Attributes

bool AllowStartTasksFromQueue
 
bool StopProcessing
 
int WantedThreadCount
 
int TaksMustBeRanBeforeState
 
bool AllowRepeats
 Can tasks be repeated.
 
bool AllowConditionalWait
 
std::list< std::shared_ptr< QueuedTask > > WaitingTasks
 List of the tasks queued by the application.
 
std::condition_variable_any TaskQueueNotify
 
std::vector< std::shared_ptr< TaskThread > > UsableThreads
 
std::thread WorkQueueHandler
 Thread used to set tasks to threads.
 
Protected Attributes inherited from Leviathan::ThreadSafeGeneric< MutexType >
MutexType ObjectsLock
 

Static Protected Attributes

static ThreadingManager * staticaccess = NULL
 

Friends

void RunTaskQueuerThread (ThreadingManager *manager)
 

Additional Inherited Members

Public Types inherited from Leviathan::ThreadSafeGeneric< MutexType >
using LockT = typename LockTypeResolver< MutexType >::LType
 

Detailed Description

Manages delayed execution of functions through use of QueuedTask and subclasses.

Definition at line 39 of file ThreadingManager.h.

Constructor & Destructor Documentation

◆ ThreadingManager()

DLLEXPORT Leviathan::ThreadingManager::ThreadingManager ( int  basethreadspercore = DEFAULT_THREADS_PER_CORE)

Definition at line 37 of file ThreadingManager.cpp.

38  :
42 {
43  WantedThreadCount = std::thread::hardware_concurrency() * basethreadspercore;
44 
45  staticaccess = this;
46 }
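
The constructor multiplies std::thread::hardware_concurrency() by basethreadspercore. The C++ standard allows hardware_concurrency() to return 0 when the core count cannot be determined, in which case the product is 0 and Init would create no worker threads. The following is a minimal sketch of a guarded calculation. ComputeWantedThreadCount and its one-core fallback are assumptions made for illustration and are not part of Leviathan.

```cpp
#include <algorithm>
#include <cassert>
#include <thread>

// Hypothetical helper (not part of Leviathan): computes how many worker
// threads to create. `detectedCores` is a parameter so that the fallback
// path can be exercised; a caller would normally pass
// std::thread::hardware_concurrency().
int ComputeWantedThreadCount(int threadsPerCore, unsigned detectedCores)
{
    assert(threadsPerCore > 0);

    // hardware_concurrency() may report 0 when the count is unknown.
    // Falling back to a single core is an assumption of this sketch.
    const unsigned cores = std::max(1u, detectedCores);
    return static_cast<int>(cores) * threadsPerCore;
}
```

With this guard, a call such as ComputeWantedThreadCount(2, std::thread::hardware_concurrency()) always yields at least two threads.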

◆ ~ThreadingManager()

DLLEXPORT Leviathan::ThreadingManager::~ThreadingManager ( )
virtual

Definition at line 48 of file ThreadingManager.cpp.

49 {
50 
51  // Joins all threads before quitting //
52  UsableThreads.clear();
53  staticaccess = nullptr;
54 }

Member Function Documentation

◆ CheckInit()

DLLEXPORT bool Leviathan::ThreadingManager::CheckInit ( )
virtual

Checks whether Init succeeded.

Definition at line 83 of file ThreadingManager.cpp.

84 {
85  // Check are there running threads //
86  GUARD_LOCK();
87 
88 
89  bool started = false;
90  int loopcount = 0;
91 
92  std::this_thread::yield();
93 
94  // This might need to be repeated for a while //
95  while(!started) {
96 
97  // Check that at least one thread is running //
98  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
99  // Check is this thread running //
100  if((*iter)->HasStarted()) {
101  // Something has started //
102  started = true;
103 
104  // Set the thread names //
105  int threadnumber = 0;
106  for(auto iter2 = UsableThreads.begin(); iter2 != UsableThreads.end();
107  ++iter2) {
108 
109  // Set the name //
110  SetThreadName(
111  (*iter2).get(), "Lev_Task_" + Convert::ToString(threadnumber));
112 
113  threadnumber++;
114  }
115 
116  return true;
117  }
118  }
119 
120  if(loopcount > 100) {
121  // Sleep a bit
122  std::this_thread::sleep_for(std::chrono::milliseconds(1));
123  }
124 
125  if(++loopcount > 1000) {
126 #ifndef LEVIATHAN_UE_PLUGIN
127  Logger::Get()->Error(
128  "ThreadingManager: CheckInit: no threads have started, after 1000 loops");
129 #endif // LEVIATHAN_UE_PLUGIN
130 
131  // No threads running //
132  return false;
133  }
134 
135  std::this_thread::yield();
136  }
137 
138  LEVIATHAN_ASSERT(0, "Shouldn't get out of that loop");
139  return false;
140 }
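
CheckInit polls until at least one worker thread reports that it has started: it yields for roughly the first 100 iterations, sleeps 1 ms per iteration after that, and gives up after 1000 iterations. The same poll-with-back-off idea can be sketched as a standalone helper. PollUntilReady and its parameters are hypothetical names, and the sketch simplifies the original slightly by either yielding or sleeping on each iteration rather than doing both.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper (not part of Leviathan): polls `ready` until it
// returns true or `maxLoops` attempts have been made.
bool PollUntilReady(const std::function<bool()>& ready, int spinLoops = 100,
    int maxLoops = 1000)
{
    assert(maxLoops > 0);

    for(int loop = 0; loop < maxLoops; ++loop) {
        if(ready())
            return true;

        if(loop >= spinLoops) {
            // The condition is taking a while: back off with a short sleep.
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        } else {
            // Early iterations only yield, so a fast start costs little.
            std::this_thread::yield();
        }
    }

    // Gave up: the condition never became true.
    return false;
}
```

A caller in the spirit of CheckInit would pass a predicate that scans the worker list for a started thread and would log an error when the helper returns false.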

◆ FlushActiveThreads()

DLLEXPORT void Leviathan::ThreadingManager::FlushActiveThreads ( )

This function waits for all tasks to complete.

Definition at line 236 of file ThreadingManager.cpp.

237 {
238  // Disallow new tasks //
239  GUARD_LOCK_NAME(lockit);
240  AllowStartTasksFromQueue = false;
241 
242 
243  bool allavailable = false;
244 
245  // We want to skip wait on loop //
246  goto skipfirstwaitforthreadslabel;
247 
248  while(!allavailable) {
249 
250  // Wait for tasks to update //
251  TaskQueueNotify.wait_for(lockit, std::chrono::milliseconds(10));
252 
253  skipfirstwaitforthreadslabel:
254 
255 
256  // Set to true until a thread is busy //
257  allavailable = true;
258 
259  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
260  if((*iter)->HasRunningTask()) {
261  allavailable = false;
262  break;
263  }
264  }
265  }
266 
267  // Now free //
268 }
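
FlushActiveThreads uses goto to skip the wait on the first pass, so the busy check always runs before any waiting happens. A check-then-wait loop gives the same order of operations without a label. The sketch below is an illustration under stated assumptions: WaitUntilIdle, its parameters, and the use of std::mutex are hypothetical and stand in for the manager's own lock type and its scan over UsableThreads.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>

// Hypothetical helper (not part of Leviathan): blocks until `anyBusy`
// reports false. The caller must already hold `lock`.
void WaitUntilIdle(std::unique_lock<std::mutex>& lock,
    std::condition_variable_any& notify, const std::function<bool()>& anyBusy,
    std::chrono::milliseconds pollInterval = std::chrono::milliseconds(10))
{
    assert(lock.owns_lock());

    // Check first and wait only while something is still busy, which is
    // the same order of operations as jumping past the first wait.
    while(anyBusy()) {
        // wait_for releases the lock while waiting and reacquires it
        // before returning, whether it was notified or timed out.
        notify.wait_for(lock, pollInterval);
    }
}
```

The timeout keeps the loop making progress even if a notification is missed, which matches the 10 ms wait in the listing above.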

◆ Get()

DLLEXPORT ThreadingManager * Leviathan::ThreadingManager::Get ( )
static

Definition at line 56 of file ThreadingManager.cpp.

57 {
58  return staticaccess;
59 }
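
Get returns staticaccess, which the constructor sets to this and the destructor resets to nullptr. Callers should therefore expect a null pointer before a ThreadingManager has been constructed and after it has been destroyed. The pattern can be sketched in isolation as follows. DemoSingleton is a hypothetical class, and the single-instance assertion is an assumption of the sketch rather than something the listing above enforces.

```cpp
#include <cassert>

// Hypothetical class (not part of Leviathan) showing the same
// register-in-constructor, clear-in-destructor pattern.
class DemoSingleton {
public:
    DemoSingleton()
    {
        // This sketch assumes at most one live instance at a time.
        assert(staticaccess == nullptr);
        staticaccess = this;
    }

    ~DemoSingleton() { staticaccess = nullptr; }

    // Non-copyable: a copy would not be the registered instance.
    DemoSingleton(const DemoSingleton&) = delete;
    DemoSingleton& operator=(const DemoSingleton&) = delete;

    // Returns the live instance, or nullptr if none exists.
    static DemoSingleton* Get() { return staticaccess; }

private:
    static DemoSingleton* staticaccess;
};

DemoSingleton* DemoSingleton::staticaccess = nullptr;
```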

◆ Init()

DLLEXPORT bool Leviathan::ThreadingManager::Init ( )
virtual

Sets up the work queue.

Definition at line 63 of file ThreadingManager.cpp.

64 {
65 
66  GUARD_LOCK();
67 
68  // Start the queuer //
69  WorkQueueHandler = std::thread(RunTaskQueuerThread, this);
70 
71 
72  // Start appropriate amount of threads //
73 
74  for(int i = 0; i < WantedThreadCount; i++) {
75 
76 
77  UsableThreads.push_back(shared_ptr<TaskThread>(new TaskThread()));
78  }
79 
80  return true;
81 }

◆ MakeThreadsWorkWithOgre()

DLLEXPORT void Leviathan::ThreadingManager::MakeThreadsWorkWithOgre ( )

Makes the threads work with Ogre.

Definition at line 341 of file ThreadingManager.cpp.

342 {
343 
345 
346  // Disallow new tasks //
347  {
348  GUARD_LOCK();
349  AllowStartTasksFromQueue = false;
350  }
351 
352  // Set our main thread's name //
353  // SetThreadNameImpl(-1, "LeviathanMain");
354 
355  // Wait for tasks to finish //
357 
358  // All threads are now available //
359 
360  // Call pre register function //
361 #ifdef LEVIATHAN_USING_OGRE
362  Ogre::Root::getSingleton().getRenderSystem()->preExtraThreadsStarted();
363 #endif // LEVIATHAN_USING_OGRE
364 
365  // Set the threads to run the register methods //
366  {
367  GUARD_LOCK_NAME(lockit);
368 
369 #ifdef LEVIATHAN_USING_OGRE
370  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
371 
372  //(*iter)->SetTaskAndNotify(
373  // std::make_shared<QueuedTask>(std::bind(RegisterOgreOnThread)));
374  // Wait for it to end //
375 #ifdef __GNUC__
376  while((*iter)->HasRunningTask()) {
377  try {
378  TaskQueueNotify.wait_for(lockit, std::chrono::milliseconds(50));
379  } catch(...) {
380  LOG_WARNING("ThreadingManager: MakeThreadsWorkWithOgre: "
381  "linux fix wait interrupted");
382  }
383  }
384 #endif
385  }
386 #endif // LEVIATHAN_USING_OGRE
387  }
388 
389  // Wait for threads to finish //
391 
392  // End registering functions //
393 #ifdef LEVIATHAN_USING_OGRE
394  Ogre::Root::getSingleton().getRenderSystem()->postExtraThreadsStarted();
395 #endif // LEVIATHAN_USING_OGRE
396 
397  // Allow new threads //
398  {
399  GUARD_LOCK();
401  }
402 }

◆ NotifyQueuerThread()

DLLEXPORT void Leviathan::ThreadingManager::NotifyQueuerThread ( )

Notifies the queuer thread to check task setting.

Definition at line 444 of file ThreadingManager.cpp.

445 {
446 
447  TaskQueueNotify.notify_all();
448 }

◆ NotifyTaskFinished()

DLLEXPORT void Leviathan::ThreadingManager::NotifyTaskFinished ( std::shared_ptr< QueuedTask > task)

Called by work threads when they are done.

Definition at line 324 of file ThreadingManager.cpp.

325 {
326  // We need locking for re-adding it //
327  if(task->IsRepeating()) {
328  // Add back to queue //
329  GUARD_LOCK();
330 
331  // Or not if we should be quitting soon //
332  if(AllowRepeats)
333  WaitingTasks.push_back(task);
334  }
335 
336 
337  // We probably don't need to acquire a lock for this //
338  TaskQueueNotify.notify_all();
339 }
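
The listing above puts a finished task back on the waiting list only when the task is repeating and AllowRepeats is still true, and it takes the lock only on that path. A reduced model of that decision is sketched below. DemoTask and DemoManager are hypothetical stand-ins for QueuedTask and ThreadingManager, and the condition-variable notification is left out.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <mutex>

// Hypothetical stand-in for QueuedTask; only the repeat flag is modelled.
struct DemoTask {
    bool Repeating = false;
    bool IsRepeating() const { return Repeating; }
};

// Hypothetical stand-in for the manager state used by NotifyTaskFinished.
struct DemoManager {
    std::mutex ObjectsLock;
    bool AllowRepeats = true;
    std::list<std::shared_ptr<DemoTask>> WaitingTasks;

    void NotifyTaskFinished(const std::shared_ptr<DemoTask>& task)
    {
        assert(task != nullptr);

        // One-shot tasks are simply dropped once they finish.
        if(!task->IsRepeating())
            return;

        // The queue is shared between threads, so lock before touching it.
        std::lock_guard<std::mutex> guard(ObjectsLock);

        // Skip the requeue when repeats have been disabled for shutdown.
        if(AllowRepeats)
            WaitingTasks.push_back(task);
    }
};
```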

◆ QueueTask() [1/2]

DLLEXPORT void Leviathan::ThreadingManager::QueueTask ( std::shared_ptr< QueuedTask > task)

Adds a task to the queue.

◆ QueueTask() [2/2]

DLLEXPORT FORCE_INLINE void Leviathan::ThreadingManager::QueueTask ( QueuedTask * newdtask)
inline

Adds a task to the queue.

Parameters
newdtask  The task to queue. The manager takes ownership of the pointer and deletes it.

Definition at line 73 of file ThreadingManager.h.

74  {
75  QueueTask(std::shared_ptr<QueuedTask>(newdtask));
76  }
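
The raw-pointer overload wraps newdtask in a std::shared_ptr, so ownership passes to the queue and the caller must not delete the task or hand the same pointer over twice. The sketch below models that hand-off with hypothetical DemoTask and DemoQueue types that count destructions. It is an illustration only and does not reproduce the Leviathan classes.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical task type that records when it is destroyed.
struct DemoTask {
    explicit DemoTask(int* destroyed) : Destroyed(destroyed) {}
    ~DemoTask() { ++*Destroyed; }
    int* Destroyed;
};

// Hypothetical queue with the same pair of overloads as QueueTask.
class DemoQueue {
public:
    void QueueTask(std::shared_ptr<DemoTask> task)
    {
        Tasks.push_back(std::move(task));
    }

    // Takes ownership: the pointer is deleted when the last shared_ptr
    // referring to it goes away.
    void QueueTask(DemoTask* newdtask)
    {
        assert(newdtask != nullptr);
        QueueTask(std::shared_ptr<DemoTask>(newdtask));
    }

    std::size_t Size() const { return Tasks.size(); }
    void Clear() { Tasks.clear(); }

private:
    std::vector<std::shared_ptr<DemoTask>> Tasks;
};
```

Where the caller can choose, passing the result of std::make_shared to the shared_ptr overload avoids the raw pointer altogether.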

◆ Release()

DLLEXPORT void Leviathan::ThreadingManager::Release ( )
virtual

This can take a long time, since it waits until all tasks are done.

Todo:
Do something about unfinished tasks here

Definition at line 144 of file ThreadingManager.cpp.

145 {
146  // Disallow new tasks //
147  {
148  GUARD_LOCK();
149  AllowStartTasksFromQueue = false;
150  }
151 
152  // Wait for all to finish //
153  // WaitForAllTasksToFinish();
154 
155  {
156  GUARD_LOCK();
157  StopProcessing = true;
158  }
159  // Wait for the queuer to exit //
160  TaskQueueNotify.notify_all();
161  WorkQueueHandler.join();
162 
163  // Tell all threads to quit //
164  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
165  (*iter)->NotifyKill();
166  }
167 }

◆ RemoveFromQueue()

DLLEXPORT bool Leviathan::ThreadingManager::RemoveFromQueue ( std::shared_ptr< QueuedTask > task)

Removes a task from the queue.

Precondition
The task is added with QueueTask
Note
If the task is currently being executed, the calling thread spins until it is done

Definition at line 180 of file ThreadingManager.cpp.

181 {
182  // Best case scenario is finding it in the wait queue //
183  {
184  GUARD_LOCK();
185 
186  auto end = WaitingTasks.end();
187  for(auto iter = WaitingTasks.begin(); iter != end; ++iter) {
188 
189  if((*iter).get() == task.get()) {
190 
191  WaitingTasks.erase(iter);
192  return true;
193  }
194  }
195  }
196 
197  // The worse case is it having finished already //
198  // And the worst case is it being currently executed //
199  bool wasrunning = false;
200  bool isrunning = false;
201 
202  do {
203 
204  isrunning = false;
205 
206  GUARD_LOCK();
207 
208  auto end = UsableThreads.end();
209  for(auto iter = UsableThreads.begin(); iter != end; ++iter) {
210 
211  if((*iter)->IsRunningTask(task)) {
212 
213  isrunning = true;
214  wasrunning = true;
215  break;
216  }
217  }
218 
219  } while(isrunning);
220 
221  return wasrunning;
222 }
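
The first half of RemoveFromQueue searches the waiting list for an entry that points at the same task object and erases it. Because operator== on std::shared_ptr compares the stored pointers, std::find performs the same identity comparison as the hand-written loop. The helper below is a minimal sketch. RemoveQueued is a hypothetical name, and locking is left to the caller.

```cpp
#include <algorithm>
#include <cassert>
#include <list>
#include <memory>

// Hypothetical helper (not part of Leviathan): erases `task` from `queue`
// if present. Returns true when an entry was removed.
template<class T>
bool RemoveQueued(std::list<std::shared_ptr<T>>& queue,
    const std::shared_ptr<T>& task)
{
    assert(task != nullptr);

    // shared_ptr equality compares addresses, not the pointed-to values.
    auto found = std::find(queue.begin(), queue.end(), task);
    if(found == queue.end())
        return false;

    queue.erase(found);
    return true;
}
```

Two distinct tasks with equal contents are therefore never confused with each other; only the exact object that was queued is removed.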

◆ RemoveTasksFromQueue()

DLLEXPORT void Leviathan::ThreadingManager::RemoveTasksFromQueue ( std::vector< std::shared_ptr< QueuedTask >> &  tasklist)

Removes specific tasks from the queue.

Precondition
The tasks are added with QueueTask and tasklist has the list of tasks to remove
Postcondition
The tasklist is empty and the tasks won't be run
Note
If a task is currently being executed, the calling thread spins until it is done

Definition at line 224 of file ThreadingManager.cpp.

226 {
227  // Just go one by one and remove them all //
228  for(size_t i = 0; i < tasklist.size(); i++) {
229 
230  RemoveFromQueue(tasklist[i]);
231  }
232 
233  tasklist.clear();
234 }

◆ SetDisallowRepeatingTasks()

DLLEXPORT void Leviathan::ThreadingManager::SetDisallowRepeatingTasks ( bool  disallow)

Prevents repeating tasks from being queued again.

Note
This should only be called by the Engine class just before quitting

Definition at line 450 of file ThreadingManager.cpp.

451 {
452  AllowRepeats = !disallow;
453 }

◆ SetDiscardConditionalTasks()

DLLEXPORT void Leviathan::ThreadingManager::SetDiscardConditionalTasks ( bool  discard)

Sets the task queuer to discard all conditional tasks.

Note
This should only be called by the Engine

Definition at line 455 of file ThreadingManager.cpp.

456 {
457  AllowConditionalWait = !discard;
458 }

◆ UnregisterGraphics()

DLLEXPORT void Leviathan::ThreadingManager::UnregisterGraphics ( )

Must be called if MakeThreadsWorkWithOgre has been called, BEFORE releasing graphics.

Definition at line 404 of file ThreadingManager.cpp.

405 {
406 
407  // Wait for threads to finish //
409 
410  {
411  GUARD_LOCK_NAME(lockit);
412 
413 #ifdef LEVIATHAN_USING_OGRE
414 
415  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
416 
417  //(*iter)->SetTaskAndNotify(
418  // std::make_shared<QueuedTask>(std::bind(UnregisterOgreOnThread)));
419  // Wait for it to end //
420  // #ifdef __GNUC__
421  // while((*iter)->HasRunningTask()){
422  // try{
423  // TaskQueueNotify.wait_for(lockit, std::chrono::milliseconds(50));
424  // }
425  // catch(...){
426  // LOG_WARNING("ThreadingManager: MakeThreadsWorkWithOgre: "
427  // "linux fix wait interrupted");
428  // }
429  // }
430  // #endif
431  }
432 #endif // LEVIATHAN_USING_OGRE
433  }
434 
436 
437  // Allow new threads //
438  {
439  GUARD_LOCK();
441  }
442 }

◆ WaitForAllTasksToFinish()

DLLEXPORT void Leviathan::ThreadingManager::WaitForAllTasksToFinish ( )

Blocks until all queued tasks are finished.

Warning
This function ignores the MustBeRanBefore return value by passing TASK_MUSTBERAN_BEFORE_EXIT
Bug:
This doesn't properly handle tasks that are repeating

Definition at line 270 of file ThreadingManager.cpp.

271 {
272  // Use this lock the entire function //
273  GUARD_LOCK();
274 
275  // See if empty right now and loop until it is //
276  while(true) {
277 
278  WaitForWorkersToEmpty(guard);
279 
280  // Unlock us for a second
281  guard.unlock();
282  // Make the queuer run //
283  TaskQueueNotify.notify_all();
284  guard.lock();
285 
286  WaitForWorkersToEmpty(guard);
287  WaitForWorkersToEmpty(guard);
288 
289  if(WaitingTasks.empty())
290  return;
291  }
292 
293  WaitForWorkersToEmpty(guard);
294 }

◆ WaitForWorkersToEmpty()

DLLEXPORT void ThreadingManager::WaitForWorkersToEmpty ( Lock & guard)

Blocks until all threads are empty.

Definition at line 296 of file ThreadingManager.cpp.

297 {
298 
299  // Wait for threads to empty up //
300  bool allavailable = false;
301 
302  // We want to skip wait on loop //
303  goto skipfirstwaitforthreadslabel2;
304 
305  while(!allavailable) {
306 
307  // Wait for tasks to update //
308  TaskQueueNotify.wait_for(guard, std::chrono::milliseconds(1));
309 
310  skipfirstwaitforthreadslabel2:
311 
312  // Set to true until a thread is busy //
313  allavailable = true;
314 
315  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
316  if((*iter)->HasRunningTask()) {
317  allavailable = false;
318  break;
319  }
320  }
321  }
322 }

Friends And Related Function Documentation

◆ RunTaskQueuerThread

void RunTaskQueuerThread ( ThreadingManager * manager)
friend
Todo:
Improve performance

Member Data Documentation

◆ AllowConditionalWait

bool Leviathan::ThreadingManager::AllowConditionalWait
protected

Controls whether tasks can be conditional. Setting this to false removes all tasks that cannot be run immediately.

Definition at line 131 of file ThreadingManager.h.

◆ AllowRepeats

bool Leviathan::ThreadingManager::AllowRepeats
protected

Can tasks be repeated.

Definition at line 127 of file ThreadingManager.h.

◆ AllowStartTasksFromQueue

bool Leviathan::ThreadingManager::AllowStartTasksFromQueue
protected

Definition at line 117 of file ThreadingManager.h.

◆ staticaccess

ThreadingManager * Leviathan::ThreadingManager::staticaccess = NULL
static protected

Definition at line 142 of file ThreadingManager.h.

◆ StopProcessing

bool Leviathan::ThreadingManager::StopProcessing
protected

Definition at line 118 of file ThreadingManager.h.

◆ TaksMustBeRanBeforeState

int Leviathan::ThreadingManager::TaksMustBeRanBeforeState
protected

Used to allow QueuedTask::MustBeRanBefore function to work, shared between staticaccess worker thread and the main object

Definition at line 124 of file ThreadingManager.h.

◆ TaskQueueNotify

std::condition_variable_any Leviathan::ThreadingManager::TaskQueueNotify
protected

Definition at line 136 of file ThreadingManager.h.

◆ UsableThreads

std::vector<std::shared_ptr<TaskThread> > Leviathan::ThreadingManager::UsableThreads
protected

Definition at line 137 of file ThreadingManager.h.

◆ WaitingTasks

std::list<std::shared_ptr<QueuedTask> > Leviathan::ThreadingManager::WaitingTasks
protected

List of the tasks queued by the application.

Definition at line 135 of file ThreadingManager.h.

◆ WantedThreadCount

int Leviathan::ThreadingManager::WantedThreadCount
protected

Definition at line 120 of file ThreadingManager.h.

◆ WorkQueueHandler

std::thread Leviathan::ThreadingManager::WorkQueueHandler
protected

Thread used to set tasks to threads.

Definition at line 140 of file ThreadingManager.h.


The documentation for this class was generated from the following files: