MADNESS  version 0.9
worldthread.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30 
31  $Id $
32 */
33 #ifndef MADNESS_WORLD_WORLDTHREAD_H__INCLUDED
34 #define MADNESS_WORLD_WORLDTHREAD_H__INCLUDED
35 
38 
39 #include <madness/TAU.h>
40 #include <madness/world/dqueue.h>
41 #include <vector>
42 #include <cstddef>
43 #include <cstdio>
44 #include <pthread.h>
46 #include <typeinfo>
47 #include <new>
48 
49 #ifdef MADNESS_TASK_PROFILING
50 #include <execinfo.h> // for backtrace_symbols
51 #ifndef USE_LIBIBERTY
52 #include <cxxabi.h> // for abi::__cxa_demangle
53 #else
54 extern "C" {
55  extern char * cplus_demangle (const char *mangled, int options);
56 #define DMGL_NO_OPTS 0 /* For readability... */
57 }
58 #endif
59 #include <sstream> // for std::istringstream
60 #include <cstring> // for strchr & strrchr
61 #endif // MADNESS_TASK_PROFILING
62 
63 #ifdef HAVE_INTEL_TBB
64 #include "tbb/tbb.h"
65 #endif
66 
67 
68 #ifndef _SC_NPROCESSORS_CONF
69 // Old macs don't have necessary support thru sysconf to determine the
70 // no. of processors so must use sysctl
71 #include <sys/types.h>
72 #include <sys/sysctl.h>
73 #endif
74 
75 namespace madness {
76 
77  // Forward decl.
78  class Barrier;
79  class ThreadPool;
80  class WorldTaskQueue;
81  class AtomicInt;
82  void error(const char *msg);
83 
85 
91  class ThreadBase {
92  friend class ThreadPool;
93  static bool bind[3];
94  static int cpulo[3];
95  static int cpuhi[3];
96  static pthread_key_t thread_key;
97 
98  static void* main(void* self);
99 
100  int pool_num;
101  pthread_t id;
102 
103  static void init_thread_key() {
104  const int rc = pthread_key_create(&thread_key, NULL);
105  if(rc != 0)
106  MADNESS_EXCEPTION("pthread_key_create failed", rc);
107  }
108 
109  static void delete_thread_key() {
110  pthread_key_delete(thread_key);
111  }
112 
113  void set_pool_thread_index(int i) { pool_num = i; }
114 
115 #if defined(HAVE_IBMBGQ) and defined(HPM)
116  static const int hpm_thread_id_all = -10;
117  static const int hpm_thread_id_main = -2;
118  static bool main_instrumented;
119  static bool all_instrumented;
120  static int hpm_thread_id;
121 #endif
122 
123  public:
124 
126  ThreadBase() : pool_num(-1) { }
127 
128  virtual ~ThreadBase() { }
129 
131  virtual void run() = 0;
132 
134  void start();
135 
137  static void exit() { pthread_exit(0); }
138 
140  const pthread_t& get_id() const { return id; }
141 
143  int get_pool_thread_index() const { return pool_num; }
144 
146  int cancel() const { return pthread_cancel(get_id()); }
147 
148 
150  static int num_hw_processors();
151 
153  static void set_affinity_pattern(const bool bind[3], const int cpu[3]);
154 
155  static void set_affinity(int logical_id, int ind=-1);
156 
158  return static_cast<ThreadBase*>(pthread_getspecific(thread_key));
159  }
160 
161 #if defined(HAVE_IBMBGQ) and defined(HPM)
162  static void set_hpm_thread_env(int hpm_thread_id);
163 #endif
164  }; // class ThreadBase
165 
167  class Thread : public ThreadBase {
168  void* (*f)(void *);
169  void* args;
170 
171  void run() { f(args); }
172 
173  public:
175  Thread() : f(0), args(0) { }
176 
178  Thread(void* (*f)(void *), void* args=0)
179  : f(f), args(args) {
181  }
182 
183  void start(void* (*f)(void *), void* args=0) {
184  this->f = f;
185  this->args = args;
187  }
188 
189  virtual ~Thread() {}
190  };
191 
192 
194 
209  unsigned long flags;
210  public:
211  static const unsigned long NTHREAD = 0xff; // Mask for nthread byte
212  static const unsigned long GENERATOR = 1ul<<8; // Mask for generator bit
213  static const unsigned long STEALABLE = GENERATOR<<1; // Mask for stealable bit
214  static const unsigned long HIGHPRIORITY = GENERATOR<<2; // Mask for priority bit
215 
216  explicit TaskAttributes(unsigned long flags = 0) : flags(flags) {}
217 
218  TaskAttributes(const TaskAttributes& attr) : flags(attr.flags) {}
219 
220  virtual ~TaskAttributes() {}
221 
222  bool is_generator() const { return flags&GENERATOR; }
223 
224  bool is_stealable() const { return flags&STEALABLE; }
225 
226  bool is_high_priority() const { return flags&HIGHPRIORITY; }
227 
228  void set_generator(bool generator_hint) {
229  if (generator_hint)
230  flags |= GENERATOR;
231  else
232  flags &= ~GENERATOR;
233  }
234 
235  void set_stealable(bool stealable) {
236  if (stealable) flags |= STEALABLE;
237  else flags &= ~STEALABLE;
238  }
239 
240  void set_highpriority(bool hipri) {
241  if (hipri)
242  flags |= HIGHPRIORITY;
243  else
244  flags &= ~HIGHPRIORITY;
245  }
246 
248 
256  void set_nthread(int nthread) {
257  MADNESS_ASSERT(nthread>=0 && nthread<256);
258  flags = (flags & (~NTHREAD)) | (nthread & NTHREAD);
259  }
260 
261  int get_nthread() const {
262  int n = flags & NTHREAD;
263  if (n == 0)
264  n = 1;
265  return n;
266  }
267 
268  template <typename Archive>
269  void serialize(Archive& ar) {
270  ar & flags;
271  }
272 
274  return TaskAttributes(GENERATOR);
275  }
276 
278  return TaskAttributes(HIGHPRIORITY);
279  }
280 
281  static TaskAttributes multi_threaded(int nthread) {
282  TaskAttributes t;
283  t.set_nthread(nthread);
284  return t;
285  }
286  };
287 
290  const int _nthread;
291  const int _id;
292  Barrier* _barrier;
293 
294  public:
296  : _nthread(nthread), _id(id), _barrier(barrier)
297  {}
298 
299 #if HAVE_INTEL_TBB
300  // I can not get the TaskThreadEnv to work with Barrier
301  // Need to figure out why
302  TaskThreadEnv(int nthread, int id)
303  : _nthread(nthread), _id(id), _barrier(NULL)
304  {};
305 #endif
306 
307  int nthread() const {return _nthread;}
308 
309  int id() const {return _id;}
310 
311  bool barrier() const {
312  if (_nthread == 1)
313  return true;
314  else {
315  MADNESS_ASSERT(_barrier);
316  return _barrier->enter(_id);
317  }
318  }
319  };
320 
321 
322 #ifdef MADNESS_TASK_PROFILING
323 
324  namespace profiling {
325 
327 
330  class TaskEvent {
331  private:
332  double times_[3];
333  std::pair<void*, unsigned short> id_;
334  unsigned short threads_;
335 
337 
344  static void print_demangled(std::ostream& os, const char* symbol) {
345  // Get the demagled symbol name
346  if(symbol) {
347  int status = 0;
348 #ifndef USE_LIBIBERTY
349  const char* name = abi::__cxa_demangle(symbol, 0, 0, &status);
350 #else
351  char* name = cplus_demangle(symbol, DMGL_NO_OPTS);
352 #endif
353  // Append the demangled symbol name to the output stream
354  if(status == 0) {
355  os << name << "\t";
356  free((void*)name);
357  } else {
358  os << symbol << "\t";
359  }
360  } else {
361  os << "UNKNOWN\t";
362  }
363  }
364 
366 
368  std::string get_name() const {
369 
370  // Get the backtrace symbol for the function address,
371  // which contains the function name.
372  void* const * func_ptr = const_cast<void* const *>(& id_.first);
373  char** bt_sym = backtrace_symbols(func_ptr, 1);
374 
375  // Extract the mangled function name from the backtrace
376  // symbol.
377  std::string mangled_name;
378 
379 #ifdef ON_A_MAC
380  // Format of bt_sym is:
381  // <frame #> <file name> <address> <mangled name> + <function offset>
382  std::istringstream iss(bt_sym[0]);
383  long frame;
384  std::string file, address;
385  iss >> frame >> file >> address >> mangled_name;
386 #else // Assume Linux
387  // Format of bt_sym is:
388  // <file>(<mangled name>+<function offset>) [<address>]
389  const char* first = strchr(bt_sym[0],'(');
390  if(first) {
391  ++first;
392  const char* last = strrchr(first,'+');
393  if(last)
394  mangled_name.assign(first, (last - first) - 1);
395  }
396 #endif // ON_A_MAC
397 
398  // Free the backtrace buffer
399  free(bt_sym);
400 
401  return mangled_name;
402  }
403 
404  public:
405 
407 
409 
415  void start(const std::pair<void*, unsigned short>& id,
416  const unsigned short threads, const double submit_time)
417  {
418  id_ = id;
419  threads_ = threads;
420  times_[0] = submit_time;
421  times_[1] = wall_time();
422  }
423 
425  void stop() { times_[2] = wall_time(); }
426 
428 
435  friend std::ostream& operator<<(std::ostream& os, const TaskEvent& te) {
436  // Add address to output stream
437  os << std::hex << std::showbase << te.id_.first <<
438  std::dec << std::noshowbase << "\t";
439 
440  // Print the name
441  switch(te.id_.second) {
442  case 1:
443  {
444  const std::string mangled_name = te.get_name();
445 
446  // Print the demangled name
447  if(! mangled_name.empty())
448  print_demangled(os, mangled_name.c_str());
449  else
450  os << "UNKNOWN\t";
451  }
452  break;
453  case 2:
454  print_demangled(os, static_cast<const char*>(te.id_.first));
455  break;
456  default:
457  os << "UNKNOWN\t";
458  }
459 
460  // Print:
461  // # of threads, submit time, start time, stop time
462  os << te.threads_;
463  const std::streamsize precision = os.precision();
464  os.precision(6);
465  os << std::fixed << "\t" << te.times_[0]
466  << "\t" << te.times_[1] << "\t" << te.times_[2];
467  os.precision(precision);
468  return os;
469  }
470 
471  }; // class TaskEvent
472 
474 
/// Abstract node of a singly linked chain of task event lists

/// Derived classes supply print_events(), which the stream insertion
/// operator forwards to.
class TaskEventListBase {
public:

    /// Construct a node that is not linked to any other node
    TaskEventListBase() : next_(NULL) { }

    /// Virtual destructor
    virtual ~TaskEventListBase() { }

    /// \return The node that follows this one, or NULL
    TaskEventListBase* next() const { return next_; }

    /// Link \c list directly after this node

    /// If this node already has a successor it becomes the successor of
    /// \c list; otherwise the successor of \c list is left untouched.
    /// \param list The node to insert
    void insert(TaskEventListBase* list) {
        if(next_ != NULL)
            list->next_ = next_;
        next_ = list;
    }

    /// Write the events of \c tel to \c os

    /// \param os The output stream
    /// \param tel The list to print
    /// \return The stream returned by \c tel.print_events(os)
    friend inline std::ostream& operator<<(std::ostream& os, const TaskEventListBase& tel) {
        std::ostream& result = tel.print_events(os);
        return result;
    }

private:
    TaskEventListBase* next_; ///< Successor in the chain

    // Not allowed
    TaskEventListBase(const TaskEventListBase&);
    TaskEventListBase& operator=(const TaskEventListBase&);

    /// Print the events held by the derived class
    virtual std::ostream& print_events(std::ostream&) const = 0;

}; // class TaskEventListBase
518 
520 
523  class TaskEventList : public TaskEventListBase {
524  private:
525  unsigned int n_;
526  TaskEvent* events_;
527 
528  // Not allowed
529  TaskEventList(const TaskEventList&);
530  TaskEventList& operator=(const TaskEventList&);
531 
532  public:
533 
535  TaskEventList(const unsigned int nmax) :
536  TaskEventListBase(), n_(0ul), events_(new TaskEvent[nmax])
537  { }
538 
540  virtual ~TaskEventList() { delete [] events_; }
541 
543 
547  TaskEvent* event() { return events_ + (n_++); }
548 
549  private:
550 
552  virtual std::ostream& print_events(std::ostream& os) const {
553  const int thread_id = ThreadBase::this_thread()->get_pool_thread_index();
554  for(std::size_t i = 0; i < n_; ++i)
555  os << thread_id << "\t" << events_[i] << std::endl;
556  return os;
557  }
558 
559  }; // class TaskEventList
560 
562 
566  class TaskProfiler {
567  private:
568  TaskEventListBase* head_;
569  TaskEventListBase* tail_;
570 
571  static Mutex output_mutex_;
572 
573  // Not allowed
574  TaskProfiler(const TaskProfiler&);
575  TaskProfiler& operator=(const TaskProfiler&);
576 
577  public:
578  static const char* output_file_name_;
579  public:
584  TaskProfiler() : head_(NULL), tail_(NULL) { }
585 
587  ~TaskProfiler() {
588  // Cleanup linked list
589  TaskEventListBase* next = NULL;
590  while(head_ != NULL) {
591  next = head_->next();
592  delete head_;
593  head_ = next;
594  }
595  }
596 
598 
601  TaskEventList* new_list(const std::size_t nmax) {
602  // Create a new event list
603  TaskEventList* list = new TaskEventList(nmax);
604 
605  // Append the list to the tail of the linked list
606  if(head_ != NULL) {
607  tail_->insert(list);
608  tail_ = list;
609  } else {
610  head_ = list;
611  tail_ = list;
612  }
613  return list;
614  }
615 
617 
624  void write_to_file();
625  }; // class TaskProfiler
626 
627  } // namespace profiling
628 
629 #endif // MADNESS_TASK_PROFILING
630 
631 
633 
637 #ifdef HAVE_INTEL_TBB
638  public tbb::task,
639 #endif // HAVE_INTEL_TBB
640  public TaskAttributes
641  {
642  friend class ThreadPool;
643 
644  private:
645  Barrier* barrier;
646  AtomicInt count;
647 
648 #ifdef MADNESS_TASK_PROFILING
649  profiling::TaskEvent* task_event_;
650  double submit_time_;
651  std::pair<void*, unsigned short> id_;
652 
653  void set_event(profiling::TaskEvent* task_event) {
654  task_event_ = task_event;
655  }
656 
658  void submit() {
659  submit_time_ = wall_time();
660  this->get_id(id_);
661  }
662 #endif // MADNESS_TASK_PROFILING
663 
665 
668  template <typename T>
669  union FunctionPointerGrabber {
670  T in;
671  void* out;
672  };
673 
674  protected:
675 
676 
677  template <typename fnT>
678  static typename enable_if_c<detail::function_traits<fnT>::value ||
679  detail::memfunc_traits<fnT>::value>::type
680  make_id(std::pair<void*,unsigned short>& id, fnT fn) {
681  FunctionPointerGrabber<fnT> poop;
682  poop.in = fn;
683  id.first = poop.out;
684  id.second = 1ul;
685  }
686 
687  template <typename fnobjT>
689  detail::memfunc_traits<fnobjT>::value>::type
690  make_id(std::pair<void*,unsigned short>& id, const fnobjT&) {
691  id.first = reinterpret_cast<void*>(const_cast<char*>(typeid(fnobjT).name()));
692  id.second = 2ul;
693  }
694 
695  private:
696 
697  virtual void get_id(std::pair<void*,unsigned short>& id) const {
698  id.first = NULL;
699  id.second = 0ul;
700  }
701 
703  bool run_multi_threaded() {
704 #ifdef HAVE_INTEL_TBB
705  MADNESS_EXCEPTION("run_multi_threaded should not be called when using Intel TBB", 1);
706 #else
707  // As a thread enters this routine it increments the shared counter
708  // to generate a unique id without needing any thread-local storage.
709  // A downside is this does not preserve any relationships between thread
710  // numbering and the architecture ... more work ahead.
711  int nthread = get_nthread();
712  if (nthread == 1) {
713 #ifdef MADNESS_TASK_PROFILING
714  task_event_->start(id_, nthread, submit_time_);
715 #endif // MADNESS_TASK_PROFILING
716  TAU_START("PoolTaskInterface::run_multithreaded run(TaskThreadEnv)");
717  run(TaskThreadEnv(1,0,0));
718  TAU_STOP("PoolTaskInterface::run_multithreaded run(TaskThreadEnv)");
719 #ifdef MADNESS_TASK_PROFILING
720  task_event_->stop();
721 #endif // MADNESS_TASK_PROFILING
722  return true;
723  }
724  else {
725  int id = count++;
726  volatile bool barrier_flag;
727  barrier->register_thread(id, &barrier_flag);
728 
729 #ifdef MADNESS_TASK_PROFILING
730  if(id == 0)
731  task_event_->start(id_, nthread, submit_time_);
732 #endif // MADNESS_TASK_PROFILING
733 
734  TAU_START("PoolTaskInterface::run_multithreaded run(TaskThreadEnv)");
735  run(TaskThreadEnv(nthread, id, barrier));
736  TAU_STOP("PoolTaskInterface::run_multithreaded run(TaskThreadEnv)");
737 
738 #ifdef MADNESS_TASK_PROFILING
739  const bool cleanup = barrier->enter(id);
740  if(cleanup) task_event_->stop();
741  return cleanup;
742 #else
743  return barrier->enter(id);
744 #endif // MADNESS_TASK_PROFILING
745  }
746 #endif // HAVE_INTEL_TBB
747  }
748 
749  public:
751  : TaskAttributes()
752  , barrier(0)
753  {
754  count = 0;
755  }
756 
757  explicit PoolTaskInterface(const TaskAttributes& attr)
758  : TaskAttributes(attr)
759  , barrier(attr.get_nthread()>1 ? new Barrier(attr.get_nthread()) : 0)
760  {
761  count = 0;
762  }
763 
765 
769  void set_nthread(int nthread) {
770  if (nthread != get_nthread()) {
772  delete barrier;
773  if (nthread > 1)
774  barrier = new Barrier(nthread);
775  else
776  barrier = 0;
777  }
778  }
779 
780 #if HAVE_INTEL_TBB
781  tbb::task* execute() {
782  int nthread = get_nthread();
783  int id = count++;
784 // volatile bool barrier_flag;
785 // barrier->register_thread(id, &barrier_flag);
786 
787  run( TaskThreadEnv(nthread, id) );
788  return NULL;
789  }
790 
791  static inline void * operator new(std::size_t size) throw(std::bad_alloc);
792 
793 #endif // HAVE_INTEL_TBB
794 
796  static inline void operator delete(void* p, std::size_t size) throw() {
797  if(p != NULL) {
798 #ifdef HAVE_INTEL_TBB
799  tbb::task::destroy(*reinterpret_cast<tbb::task*>(p));
800 #else
801  ::operator delete(p);
802 #endif // HAVE_INTEL_TBB
803  }
804  }
805 
807 
814  virtual void run(const TaskThreadEnv& info) = 0;
815 
816  virtual ~PoolTaskInterface() {
817  delete barrier;
818  }
819  };
820 
823  public:
824  void run(const TaskThreadEnv& /*info*/) {}
825  virtual ~PoolTaskNull() {}
826  private:
827  virtual void get_id(std::pair<void*,unsigned short>& id) const {
829  }
830  };
831 
833 
836  class ThreadPoolThread : public Thread {
837  private:
838  // Thread local data for thread pool
839 #ifdef MADNESS_TASK_PROFILING
840  profiling::TaskProfiler profiler_;
841 #endif // MADNESS_TASK_PROFILING
842 
843  public:
846 
848  virtual ~ThreadPoolThread() { }
849 
850 #ifdef MADNESS_TASK_PROFILING
851  profiling::TaskProfiler& profiler() { return profiler_; }
853 #endif // MADNESS_TASK_PROFILING
854  };
855 
857 
859  class ThreadPool {
860  private:
861  friend class WorldTaskQueue;
862 
863  // Thread pool data
864  ThreadPoolThread *threads;
865  ThreadPoolThread main_thread;
867  int nthreads;
868  volatile bool finish;
869  AtomicInt nfinished;
870 
871  // Static data
872  static ThreadPool* instance_ptr;
873 #ifdef __bgq__
874 #warning WE NEED TO TUNE THE nmax PARAMETER
875 #endif
876  static const int nmax=128; // WAS 100 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! DEBUG
877  static double await_timeout;
878 
879 #if defined(HAVE_IBMBGQ) and defined(HPM)
880  static unsigned int main_hpmctx; // HPM context for main thread
881 #endif
882  ThreadPool(int nthread=-1);
884 
885  ThreadPool(const ThreadPool&); // Verboten
886  void operator=(const ThreadPool&); // Verboten
887 
889  int default_nthread();
890 
892  bool run_task(bool wait, ThreadPoolThread* this_thread) {
893 #if HAVE_INTEL_TBB
894  MADNESS_EXCEPTION("run_task should not be called when using Intel TBB", 1);
895 #else
896 
897  if (!wait && queue.empty()) return false;
898  std::pair<PoolTaskInterface*,bool> t = queue.pop_front(wait);
899 #ifdef MADNESS_TASK_PROFILING
900  profiling::TaskEventList* event_list =
901  this_thread->profiler().new_list(1);
902 #endif // MADNESS_TASK_PROFILING
903  // Task pointer might be zero due to stealing
904  if (t.second && t.first) {
905 #ifdef MADNESS_TASK_PROFILING
906  t.first->set_event(event_list->event());
907 #endif // MADNESS_TASK_PROFILING
908  if (t.first->run_multi_threaded()) // What we are here to do
909  delete t.first;
910  }
911  return t.second;
912 #endif
913  }
914 
915  bool run_tasks(bool wait, ThreadPoolThread* this_thread) {
916 #if HAVE_INTEL_TBB
917 // if (!wait && tbb_task_list->empty()) return false;
918 // tbb::task* t = &tbb_task_list->pop_front();
919 // if (t) {
920 // tbb_parent_task->increment_ref_count();
921 // tbb_parent_task->enqueue(*t);
922 // }
923 
924 // wait = (tbb_parent_task->ref_count() >= 1) ? false : true;
925 // return wait;
926 
927  MADNESS_EXCEPTION("run_tasks should not be called when using Intel TBB", 1);
928 #else
929 
930  PoolTaskInterface* taskbuf[nmax];
931  int ntask = queue.pop_front(nmax, taskbuf, wait);
932 #ifdef MADNESS_TASK_PROFILING
933  profiling::TaskEventList* event_list =
934  this_thread->profiler().new_list(ntask);
935 #endif // MADNESS_TASK_PROFILING
936  for (int i=0; i<ntask; ++i) {
937  if (taskbuf[i]) { // Task pointer might be zero due to stealing
938 #ifdef MADNESS_TASK_PROFILING
939  taskbuf[i]->set_event(event_list->event());
940 #endif // MADNESS_TASK_PROFILING
941  if (taskbuf[i]->run_multi_threaded()) {
942  delete taskbuf[i];
943  }
944  }
945  }
946  return (ntask>0);
947 #endif
948  }
949 
950  void thread_main(ThreadPoolThread* const thread);
951 
953  static void* pool_thread_main(void *v);
954 
955 
957  static ThreadPool* instance() {
958 #ifndef MADNESS_ASSERTIONS_DISABLE
959  if(! instance_ptr) {
960  std::cerr << "!!! ERROR: The thread pool has not been initialized.\n"
961  << "!!! ERROR: Call madness::initialize before submitting tasks to the task queue.\n";
962  MADNESS_EXCEPTION("ThreadPool::instance_ptr is NULL", 0);
963  }
964 #endif
965  return instance_ptr;
966  }
967 
968 
969  public:
970 
971 #if HAVE_INTEL_TBB
972  // all tasks run as children of tbb_parent_task
973  // be sure to allocate tasks with tbb_parent_task->allocate_child()
974  static tbb::empty_task* tbb_parent_task;
975  static tbb::task_scheduler_init* tbb_scheduler;
976 #endif
977 
979  static void begin(int nthread=-1);
980 
981  static void end();
982 
984  static void add(PoolTaskInterface* task) {
985 #ifdef MADNESS_TASK_PROFILING
986  task->submit();
987 #endif // MADNESS_TASK_PROFILING
988 #if HAVE_INTEL_TBB
989  ThreadPool::tbb_parent_task->increment_ref_count();
990  if (task->is_high_priority()) {
991  ThreadPool::tbb_parent_task->spawn(*task);
992  }
993  else {
994  ThreadPool::tbb_parent_task->enqueue(*task);
995  }
996 #else
997  if (!task) MADNESS_EXCEPTION("ThreadPool: inserting a NULL task pointer", 1);
998  int task_threads = task->get_nthread();
999  // Currently multithreaded tasks must be shoved on the end of the q
1000  // to avoid a race condition as multithreaded task is starting up
1001  if (task->is_high_priority() && (task_threads == 1)) {
1002  instance()->queue.push_front(task);
1003  }
1004  else {
1005  instance()->queue.push_back(task, task_threads);
1006  }
1007 #endif // HAVE_INTEL_TBB
1008  }
1009 
1010  template <typename opT>
1011  void scan(opT& op) {
1012  queue.scan(op);
1013  }
1014 
1015 
1017  static void add(const std::vector<PoolTaskInterface*>& tasks) {
1018 #if HAVE_INTEL_TBB
1019  MADNESS_EXCEPTION("Do not add tasks to the madness task queue when using Intel TBB.", 1);
1020 #else
1021  typedef std::vector<PoolTaskInterface*>::const_iterator iteratorT;
1022  for (iteratorT it=tasks.begin(); it!=tasks.end(); ++it) {
1023  add(*it);
1024  }
1025 #endif
1026  }
1027 
1029 
1031  static bool run_task() {
1032 #ifdef MADNESS_TASK_PROFILING
1033  return instance()->run_tasks(false, static_cast<ThreadPoolThread*>(ThreadBase::this_thread()));
1034 #else
1035  return instance()->run_tasks(false, NULL);
1036 #endif // MADNESS_TASK_PROFILING
1037  }
1038 
1040  static std::size_t size() {
1041  return instance()->nthreads;
1042  }
1043 
1045  static std::size_t queue_size() {
1046  return instance()->queue.size();
1047  }
1048 
1050  static const DQStats& get_stats();
1051 
1053 
1055  template <typename Probe>
1056  static void await(const Probe& probe, bool dowork = true) {
1057  double start = cpu_time();
1058  const double timeout = await_timeout;
1059  int counter = 0;
1060 
1061  MutexWaiter waiter;
1062  while (!probe()) {
1063 
1064 #if HAVE_INTEL_TBB
1065  // TODO: Enable busy waiting with TBB.
1066  const bool working = false;
1067 #else
1068  const bool working = (dowork ? ThreadPool::run_task() : false);
1069 #endif // HAVE_INTEL_TBB
1070  const double current_time = cpu_time();
1071 
1072  if (working) {
1073  // Reset timeout logic
1074  waiter.reset();
1075  start = current_time;
1076  counter = 0;
1077  } else {
1078  // Check for timeout
1079  if(((current_time - start) > timeout) && (timeout > 1.0)) {
1080  std::cout << "!!MADNESS: Hung queue?\n";
1081  std::cout.flush();
1082 
1083  if(counter++ > 3)
1084  throw madness::MadnessException("ThreadPool::await() timeout",
1085  0, 1, __LINE__, __FUNCTION__, __FILE__);
1086  }
1087 
1088  waiter.wait();
1089  }
1090  }
1091  }
1092 
1094 #if HAVE_INTEL_TBB
1095  tbb_parent_task->decrement_ref_count();
1096  tbb::task::destroy(*tbb_parent_task);
1097  tbb_scheduler->terminate();
1098  delete(tbb_scheduler);
1099 #endif
1100  }
1101  };
1102 
#ifdef HAVE_INTEL_TBB
/// Allocate a task as a child of the TBB parent task

