MADNESS  version 0.9
worldrmi.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30 
31  $Id$
32 */
33 #ifndef MADNESS_WORLD_WORLDRMI_H__INCLUDED
34 #define MADNESS_WORLD_WORLDRMI_H__INCLUDED
35 
36 #include <madness/world/safempi.h>
39 #include <utility>
40 #include <list>
41 
42 /*
43  There is just one server thread and it is the only one
44  messing with the recv buffers, so there is no need for
45  mutex on recv related data.
46 
47  Multiple threads (including the server) may send hence
48  we need to be careful about send-related data.
49 
50  When MPI is initialized we need to use MPI_Init_thread and
51  require MPI_THREAD_MULTIPLE.
52 
53  This RMI service operates only in COMM_WORLD. It is easy enough
54  to extend to other communicators but the point is to have
55  only one server thread for all possible uses. You just
56  have to translate rank_in_comm into rank_in_world by
57  getting the groups from both communicators using
58  MPI_Comm_group and then creating a map from ranks in
59  comm to ranks in world using MPI_Group_translate_ranks.
60 
61  The server task is a singleton ... i.e., there is only one instance of it,
62  which is created when you call RMI::begin().
63 
64  Handler routines should have this type
65 
66  typedef void (*rmi_handlerT)(void* buf, size_t nbyte);
67 
68  There are a few user-accessible routines.
69 
70  RMI::Request RMI::isend(const void* buf, size_t nbyte, int dest,
71  rmi_handlerT func, unsigned int attr=0)
72  - to send an asynchronous message
73  - RMI::Request has the same interface as SafeMPI::Request
74  (right now it is a SafeMPI::Request but this is not guaranteed)
75 
76  void RMI::begin()
77  - to start the server thread
78 
79  void RMI::end()
80  - to terminate the server thread
81 
82  bool RMI::get_debug()
83  - to get the debug flag
84 
85  void RMI::set_debug(bool)
86  - to set the debug flag
87 
88 */
89 
90 namespace madness {
91 
92  // This is the generic low-level interface for a message handler
93  typedef void (*rmi_handlerT)(void* buf, size_t nbyte);
94 
95  struct qmsg {
96  typedef uint16_t counterT;
97  typedef uint32_t attrT;
98  size_t len;
100  int i; // buffer index
102  attrT attr;
103  counterT count;
104 
105  qmsg(size_t len, rmi_handlerT func, int i, int src, attrT attr, counterT count)
106  : len(len), func(func), i(i), src(src), attr(attr), count(count) {}
107 
108  bool operator<(const qmsg& other) const {
109  return count < other.count;
110  }
111 
112  qmsg() {}
113  }; // struct qmsg
114 
115 
// Holds message passing statistics: counts of messages and bytes sent and
// received. All counters start at zero.
struct RMIStats {
    uint64_t nmsg_sent;
    uint64_t nbyte_sent;
    uint64_t nmsg_recv;
    uint64_t nbyte_recv;

    // Zero every counter.
    RMIStats()
        : nmsg_sent(0), nbyte_sent(0), nmsg_recv(0), nbyte_recv(0) {}
};
126 
127  class RMI {
128  typedef uint16_t counterT;
129  typedef uint32_t attrT;
130  public:
131 
133 
134  // Choose header length to hold at least sizeof(header) and
135  // also to ensure good alignment of the user payload.
136  static const size_t ALIGNMENT = 64;
137  static const size_t HEADER_LEN = ALIGNMENT;
138  static const attrT ATTR_UNORDERED=0x0;
139  static const attrT ATTR_ORDERED=0x1;
140 
141 
142  private:
143 
144  class RmiTask
145 #if HAVE_INTEL_TBB
146  : public tbb::task, private madness::Mutex
147 #else
148  : public madness::ThreadBase, private madness::Mutex
149 #endif // HAVE_INTEL_TBB
150  {
151  public:
152 
153  struct header {
155  attrT attr;
156  }; // struct header
157 
158  std::list< std::pair<int,size_t> > hugeq; // q for incoming huge messages
159 
160  SafeMPI::Intracomm comm;
161  const int nproc; // No. of processes in comm world
162  const ProcessID rank; // Rank of this process
163  volatile bool finished; // True if finished
164 
165  ScopedArray<volatile counterT> send_counters;
166  ScopedArray<counterT> recv_counters;
167  std::size_t max_msg_len_;
168  std::size_t nrecv_;
169  std::size_t maxq_;
170  ScopedArray<void*> recv_buf; // Will be at least ALIGNMENT aligned ... +1 for huge messages
172 
174  ScopedArray<int> ind;
176  int n_in_q;
177 
178  static inline bool is_ordered(attrT attr) { return attr & ATTR_ORDERED; }
179 
180  void process_some();
181 
182  RmiTask();
183  virtual ~RmiTask();
184 
185 #if HAVE_INTEL_TBB
186  tbb::task* execute() {
187  // Process some messages
188  process_some();
189  if(! finished) {
190  tbb::task::increment_ref_count();
191  tbb::task::recycle_as_safe_continuation();
192  }
193  return NULL;
194  }
195 #else
196  void run() {
197  try {
198  while (! finished) process_some();
199  } catch(...) {
200  delete this;
201  throw;
202  }
203  delete this;
204  }
205 #endif // HAVE_INTEL_TBB
206 
207  void exit() {
208  if (debugging)
209  std::cerr << rank << ":RMI: sending exit request to server thread" << std::endl;
210 
211  // Set finished flag
212  finished = true;
213  myusleep(10000);
214  }
215 
216  static void huge_msg_handler(void *buf, size_t nbytein);
217 
218  Request isend(const void* buf, size_t nbyte, ProcessID dest, rmi_handlerT func, attrT attr);
219 
220  void post_pending_huge_msg();
221 
222  void post_recv_buf(int i);
223 
224  }; // class RmiTask
225 
226 #if HAVE_INTEL_TBB
227  static tbb::task* tbb_rmi_parent_task;
228 #endif // HAVE_INTEL_TBB
229 
230  static RmiTask* task_ptr; // Pointer to the singleton instance
231  static RMIStats stats;
232  static volatile bool debugging; // True if debugging
233 
234  static const size_t DEFAULT_MAX_MSG_LEN = 3*512*1024;
235 #ifdef HAVE_CRAYXT
236  static const int DEFAULT_NRECV=128;
237 #else
238  static const int DEFAULT_NRECV=32;
239 #endif
240 
241  // Not allowed
242  RMI(const RMI&);
243  RMI& operator=(const RMI&);
244 
245  public:
246 
247  static std::size_t max_msg_len() {
248  return (task_ptr ? task_ptr->max_msg_len_ : DEFAULT_MAX_MSG_LEN);
249  }
250  static std::size_t maxq() {
251  MADNESS_ASSERT(task_ptr);
252  return task_ptr->maxq_;
253  }
254  static std::size_t nrecv() {
255  MADNESS_ASSERT(task_ptr);
256  return task_ptr->nrecv_;
257  }
258 
259  static Request
260  isend(const void* buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED) {
261  if(!task_ptr) {
262  std::cerr <<
263  "!! MADNESS RMI error: Attempting to send a message when the RMI thread is not running\n"
264  "!! MADNESS RMI error: This typically occurs when an active message is sent or a remote task is spawned after calling madness::finalize()\n";
265  MADNESS_EXCEPTION("!! MADNESS error: The RMI thread is not running", (task_ptr != NULL));
266  }
267  return task_ptr->isend(buf, nbyte, dest, func, attr);
268  }
269 
270  static void begin() {
271  MADNESS_ASSERT(task_ptr == NULL);
272 #if HAVE_INTEL_TBB
273  tbb_rmi_parent_task = new( tbb::task::allocate_root() ) tbb::empty_task;
274  tbb_rmi_parent_task->set_ref_count(2);
275 
276  task_ptr = new( tbb_rmi_parent_task->allocate_child() ) RmiTask();
277  tbb::task::enqueue(*task_ptr, tbb::priority_high);
278 #else
279  task_ptr = new RmiTask();
280  task_ptr->start();
281 #endif // HAVE_INTEL_TBB
282  }
283 
284  static void end() {
285  if(task_ptr) {
286  task_ptr->exit();
287 #if HAVE_INTEL_TBB
288  tbb_rmi_parent_task->wait_for_all();
289  tbb::task::destroy(*tbb_rmi_parent_task);
290 #endif // HAVE_INTEL_TBB
291  task_ptr = NULL;
292  }
293  }
294 
295  static void set_debug(bool status) { debugging = status; }
296 
297  static bool get_debug() { return debugging; }
298 
299  static const RMIStats& get_stats() { return stats; }
300  }; // class RMI
301 
302 } // namespace madness
303 
304 #endif // MADNESS_WORLD_WORLDRMI_H__INCLUDED
ProcessID src
Definition: worldrmi.h:101
Serializes calls to MPI in case it does not support THREAD_MULTIPLE.
Mutex using pthread mutex operations.
Definition: worldmutex.h:94
static void begin()
Definition: worldrmi.h:270
int i
Definition: worldrmi.h:100
uint64_t nmsg_sent
Definition: worldrmi.h:118
std::complex< double > func(int n, int t1, int t2, int t3, double xx, double yy, double zz)
Definition: wannier.cc:98
static const attrT ATTR_UNORDERED
Definition: worldrmi.h:138
RMIStats()
Definition: worldrmi.h:123
SafeMPI::Request Request
Definition: worldrmi.h:132
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy...
Definition: safempi.h:437
static bool get_debug()
Definition: worldrmi.h:297
uint64_t nbyte_sent
Definition: worldrmi.h:119
uint64_t nbyte_recv
Definition: worldrmi.h:121
static Request isend(const void *buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED)
Definition: worldrmi.h:260
static std::size_t nrecv()
Definition: worldrmi.h:254
Definition: worldrmi.h:153
Definition: worldrmi.h:117
size_t len
Definition: worldrmi.h:98
uint64_t nmsg_recv
Definition: worldrmi.h:120
qmsg()
Definition: worldrmi.h:112
static const attrT ATTR_ORDERED
Definition: worldrmi.h:139
Implements Dqueue, Thread, ThreadBase and ThreadPool.
static std::size_t maxq()
Definition: worldrmi.h:250
Definition: worldrmi.h:95
uint32_t attrT
Definition: worldrmi.h:97
static const RMIStats & get_stats()
Definition: worldrmi.h:299
void(* rmi_handlerT)(void *buf, size_t nbyte)
Definition: worldrmi.h:93
static std::size_t max_msg_len()
Definition: worldrmi.h:247
static const size_t HEADER_LEN
Definition: worldrmi.h:137
uint16_t counterT
Definition: worldrmi.h:96
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:37
attrT attr
Definition: worldrmi.h:155
static void set_debug(bool status)
Definition: worldrmi.h:295
Definition: safempi.h:243
rmi_handlerT func
Definition: worldrmi.h:154
counterT count
Definition: worldrmi.h:103
qmsg(size_t len, rmi_handlerT func, int i, int src, attrT attr, counterT count)
Definition: worldrmi.h:105
#define HAVE_INTEL_TBB
Definition: config.h:65
static const size_t ALIGNMENT
Definition: worldrmi.h:136
Simplified thread wrapper to hide pthread complexity.
Definition: worldthread.h:91
#define MADNESS_EXCEPTION(msg, value)
Definition: worldexc.h:88
Definition: worldrmi.h:127
static void end()
Definition: worldrmi.h:284
attrT attr
Definition: worldrmi.h:102
Holds machinery to set up Functions/FuncImpls using various Factories and Interfaces.
Definition: chem/atomutil.cc:45
bool operator<(const qmsg &other) const
Definition: worldrmi.h:108
rmi_handlerT func
Definition: worldrmi.h:99