Leviathan  0.8.0.0
Leviathan game engine
ThreadingManager.cpp
Go to the documentation of this file.
1 // ------------------------------------ //
2 #include "ThreadingManager.h"
3 
4 #include "QueuedTask.h"
6 #include "Utility/Convert.h"
7 
8 #include <thread>
9 using namespace Leviathan;
10 using namespace std;
11 // ------------------------------------ //
12 
13 // ------------------ Utility functions for threads to run ------------------ //
14 // TODO: BSF may need some handling
16 
18 
19 // ------------------ ThreadingManager ------------------ //
21  /*= DEFAULT_THREADS_PER_CORE*/) :
22  AllowStartTasksFromQueue(true),
23  StopProcessing(false), TaksMustBeRanBeforeState(TASK_MUSTBERAN_BEFORE_EXIT),
24  AllowRepeats(true), AllowConditionalWait(true)
25 {
26  WantedThreadCount = std::thread::hardware_concurrency() * basethreadspercore;
27 
28  staticaccess = this;
29 }
30 
32 {
33 
34  // Joins all threads before quitting //
35  UsableThreads.clear();
36  staticaccess = nullptr;
37 }
38 
40 {
41  return staticaccess;
42 }
43 
45 // ------------------------------------ //
47 {
48 
49  GUARD_LOCK();
50 
51  // Start the queuer //
52  WorkQueueHandler = std::thread(RunTaskQueuerThread, this);
53 
54 
55  // Start appropriate amount of threads //
56 
57  for(int i = 0; i < WantedThreadCount; i++) {
58 
59 
60  UsableThreads.push_back(shared_ptr<TaskThread>(new TaskThread()));
61  }
62 
63  return true;
64 }
65 
67 {
68  // Check are there running threads //
69  GUARD_LOCK();
70 
71 
72  bool started = false;
73  int loopcount = 0;
74 
75  std::this_thread::yield();
76 
77  // This might need to be repeated for a while //
78  while(!started) {
79 
80  // Check that at least one thread is running //
81  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
82  // Check is this thread running //
83  if((*iter)->HasStarted()) {
84  // Something has started //
85  started = true;
86 
87  // Set the thread names //
88  int threadnumber = 0;
89  for(auto iter2 = UsableThreads.begin(); iter2 != UsableThreads.end();
90  ++iter2) {
91 
92  // Set the name //
94  (*iter2).get(), "Lev_Task_" + Convert::ToString(threadnumber));
95 
96  threadnumber++;
97  }
98 
99  return true;
100  }
101  }
102 
103  if(loopcount > 100) {
104  // Sleep a bit
105  std::this_thread::sleep_for(std::chrono::milliseconds(1));
106  }
107 
108  if(++loopcount > 1000) {
109 #ifndef LEVIATHAN_UE_PLUGIN
110  Logger::Get()->Error(
111  "ThreadingManager: CheckInit: no threads have started, after 1000 loops");
112 #endif // LEVIATHAN_UE_PLUGIN
113 
114  // No threads running //
115  return false;
116  }
117 
118  std::this_thread::yield();
119  }
120 
121  LEVIATHAN_ASSERT(0, "Shouldn't get out of that loop");
122  return false;
123 }
124 
125 
126 
128 {
129  // Disallow new tasks //
130  {
131  GUARD_LOCK();
132  AllowStartTasksFromQueue = false;
133  }
134 
135  // Wait for all to finish //
136  // WaitForAllTasksToFinish();
137 
138  {
139  GUARD_LOCK();
140  StopProcessing = true;
141  }
142  // Wait for the queuer to exit //
143  TaskQueueNotify.notify_all();
144  WorkQueueHandler.join();
145 
146  // Tell all threads to quit //
147  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
148  (*iter)->NotifyKill();
149  }
150 }
151 // ------------------------------------ //
152 DLLEXPORT void Leviathan::ThreadingManager::QueueTask(shared_ptr<QueuedTask> task)
153 {
154  {
155  GUARD_LOCK();
156 
157  WaitingTasks.push_back(task);
158  }
159  // Notify the thread //
160  TaskQueueNotify.notify_all();
161 }
162 
164 {
165  // Best case scenario is finding it in the wait queue //
166  {
167  GUARD_LOCK();
168 
169  auto end = WaitingTasks.end();
170  for(auto iter = WaitingTasks.begin(); iter != end; ++iter) {
171 
172  if((*iter).get() == task.get()) {
173 
174  WaitingTasks.erase(iter);
175  return true;
176  }
177  }
178  }
179 
180  // The worse case is it having finished already //
181  // And the worst case is it being currently executed //
182  bool wasrunning = false;
183  bool isrunning = false;
184 
185  do {
186 
187  isrunning = false;
188 
189  GUARD_LOCK();
190 
191  auto end = UsableThreads.end();
192  for(auto iter = UsableThreads.begin(); iter != end; ++iter) {
193 
194  if((*iter)->IsRunningTask(task)) {
195 
196  isrunning = true;
197  wasrunning = true;
198  break;
199  }
200  }
201 
202  } while(isrunning);
203 
204  return wasrunning;
205 }
206 
208  std::vector<shared_ptr<QueuedTask>>& tasklist)
209 {
210  // Just go one by one and remove them all //
211  for(size_t i = 0; i < tasklist.size(); i++) {
212 
213  RemoveFromQueue(tasklist[i]);
214  }
215 
216  tasklist.clear();
217 }
218 // ------------------------------------ //
220 {
221  // Disallow new tasks //
222  GUARD_LOCK_NAME(lockit);
223  AllowStartTasksFromQueue = false;
224 
225 
226  bool allavailable = false;
227 
228  // We want to skip wait on loop //
229  goto skipfirstwaitforthreadslabel;
230 
231  while(!allavailable) {
232 
233  // Wait for tasks to update //
234  TaskQueueNotify.wait_for(lockit, std::chrono::milliseconds(10));
235 
236  skipfirstwaitforthreadslabel:
237 
238 
239  // Set to true until a thread is busy //
240  allavailable = true;
241 
242  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
243  if((*iter)->HasRunningTask()) {
244  allavailable = false;
245  break;
246  }
247  }
248  }
249 
250  // Now free //
251 }
252 
254 {
255  // Use this lock the entire function //
256  GUARD_LOCK();
257 
258  // See if empty right now and loop until it is //
259  while(true) {
260 
261  WaitForWorkersToEmpty(guard);
262 
263  // Unlock us for a second
264  guard.unlock();
265  // Make the queuer run //
266  TaskQueueNotify.notify_all();
267  guard.lock();
268 
269  WaitForWorkersToEmpty(guard);
270  WaitForWorkersToEmpty(guard);
271 
272  if(WaitingTasks.empty())
273  return;
274  }
275 
276  WaitForWorkersToEmpty(guard);
277 }
278 
280 {
281 
282  // Wait for threads to empty up //
283  bool allavailable = false;
284 
285  // We want to skip wait on loop //
286  goto skipfirstwaitforthreadslabel2;
287 
288  while(!allavailable) {
289 
290  // Wait for tasks to update //
291  TaskQueueNotify.wait_for(guard, std::chrono::milliseconds(1));
292 
293  skipfirstwaitforthreadslabel2:
294 
295  // Set to true until a thread is busy //
296  allavailable = true;
297 
298  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
299  if((*iter)->HasRunningTask()) {
300  allavailable = false;
301  break;
302  }
303  }
304  }
305 }
306 // ------------------------------------ //
308 {
309  // We need locking for re-adding it //
310  if(task->IsRepeating()) {
311  // Add back to queue //
312  GUARD_LOCK();
313 
314  // Or not if we should be quitting soon //
315  if(AllowRepeats)
316  WaitingTasks.push_back(task);
317  }
318 
319 
320  // We probably don't need to acquire a lock for this //
321  TaskQueueNotify.notify_all();
322 }
323 // ------------------------------------ //
325 {
326 
328 
329  // Disallow new tasks //
330  {
331  GUARD_LOCK();
332  AllowStartTasksFromQueue = false;
333  }
334 
335  // Set our main thread's name //
336  // SetThreadNameImpl(-1, "LeviathanMain");
337 
338  // Wait for tasks to finish //
339  FlushActiveThreads();
340 
341  // All threads are now available //
342 
343  // TODO: if BSF thread setup is needed it should be here
344 
345  // Wait for threads to finish //
346  FlushActiveThreads();
347 
348  // End registering functions //
349 
350  // Allow new tasks to run //
351  {
352  GUARD_LOCK();
353  AllowStartTasksFromQueue = true;
354  }
355 }
356 
358 {
359  // Wait for threads to finish //
360  FlushActiveThreads();
361 
362  {
363  GUARD_LOCK_NAME(lockit);
364 
365  for(auto iter = UsableThreads.begin(); iter != UsableThreads.end(); ++iter) {
366 
367  //(*iter)->SetTaskAndNotify(
368  // std::make_shared<QueuedTask>(std::bind(UnregisterOgreOnThread)));
369  // Wait for it to end //
370  // #ifdef __GNUC__
371  // while((*iter)->HasRunningTask()){
372  // try{
373  // TaskQueueNotify.wait_for(lockit, std::chrono::milliseconds(50));
374  // }
375  // catch(...){
376  // LOG_WARNING("ThreadingManager: MakeThreadsWorkWithOgre: "
377  // "linux fix wait interrupted");
378  // }
379  // }
380  // #endif
381  }
382  }
383 
384  FlushActiveThreads();
385 
386  // Allow new tasks to run //
387  {
388  GUARD_LOCK();
389  AllowStartTasksFromQueue = true;
390  }
391 }
392 // ------------------------------------ //
394 {
395 
396  TaskQueueNotify.notify_all();
397 }
398 
400 {
401  AllowRepeats = disallow;
402 }
403 
405 {
406  AllowConditionalWait = !discard;
407 }
408 // ------------------------------------ //
410 {
411 
412  // Lock the object //
413  GUARD_LOCK_OTHER(manager);
414 
415  while(!manager->StopProcessing) {
416 
417  // Wait until task queue needs work //
418  manager->TaskQueueNotify.wait_for(guard, std::chrono::milliseconds(100));
419 
420  // Quickly continue if it is empty //
421  if(!manager->AllowStartTasksFromQueue || manager->WaitingTasks.empty()) {
422 
423  continue;
424  }
425 
426  // Keep iterator consistent with the whole loop, (to avoid excessive calling of
427  // CanBeRan) //
428  auto taskiter = manager->WaitingTasks.begin();
429  // Used to iterate again, but just checking if they can be ran (allows more important
430  // tasks to run first) //
431  auto nonimportantiter = manager->WaitingTasks.begin();
432 
433  // We need some common values for tasks to use for checking if they can run //
434  QueuedTaskCheckValues commontaskcheck;
435 
436  // Find an empty thread and queue tasks //
437  for(auto iter = manager->UsableThreads.begin(); iter != manager->UsableThreads.end();
438  ++iter) {
439 
440  if(!(*iter)->HasRunningTask()) {
441 
442  // Break if no tasks //
443  if(manager->WaitingTasks.empty())
444  break;
445 
446  // Queue a task //
447  shared_ptr<QueuedTask> tmptask;
448 
449  // Try to find a suitable one //
450  for(; taskiter != manager->WaitingTasks.end();) {
451 
452  // Check does the task want to run now //
453  if((*taskiter)->MustBeRanBefore(manager->TaksMustBeRanBeforeState)) {
454  // Check is allowed to run //
455  if((*taskiter)->CanBeRan(&commontaskcheck)) {
456  // Run it! //
457  tmptask = (*taskiter);
458  // Erase, might be temporary //
459  taskiter = manager->WaitingTasks.erase(taskiter);
460 
461  // Just to be safe, TODO: performance could be improved //
462  nonimportantiter = taskiter;
463 
464  break;
465  } else if(!manager->AllowConditionalWait) {
466  // Discard it //
467  taskiter = manager->WaitingTasks.erase(taskiter);
468  // Just to be safe, TODO: performance could be improved //
469  nonimportantiter = taskiter;
470  }
471  } else if(!manager->AllowConditionalWait) {
472  // Discard it //
473  taskiter = manager->WaitingTasks.erase(taskiter);
474  // Just to be safe, TODO: performance could be improved //
475  nonimportantiter = taskiter;
476  }
477 
478  ++taskiter;
479  }
480 
481  if(!tmptask) {
482  // Check with the other iterator, too //
483  for(; nonimportantiter != manager->WaitingTasks.end();) {
484  // Check is allowed to run //
485  if((*nonimportantiter)->CanBeRan(&commontaskcheck)) {
486  // Run it! //
487  tmptask = (*nonimportantiter);
488  // Erase, might be temporary //
489  nonimportantiter = manager->WaitingTasks.erase(nonimportantiter);
490 
491  // Just to be safe, TODO: performance could be improved //
492  taskiter = nonimportantiter;
493 
494  break;
495  } else if(!manager->AllowConditionalWait) {
496  // Discard it //
497  nonimportantiter = manager->WaitingTasks.erase(taskiter);
498 
499  // Just to be safe, TODO: performance could be improved //
500  taskiter = nonimportantiter;
501  }
502 
503  ++nonimportantiter;
504  }
505  }
506 
507  // If still nothing, nothing cannot run //
508  if(!tmptask)
509  break;
510 
511  // This won't actually finish it so to re-queue it, if it repeats, we use the
512  // callback called when it is finished
513  (*iter)->SetTaskAndNotify(tmptask);
514  }
515  }
516  }
517 }
518 // ------------------------------------ //
519 #ifdef _WIN32
520 
521 const DWORD MS_VC_EXCEPTION = 0x406D1388;
522 
523 #pragma pack(push, 8)
524 typedef struct tagTHREADNAME_INFO {
525  DWORD dwType; // Must be 0x1000.
526  LPCSTR szName; // Pointer to name (in user addr space).
527  DWORD dwThreadID; // Thread ID (-1=caller thread).
528  DWORD dwFlags; // Reserved for future use, must be zero.
529 } THREADNAME_INFO;
530 #pragma pack(pop)
531 
532 void Leviathan::SetThreadName(TaskThread* thread, const string& name)
533 {
534  // Skip this if there is no debugger //
535  if(!IsDebuggerPresent())
536  return;
537 
538  // Get the native handle //
539  DWORD nativehandle = GetThreadId(thread->GetInternalThreadObject().native_handle());
540 
541  SetThreadNameImpl(nativehandle, name);
542 }
543 
544 void Leviathan::SetThreadNameImpl(DWORD threadid, const string& name)
545 {
546  // Do this trick as shown on MSDN //
547  THREADNAME_INFO info;
548  info.dwType = 0x1000;
549  // Set the name //
550  info.szName = name.c_str();
551  info.dwThreadID = threadid;
552  info.dwFlags = 0;
553 
554  __try {
555  RaiseException(
556  MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
557  } __except(EXCEPTION_EXECUTE_HANDLER) {
558  }
559 }
560 #elif __linux
561 
562 void Leviathan::SetThreadName(TaskThread* thread, const string& name)
563 {
564 
565  pthread_setname_np(thread->GetInternalThreadObject().native_handle(), name.c_str());
566 }
567 
568 #else
569 #error Do the Mac os thread name
570 
571 
572 
573 #endif // _WIN32
DLLEXPORT void WaitForWorkersToEmpty(Lock &guard)
Blocks until all threads are empty.
DLLEXPORT void SetDiscardConditionalTasks(bool discard)
Sets the task queuer to discard all conditional tasks.
virtual DLLEXPORT bool Init()
Sets up the work queue.
DLLEXPORT void FlushActiveThreads()
This function waits for all tasks to complete.
DLLEXPORT std::thread & GetInternalThreadObject()
Returns the internal ThisThread variable.
Definition: TaskThread.cpp:167
Object used by ThreadingManager to easily create properly initialized threads.
Definition: TaskThread.h:22
void RunTaskQueuerThread(ThreadingManager *manager)
DLLEXPORT void QueueTask(std::shared_ptr< QueuedTask > task)
Adds a task to the queue.
void SetThreadName(TaskThread *thread, const std::string &name)
virtual DLLEXPORT ~ThreadingManager()
std::condition_variable_any TaskQueueNotify
DLLEXPORT void RemoveTasksFromQueue(std::vector< std::shared_ptr< QueuedTask >> &tasklist)
Removes specific tasks from the queue.
DLLEXPORT void WaitForAllTasksToFinish()
Blocks until all queued tasks are finished.
static DLLEXPORT ThreadingManager * Get()
#define GUARD_LOCK_NAME(y)
Definition: ThreadSafe.h:116
DLLEXPORT ThreadingManager(int basethreadspercore=DEFAULT_THREADS_PER_CORE)
std::vector< std::shared_ptr< TaskThread > > UsableThreads
DLLEXPORT void MakeThreadsWorkWithOgre()
Makes the threads work with Ogre.
#define QUICKTIME_THISSCOPE
Definition: TimingMonitor.h:20
virtual DLLEXPORT bool IsRepeating()
Function called by ThreadingManager AFTER running the task //.
Definition: QueuedTask.cpp:36
DLLEXPORT void NotifyQueuerThread()
Notifies the queuer thread to check task setting.
Manages delayed execution of functions through use of QueuedTask and subclasses.
std::list< std::shared_ptr< QueuedTask > > WaitingTasks
List of the tasks queued by the application.
DLLEXPORT void RegisterOgreOnThread()
#define GUARD_LOCK_OTHER(x)
Definition: ThreadSafe.h:124
#define LEVIATHAN_ASSERT(x, msg)
Definition: Define.h:104
DLLEXPORT bool RemoveFromQueue(std::shared_ptr< QueuedTask > task)
Removes a task from the queue.
static std::string ToString(const T &val)
Definition: Convert.h:72
virtual DLLEXPORT bool CheckInit()
Checks has Init worked.
Object passed to tasks which has common values.
Definition: QueuedTask.h:21
DLLEXPORT void NotifyTaskFinished(std::shared_ptr< QueuedTask > task)
Called by work threads when they are done.
static DLLEXPORT Logger * Get()
Definition: Logger.cpp:106
virtual DLLEXPORT void Release()
DLLEXPORT void SetDisallowRepeatingTasks(bool disallow)
Disallows repeating tasks to occur again.
DLLEXPORT void UnregisterOgreOnThread()
#define DLLEXPORT
Definition: Include.h:84
DLLEXPORT void UnregisterGraphics()
Must be called if MakeThreadsWorkWithOgre has been called, BEFORE releasing graphics.
static ThreadingManager * staticaccess
The access mask controls which registered functions and classes a script sees.
Definition: GameModule.h:12
#define TASK_MUSTBERAN_BEFORE_EXIT
Default value to pass for ignoring this setting //.
Definition: QueuedTask.h:9
DLLEXPORT void Error(const std::string &data) override
Definition: Logger.cpp:177
#define GUARD_LOCK()
Definition: ThreadSafe.h:111
std::unique_lock< std::mutex > Lock
Definition: ThreadSafe.h:18