/// \param size The number of bytes to allocate
/// \throw std::bad_alloc if \c ThreadPool::tbb_parent_task is NULL, i.e. the
/// parent task has not been created yet
inline void * PoolTaskInterface::operator new(std::size_t size) throw(std::bad_alloc)
{
    tbb::empty_task* const parent = ThreadPool::tbb_parent_task;
    if(parent == NULL) {
        std::cerr << "!!! Error: Cannot allocate task object because the thread pool has not been initialized.\n";
        throw std::bad_alloc();
    }

    // Placement form provided by TBB: the storage comes from the parent task
    return ::operator new(size, parent->allocate_child());
}
#endif // HAVE_INTEL_TBB
1113 
1114 }
1115 
1116 #endif // MADNESS_WORLD_WORLDTHREAD_H__INCLUDED
bool is_stealable() const
Definition: worldthread.h:224
int id() const
Definition: worldthread.h:309
TaskThreadEnv(int nthread, int id, Barrier *barrier)
Definition: worldthread.h:295
static void set_affinity_pattern(const bool bind[3], const int cpu[3])
Specify the affinity pattern or how to bind threads to cpus.
Definition: worldthread.cc:206
void error(const char *msg)
Definition: world.cc:128
int main(int argc, char **argv)
Definition: DFcode/mcpfit.cc:983
void set_nthread(int nthread)
Are you sure this is what you want to call?
Definition: worldthread.h:256
ThreadBase()
Default constructor ... must invoke start() to actually begin the thread.
Definition: worldthread.h:126
void start(void *(*f)(void *), void *args=0)
Definition: worldthread.h:183
static ThreadBase * this_thread()
Definition: worldthread.h:157
static TaskAttributes hipri()
Definition: worldthread.h:277
PoolTaskInterface()
Definition: worldthread.h:750
A no-op task used for various purposes.
Definition: worldthread.h:822
void start()
Start the thread running.
Definition: worldthread.cc:156
int pop_front(int nmax, T *r, bool wait)
Pop multiple values off the front of queue ... returns number popped ... might be zero...
Definition: dqueue.h:210
~ThreadPool()
Definition: worldthread.h:1093
static std::size_t queue_size()
Returns number of tasks in the queue.
Definition: worldthread.h:1045
PoolTaskInterface(const TaskAttributes &attr)
Definition: worldthread.h:757
virtual ~PoolTaskNull()
Definition: worldthread.h:825
static void begin(int nthread=-1)
Please invoke while in single threaded environment.
Definition: worldthread.cc:443
Used to pass information about the thread environment into a user's task.
Definition: worldthread.h:289
::std::string string
Definition: gtest-port.h:872
const pthread_t & get_id() const
Get the pthread id of this thread (if running)
Definition: worldthread.h:140
TaskAttributes(const TaskAttributes &attr)
Definition: worldthread.h:218
bool barrier() const
Definition: worldthread.h:311
Grossly simplified Boost-like type traits and templates.
Simplified thread wrapper to hide pthread complexity.
Definition: worldthread.h:167
void scan(opT &op)
Definition: worldthread.h:1011
bool enter(const int id)
Each thread calls this with its id (0,..,nthread-1) to enter the barrier.
Definition: worldmutex.h:651
bool empty() const
Definition: dqueue.h:281
int get_nthread() const
Definition: worldthread.h:261
Definition: dqueue.h:51
Definition: worldmutex.h:72
void wait()
Definition: worldmutex.cc:43
static void await(const Probe &probe, bool dowork=true)
Gracefully wait for a condition to become true ... executes tasks if any in queue.
Definition: worldthread.h:1056
static const unsigned long STEALABLE
Definition: worldthread.h:213
virtual void run()=0
You implement this to do useful work.
void run(const TaskThreadEnv &)
Override this method to implement a multi-threaded task.
Definition: worldthread.h:824
Thread(void *(*f)(void *), void *args=0)
Create a thread and start it running f(args)
Definition: worldthread.h:178
Multi-threaded queue to manage and run tasks.
Definition: worldtask.h:393
Lowest level task interface.
Definition: worldthread.h:636
void scan(opT &op)
Definition: dqueue.h:188
virtual ~Thread()
Definition: worldthread.h:189
static const unsigned long GENERATOR
Definition: worldthread.h:212
const T1 &f1 return GTEST_2_TUPLE_() T(f0, f1)
ThreadPool thread object.
Definition: worldthread.h:836
virtual ~ThreadPoolThread()
Virtual destructor.
Definition: worldthread.h:848
static disable_if_c< detail::function_traits< fnobjT >::value||detail::memfunc_traits< fnobjT >::value >::type make_id(std::pair< void *, unsigned short > &id, const fnobjT &)
Definition: worldthread.h:690
static void add(PoolTaskInterface *task)
Add a new task to the pool.
Definition: worldthread.h:984
A thread-safe, fast but simple double-ended queue.
Definition: dqueue.h:72
int cancel() const
Cancel this thread.
Definition: worldthread.h:146
static void exit()
A thread can call this to terminate its execution.
Definition: worldthread.h:137
static void add(const std::vector< PoolTaskInterface * > &tasks)
Add a vector of tasks to the pool.
Definition: worldthread.h:1017
A singleton pool of threads for dynamic execution of tasks.
Definition: worldthread.h:859
virtual void run(const TaskThreadEnv &info)=0
Override this method to implement a multi-threaded task.
void serialize(Archive &ar)
Definition: worldthread.h:269
Implements DQueue.
void reset()
Definition: worldmutex.h:87
void set_highpriority(bool hipri)
Definition: worldthread.h:240
bool is_high_priority() const
Definition: worldthread.h:226
static std::size_t size()
Returns number of threads in the pool.
Definition: worldthread.h:1040
An integer with atomic set, get, read+inc, read+dec, dec+test operations.
Definition: atomicint.h:73
static const unsigned long NTHREAD
Definition: worldthread.h:211
static TaskAttributes generator()
Definition: worldthread.h:273
int nthread() const
Definition: worldthread.h:307
Definition: worldmutex.h:622
static bool run_task()
An otherwise idle thread can call this to run a task.
Definition: worldthread.h:1031
void register_thread(int id, volatile bool *pflag)
Each thread calls this once before first use.
Definition: worldmutex.h:640
int get_pool_thread_index() const
Get index of thread in ThreadPool (0,...,nthread-1) or -1 if not in ThreadPool.
Definition: worldthread.h:143
virtual ~PoolTaskInterface()
Definition: worldthread.h:816
void set_stealable(bool stealable)
Definition: worldthread.h:235
TaskAttributes(unsigned long flags=0)
Definition: worldthread.h:216
virtual ~ThreadBase()
Definition: worldthread.h:128
virtual ~TaskAttributes()
Definition: worldthread.h:220
Contains attributes of a task.
Definition: worldthread.h:208
ThreadPoolThread()
Default constructor.
Definition: worldthread.h:845
Thread()
Default constructor ... must invoke start() to actually begin the thread.
Definition: worldthread.h:175
std::ostream & operator<<(std::ostream &s, const ContractedGaussianShell &c)
Definition: chem/molecularbasis.cc:38
static enable_if_c< detail::function_traits< fnT >::value||detail::memfunc_traits< fnT >::value >::type make_id(std::pair< void *, unsigned short > &id, fnT fn)
Definition: worldthread.h:680
static void set_affinity(int logical_id, int ind=-1)
Definition: worldthread.cc:224
static void end()
Definition: worldthread.cc:498
Simplified thread wrapper to hide pthread complexity.
Definition: worldthread.h:91
disable_if from Boost for conditionally instantiating templates based on type
Definition: enable_if.h:68
static const DQStats & get_stats()
Returns queue statistics.
Definition: worldthread.cc:522
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
void set_nthread(int nthread)
Call this to reset the number of threads before the task is submitted.
Definition: worldthread.h:769
#define MADNESS_EXCEPTION(msg, value)
Definition: worldexc.h:88
#define TAU_STOP(a)
Definition: TAU.h:7
void set_generator(bool generator_hint)
Definition: worldthread.h:228
static int num_hw_processors()
Get no. of actual hardware processors.
Definition: worldthread.cc:172
Holds machinery to set up Functions/FuncImpls using various Factories and Interfaces.
Definition: chem/atomutil.cc:45
bool is_generator() const
Definition: worldthread.h:222
double wall_time()
Returns the wall time in seconds relative to arbitrary origin.
Definition: world.cc:248
static const unsigned long HIGHPRIORITY
Definition: worldthread.h:214
#define TAU_START(a)
Definition: TAU.h:6
Most exceptions thrown in MADNESS should be derived from these.
Definition: worldexc.h:53
static TaskAttributes multi_threaded(int nthread)
Definition: worldthread.h:281