MADNESS  version 0.9
worldam.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30 
31 
32  $Id$
33  */
34 
35 #ifndef MADNESS_WORLD_WORLDAM_H__INCLUDED
36 #define MADNESS_WORLD_WORLDAM_H__INCLUDED
37 
40 
41 #include <madness/world/bufar.h>
42 #include <madness/world/worldrmi.h>
43 #include <madness/world/worldfwd.h>
44 #include <vector>
45 #include <cstddef>
46 
47 namespace madness {
48 
49  /*
50  The RMI layer just does transport and does not know about World
51  or even necessarily about MPI. It also has no buffering or
52  virtualization of resources. In particular, we must be careful
53  about having too many outstanding sends and active message
54  handlers must be careful about what they do in order to avoid
55  deadlock --- especially problematic is a handler trying to send
56  a message.
57 
58  The WorldAM class provides a World-aware RMI capability that
59  limits the number of outstanding sends and can optionally manage
60  buffers. The issue of what handlers can do safely is handled by
61  the integration of WorldAM and the task queues ... if you have
62  an operation that might
63 
64  - send messages
65 
66  - take a long time
67 
68  - consume a lot of stack/heap (e.g., recursive algorithm)
69 
70  then the right thing to do is to send a task rather than
71  an active message.
72  */
73 
class AmArg;

/// Type of AM handler functions: called with the incoming message.
typedef void (*am_handlerT)(const AmArg&);

template <class Derived> class WorldObject;
79 
81  class AmArg {
82  private:
83  friend class WorldAmInterface;
84  template <class Derived> friend class WorldObject;
85 
86  friend AmArg* alloc_am_arg(std::size_t nbyte);
87 
88  unsigned char header[RMI::HEADER_LEN]; // !!!!!!!!! MUST BE FIRST !!!!!!!!!!
89  std::size_t nbyte; // Size of user payload
90  unsigned long worldid; // Id of associated world
91  am_handlerT func; // User function to call
92  ProcessID src; // Rank of process sending the message
93  unsigned int flags; // Misc. bit flags
94 
95  // On 32 bit machine AmArg is HEADER_LEN+4+4+4+4+4=84 bytes
96  // On 64 bit machine AmArg is HEADER_LEN+8+8+8+4+4=96 bytes
97 
98  // No copy constructor or assignment
99  AmArg(const AmArg&);
100  AmArg& operator=(const AmArg&);
101 
102  void set_src(ProcessID source) { src = source; }
103 
104  void set_worldid(unsigned long id) { worldid = id; }
105 
106  void set_func(am_handlerT handler) {
107  MADNESS_ASSERT(handler);
108  func = handler;
109  }
110 
111  void set_size(std::size_t numbyte) { nbyte = numbyte; }
112 
113  void set_pending() { flags |= 0x1ul; }
114 
115  bool is_pending() const { return flags & 0x1ul; }
116 
117  void clear_flags() { flags = 0; }
118 
119  am_handlerT get_func() const { return func; }
120 
121  archive::BufferInputArchive make_input_arch() const {
122  return archive::BufferInputArchive(buf(),size());
123  }
124 
125  archive::BufferOutputArchive make_output_arch() const {
126  return archive::BufferOutputArchive(buf(),size());
127  }
128 
129  public:
130  AmArg() {}
131 
133  unsigned char* buf() const { return (unsigned char*)(this) + sizeof(AmArg); }
134 
136  std::size_t size() const { return nbyte; }
137 
139  template <typename T>
141  return make_input_arch() & t;
142  }
143 
145  template <typename T>
147  return make_output_arch() & t;
148  }
149 
151  ProcessID get_src() const { return src; }
152 
153  // This is not inline in order to keep World opaque.
155  World* get_world() const { return World::world_from_id(worldid); }
156 
158  unsigned long get_worldid() const { return worldid; }
159  };
160 
161 
163  inline AmArg* alloc_am_arg(std::size_t nbyte) {
164  std::size_t narg = 1 + (nbyte+sizeof(AmArg)-1)/sizeof(AmArg);
165  AmArg *arg = new AmArg[narg];
166  arg->set_size(nbyte);
167  return arg;
168  }
169 
170 
171  inline AmArg* copy_am_arg(const AmArg& arg) {
172  AmArg* r = alloc_am_arg(arg.size());
173  memcpy(r, &arg, arg.size()+sizeof(AmArg));
174  return r;
175  }
176 
178  inline void free_am_arg(AmArg* arg) {
179  delete [] arg;
180  }
181 
182 
184  template <typename A, typename B, typename C, typename D, typename E, typename F, typename G, typename H, typename I, typename J>
185  inline AmArg* new_am_arg(const A& a, const B& b, const C& c, const D& d, const E& e, const F& f, const G& g, const H& h,
186  const I& i, const J& j) {
187  AmArg* arg = alloc_am_arg(archive::bufar_size(a,b,c,d,e,f,g,h,i,j));
188  *arg & a & b & c & d & e & f & g & h & i & j;
189  return arg;
190  }
191 
193  template <typename A, typename B, typename C, typename D, typename E, typename F, typename G, typename H, typename I>
194  inline AmArg* new_am_arg(const A& a, const B& b, const C& c, const D& d, const E& e, const F& f, const G& g, const H& h,
195  const I& i) {
196  AmArg* arg = alloc_am_arg(archive::bufar_size(a,b,c,d,e,f,g,h,i));
197  *arg & a & b & c & d & e & f & g & h & i;
198  return arg;
199  }
200 
202  template <typename A, typename B, typename C, typename D, typename E, typename F, typename G, typename H>
203  inline AmArg* new_am_arg(const A& a, const B& b, const C& c, const D& d, const E& e, const F& f, const G& g, const H& h) {
204  AmArg* arg = alloc_am_arg(archive::bufar_size(a,b,c,d,e,f,g,h));
205  *arg & a & b & c & d & e & f & g & h;
206  return arg;
207  }
208 
210  template <typename A, typename B, typename C, typename D, typename E, typename F, typename G>
211  inline AmArg* new_am_arg(const A& a, const B& b, const C& c, const D& d, const E& e, const F& f, const G& g) {
212  AmArg* arg = alloc_am_arg(archive::bufar_size(a,b,c,d,e,f,g));
213  *arg & a & b & c & d & e & f & g;
214  return arg;
215  }
216 
218  template <typename A, typename B, typename C, typename D, typename E, typename F>
219  inline AmArg* new_am_arg(const A& a, const B& b, const C& c, const D& d, const E& e, const F& f) {
220  AmArg* arg = alloc_am_arg(archive::bufar_size(a,b,c,e,d,f));
221  *arg & a & b & c & d & e & f;
222  return arg;
223  }
224 
226  template <typename A, typename B, typename C, typename D, typename E>
227  inline AmArg* new_am_arg(const A& a, const B& b, const C& c, const D& d, const E& e) {
228  AmArg* arg = alloc_am_arg(archive::bufar_size(a,b,c,d,e));
229  *arg & a & b & c & d & e;
230  return arg;
231  }
232 
234  template <typename A, typename B, typename C, typename D>
235  inline AmArg* new_am_arg(const A& a, const B& b, const C& c, const D& d) {
237  *arg & a & b & c & d;
238  return arg;
239  }
240 
242  template <typename A, typename B, typename C>
243  inline AmArg* new_am_arg(const A& a, const B& b, const C& c) {
245  *arg & a & b & c;
246  return arg;
247  }
248 
250  template <typename A, typename B>
251  inline AmArg* new_am_arg(const A& a, const B& b) {
253  *arg & a & b;
254  return arg;
255  }
256 
258  template <typename A>
259  inline AmArg* new_am_arg(const A& a) {
261  *arg & a;
262  return arg;
263  }
264 
267  friend class WorldGopInterface;
268  friend class World;
269  public:
270  const int msg_len;
271  private:
272 
273 #ifdef HAVE_CRAYXT
274  static const int DEFAULT_NSEND = 512;
275 #else
276  static const int DEFAULT_NSEND = 32;
277 #endif
278 
279  // Multiple threads are making their way thru here ... must be careful
280  // to ensure updates are atomic and consistent
281 
282  int nsend;
283  ScopedArray<AmArg* volatile> managed_send_buf;
284  ScopedArray<RMI::Request> send_req;
285 
286  unsigned long worldid;
287  const ProcessID rank;
288  const int nproc;
289  volatile int cur_msg;
290  volatile unsigned long nsent;
291  volatile unsigned long nrecv;
292 
293  std::vector<int> map_to_comm_world;
294 
295  void free_managed_send_buf(int i) {
296  // WE ASSUME WE ARE INSIDE A CRITICAL SECTION WHEN IN HERE
297  if (managed_send_buf[i]) {
298  free_am_arg(managed_send_buf[i]);
299  managed_send_buf[i] = 0;
300  }
301  }
302 
304  int get_free_send_request() {
305  // WE ASSUME WE ARE INSIDE A CRITICAL SECTION WHEN IN HERE
306 // // Sequentially loop looking for next free request.
307 // while (!send_req[cur_msg].Test()) {
308 // cur_msg++;
309 // if (cur_msg >= NSEND) cur_msg = 0;
310 // myusleep(5);
311 // }
312 
313  // Wait for oldest request to complete
314  while (!send_req[cur_msg].Test()) {
315  // If the oldest message has still not completed then there is likely
316  // severe network or end-point congestion, so pause for 100us in a rather
317  // arbitrary attempt to decrease the injection rate. The server thread
318  // is still polling every 1us (which is required to suck data off the net
319  // and by some engines to ensure progress on sends).
320  myusleep(100);
321  }
322 
323  free_managed_send_buf(cur_msg);
324  int result = cur_msg;
325  cur_msg++;
326  if (cur_msg >= nsend) cur_msg = 0;
327 
328  return result;
329  }
330 
332  static void handler(void *buf, std::size_t nbyte) {
333  // It will be singled threaded since only the RMI receiver
334  // thread will invoke it ... however note that nrecv will
335  // be read by the main thread during fence operations.
336  AmArg* arg = static_cast<AmArg*>(buf);
337  am_handlerT func = arg->get_func();
338  World* w = arg->get_world();
339  MADNESS_ASSERT(arg->size() + sizeof(AmArg) == nbyte);
340  MADNESS_ASSERT(w);
341  MADNESS_ASSERT(func);
342  func(*arg);
343  //world->am.nrecv++; // Must be AFTER execution of the function
344  w->am.nrecv++; // Must be AFTER execution of the function
345  }
346 
348  RMI::Request isend(ProcessID dest, am_handlerT op, const AmArg* arg, int attr, bool managed) {
349  {
350  AmArg* argx = const_cast<AmArg*>(arg);
351 
352  argx->set_worldid(worldid);
353  argx->set_src(rank);
354  argx->set_func(op);
355  argx->clear_flags(); // Is this the right place for this?
356  }
357 
358  MADNESS_ASSERT(arg->get_world());
359  MADNESS_ASSERT(arg->get_func());
360 
361  // Map dest from world's communicator to comm_world
362  dest = map_to_comm_world[dest];
363 
364  lock(); // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
365  nsent++;
366  int i = get_free_send_request();
367  send_req[i] = RMI::isend(arg, arg->size()+sizeof(AmArg), dest, handler, attr);
368  if (managed) managed_send_buf[i] = (AmArg*)(arg);
369  unlock(); // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
370  return send_req[i];
371  }
372 
373 
374  public:
375  WorldAmInterface(World& world);
376 
377  virtual ~WorldAmInterface();
378 
380  void fence() {}
381 
383  // RMI::Request isend(ProcessID dest, am_handlerT op, const AmArg* arg, int attr=RMI::ATTR_ORDERED);
384 
386  RMI::Request send(const ProcessID dest, am_handlerT op, const AmArg* arg,
387  const int attr=RMI::ATTR_ORDERED, const bool managed = true)
388  {
389  return isend(dest, op, arg, attr, managed);
390  }
391 
394  ScopedArray<int> ind(new int[nsend]);
395  lock(); // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
396  int n = SafeMPI::Request::Testsome(nsend, send_req.get(), ind.get());
397  if (n != MPI_UNDEFINED) {
398  for (int i=0; i<n; ++i) {
399  free_managed_send_buf(ind[i]);
400  }
401  }
402  unlock(); // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
403  }
404 
405 // RMI::Request isend(ProcessID dest, am_handlerT op, const AmArg* arg, int attr) {
406 // std::cerr << "ISEND_ING AM\n";
407 // return isend(dest, op, arg, attr, false);
408 // }
409 
410  };
411 }
412 
413 #endif // MADNESS_WORLD_WORLDAM_H__INCLUDED
unsigned long get_worldid() const
Return the world id.
Definition: worldam.h:158
void free_am_arg(AmArg *arg)
Frees an AmArg allocated with alloc_am_arg.
Definition: worldam.h:178
World * get_world() const
For incoming AM gives the associated world.
Definition: worldam.h:155
Tensor< double > B
Definition: tdse1d.cc:167
Mutex using pthread mutex operations.
Definition: worldmutex.h:94
void(* am_handlerT)(const AmArg &)
Type of AM handler functions.
Definition: worldam.h:78
World active message that extends an RMI message.
Definition: worldam.h:81
NDIM const Function< R, NDIM > & g
Definition: mra.h:2179
std::complex< double > func(int n, int t1, int t2, int t3, double xx, double yy, double zz)
Definition: wannier.cc:98
AmArg * alloc_am_arg(std::size_t nbyte)
Allocates a new AmArg with nbytes of user data ... delete with free_am_arg.
Definition: worldam.h:163
size_t bufar_size(const A &a, const B &b, const C &c, const D &d, const E &e, const F &f, const G &g, const H &h, const I &i, const J &j)
Convenience template computing the size of a buffer archive containing the arguments.
Definition: bufar.h:105
std::size_t size() const
Returns the size of the user's payload.
Definition: worldam.h:136
SafeMPI::Request Request
Definition: worldrmi.h:132
virtual ~WorldAmInterface()
Definition: worldam.cc:91
const int msg_len
Max length of user payload in message.
Definition: worldam.h:270
AmArg * new_am_arg(const A &a, const B &b, const C &c, const D &d, const E &e, const F &f, const G &g, const H &h, const I &i, const J &j)
Convenience template for serializing arguments into a new AmArg.
Definition: worldam.h:185
archive::BufferOutputArchive operator&(const T &t) const
Used to serialize arguments into outgoing message.
Definition: worldam.h:146
static Request isend(const void *buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED)
Definition: worldrmi.h:260
Implements an archive wrapping a memory buffer.
#define MPI_UNDEFINED
Definition: stubmpi.h:24
friend class World
Definition: worldam.h:268
NDIM & f
Definition: mra.h:2179
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only) ...
Definition: tensor.h:2429
ProcessID get_src() const
For incoming AM gives the source process.
Definition: worldam.h:151
WorldAmInterface(World &world)
Definition: worldam.cc:44
static World * world_from_id(unsigned long id)
Convert world id to world pointer.
Definition: worldfwd.h:628
friend AmArg * alloc_am_arg(std::size_t nbyte)
Allocates a new AmArg with nbytes of user data ... delete with free_am_arg.
Definition: worldam.h:163
Wraps an archive around a memory buffer for output.
Definition: bufar.h:58
JLOOP2 NK jnz KLOOP2 mov C
Definition: mtxm_gen.h:10
Provides collectives that interoperate with the AM and task interfaces.
Definition: worldgop.h:147
void fence()
Currently a noop.
Definition: worldam.h:380
pcomplex_operatorT G
Definition: tdse1d.cc:168
const T1 &f1 return GTEST_2_TUPLE_() T(f0, f1)
archive::BufferInputArchive operator&(T &t) const
Used to deserialize arguments from incoming message.
Definition: worldam.h:140
FLOAT a(int j, FLOAT z)
Definition: y1.cc:86
static const attrT ATTR_ORDERED
Definition: worldrmi.h:139
RMI::Request send(const ProcessID dest, am_handlerT op, const AmArg *arg, const int attr=RMI::ATTR_ORDERED, const bool managed=true)
Sends a non-blocking active message (managed by default: the AM layer frees the buffer once the send completes).
Definition: worldam.h:386
Implements most parts of a globally addressable object (via unique ID)
Definition: worldam.h:74
A parallel world with full functionality wrapping an MPI communicator.
Definition: worldfwd.h:416
static const size_t HEADER_LEN
Definition: worldrmi.h:137
AmArg()
Definition: worldam.h:130
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:37
Scoped array.
Definition: scopedptr.h:85
const complexd I(0, 1)
Wraps an archive around a memory buffer for input.
Definition: bufar.h:206
Implements AM interface.
Definition: worldam.h:266
Definition: safempi.h:243
ptrT * get() const
Pointer accessor.
Definition: scopedptr.h:266
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
void free_managed_buffers()
Frees as many send buffers as possible.
Definition: worldam.h:393
AmArg * copy_am_arg(const AmArg &arg)
Definition: worldam.h:171
Implements World.
Holds machinery to set up Functions/FuncImpls using various Factories and Interfaces.
Definition: chem/atomutil.cc:45
const double c
Definition: gfit.cc:200
FLOAT b(int j, FLOAT z)
Definition: y1.cc:79
static int Testsome(int incount, Request *requests, int *indices, Status *statuses)
Definition: safempi.h:314
void unlock() const
Free a mutex owned by this thread.
Definition: worldmutex.h:124
void lock() const
Acquire the mutex waiting if necessary.
Definition: worldmutex.h:118
unsigned char * buf() const
Returns a pointer to the user's payload (aligned in same way as AmArg)
Definition: worldam.h:133