MADNESS  version 0.9
worldgop.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30 
31 
32  $Id$
33  */
34 
35 
36 #ifndef MADNESS_WORLD_WORLDGOP_H__INCLUDED
37 #define MADNESS_WORLD_WORLDGOP_H__INCLUDED
38 
41 
44 
#include <cmath>
#include <cstdlib>
#include <vector>

#include <madness/world/bufar.h>
#include <madness/world/worldfwd.h>
#include <madness/world/group.h>
52 
53 
54 namespace madness {
55 
56  // Forward declarations
57  class World;
58  class WorldAmInterface;
59  class WorldTaskQueue;
60  namespace detail {
61 
62  class DeferredCleanup;
63 
64  } // namespace detail
65 
/// Binary functor combining two values with operator+ (used by sum()).
template <typename T>
struct WorldSumOp {
    /// \return The sum of \c a and \c b.
    inline T operator()(const T& a, const T& b) const {
        const T total = a + b;
        return total;
    }
};
71  };
72 
/// Binary functor combining two values with operator* (used by product()).
template <typename T>
struct WorldMultOp {
    /// \return The product of \c a and \c b.
    inline T operator()(const T& a, const T& b) const {
        const T product = a * b;
        return product;
    }
};
78  };
79 
/// Binary functor returning the larger of two values (used by max()).
template <typename T>
struct WorldMaxOp {
    /// \return \c a when a > b, otherwise \c b.
    inline T operator()(const T& a, const T& b) const {
        if (a > b)
            return a;
        return b;
    }
};
85  };
86 
/// Binary functor returning the larger of the absolute values |a| and |b|
/// (used by absmax()).
///
/// The result is an absolute value, not one of the operands: op(-3, 2) == 3.
template <typename T>
struct WorldAbsMaxOp {
    inline T operator()(const T& a, const T& b) const {
        // Pull in the std::abs overloads so that floating-point and integral
        // arguments pick the matching overload instead of C's ::abs(int),
        // which would silently truncate e.g. -2.5 to 2. Argument-dependent
        // lookup still finds abs() overloads declared for user-defined types.
        using std::abs;
        return abs(a) > abs(b) ? abs(a) : abs(b);
    }
};
92  };
93 
/// Binary functor returning the smaller of two values (used by min()).
template <typename T>
struct WorldMinOp {
    /// \return \c a when a < b, otherwise \c b.
    inline T operator()(const T& a, const T& b) const {
        if (a < b)
            return a;
        return b;
    }
};
99  };
100 
/// Binary functor returning the smaller of the absolute values |a| and |b|
/// (used by absmin()).
///
/// The result is an absolute value, not one of the operands: op(-3, 4) == 3.
template <typename T>
struct WorldAbsMinOp {
    inline T operator()(const T& a, const T& b) const {
        // Pull in the std::abs overloads so that floating-point and integral
        // arguments pick the matching overload instead of C's ::abs(int),
        // which would silently truncate e.g. -0.25 to 0. Argument-dependent
        // lookup still finds abs() overloads declared for user-defined types.
        using std::abs;
        return abs(a) < abs(b) ? abs(a) : abs(b);
    }
};
106  };
107 
/// Binary functor computing the bitwise AND of two values (used by bit_and()).
template <typename T>
struct WorldBitAndOp {
    inline T operator()(const T& a, const T& b) const {
        const T masked = a & b;
        return masked;
    }
};
113  };
114 
/// Binary functor computing the bitwise OR of two values (used by bit_or()).
template <typename T>
struct WorldBitOrOp {
    inline T operator()(const T& a, const T& b) const {
        const T merged = a | b;
        return merged;
    }
};
120  };
121 
/// Binary functor computing the bitwise XOR of two values (used by bit_xor()).
template <typename T>
struct WorldBitXorOp {
    inline T operator()(const T& a, const T& b) const {
        const T toggled = a ^ b;
        return toggled;
    }
};
127  };
128 
/// Binary functor computing the logical AND of two values (used by logic_and()).
///
/// The \c bool result of \c && is converted back to \c T.
template <typename T>
struct WorldLogicAndOp {
    inline T operator()(const T& a, const T& b) const {
        return a && b;
    }
};
135 
/// Binary functor computing the logical OR of two values (used by logic_or()).
///
/// The \c bool result of \c || is converted back to \c T.
template <typename T>
struct WorldLogicOrOp {
    inline T operator()(const T& a, const T& b) const {
        const bool either = a || b;
        return either;
    }
};
141  };
142 
143 
145 
148  private:
149  World& world_;
151  bool debug_;
152 
154 
// Message tag types. Each collective mixes one of these into the type of
// its internal message keys (ProcessKey / TaggedKey), so keys created by
// different kinds of operation are distinct types even when the user key
// is the same.
struct PointToPointTag { };     // send() / recv()
struct LazySyncTag { };         // lazy_sync() over the whole world
struct GroupLazySyncTag { };    // lazy_sync() over a group
struct BcastTag { };            // bcast() over the whole world
struct GroupBcastTag { };       // bcast() over a group
struct ReduceTag { };           // reduce() over the whole world
struct GroupReduceTag { };      // intended for reduce() over a group
struct AllReduceTag { };        // all_reduce() over the whole world
struct GroupAllReduceTag { };   // all_reduce() over a group
165 
166 
168 
173  template <typename keyT, typename valueT>
174  class DelayedSend : public CallbackInterface {
175  private:
176  World& world_;
177  const ProcessID dest_;
178  const keyT key_;
179  Future<valueT> value_;
180 
181  // Not allowed
182  DelayedSend(const DelayedSend<keyT, valueT>&);
183  DelayedSend<keyT, valueT>& operator=(const DelayedSend<keyT, valueT>&);
184 
185  public:
186 
188 
191  DelayedSend(World& world, const ProcessID dest,
192  const keyT& key, const Future<valueT>& value) :
193  world_(world), dest_(dest), key_(key), value_(value)
194  { }
195 
196  virtual ~DelayedSend() { }
197 
199 
202  virtual void notify() {
203  MADNESS_ASSERT(value_.probe());
204  world_.gop.send_internal(dest_, key_, value_.get());
205  delete this;
206  }
207  }; // class DelayedSend
208 
210 
214  template <typename valueT, typename keyT>
215  static Future<valueT> recv_internal(const keyT& key) {
216  return detail::DistCache<keyT>::template get_cache_value<valueT>(key);
217  }
218 
220 
227  template <typename keyT, typename valueT>
228  typename disable_if<is_future<valueT> >::type
229  send_internal(const ProcessID dest, const keyT& key, const valueT& value) const {
230  typedef detail::DistCache<keyT> dist_cache;
231 
232  if(world_.rank() == dest) {
233  // When dest is this process, skip the task and set the future immediately.
234  dist_cache::set_cache_value(key, value);
235  } else {
236  // Spawn a remote task to set the value
237  world_.taskq.add(dest, dist_cache::template set_cache_value<valueT>, key,
238  value, TaskAttributes::hipri());
239  }
240  }
241 
243 
251  template <typename keyT, typename valueT>
252  void send_internal(ProcessID dest, const keyT& key, const Future<valueT>& value) const {
253  typedef detail::DistCache<keyT> dist_cache;
254 
255  if(world_.rank() == dest) {
256  dist_cache::set_cache_value(key, value);
257  } else {
258  // The destination is not this node, so send it to the destination.
259  if(value.probe()) {
260  // Spawn a remote task to set the value
261  world_.taskq.add(dest, dist_cache::template set_cache_value<valueT>, key,
262  value.get(), TaskAttributes::hipri());
263  } else {
264  // The future is not ready, so create a callback object that will
265  // send value to the destination node when it is ready.
266  DelayedSend<keyT, valueT>* delayed_send_callback =
267  new DelayedSend<keyT, valueT>(world_, dest, key, value);
268  const_cast<Future<valueT>&>(value).register_callback(delayed_send_callback);
269 
270  }
271  }
272  }
273 
275 
281  template <typename keyT>
282  void lazy_sync_parent(const ProcessID parent, const keyT& key,
283  const ProcessID, const ProcessID) const
284  {
285  send_internal(parent, key, key.proc());
286  }
287 
289 
299  template <typename keyT, typename opT>
300  void lazy_sync_children(const ProcessID child0, const ProcessID child1,
301  const keyT& key, opT& op, const ProcessID) const
302  {
303  // Signal children to execute the operation.
304  if(child0 != -1)
305  send_internal(child0, key, 1);
306  if(child1 != -1)
307  send_internal(child1, key, 1);
308 
309  // Execute the operation on this process.
310  op();
311  }
312 
314 
317  template <typename tagT, typename keyT, typename opT>
318  void lazy_sync_internal(const ProcessID parent, const ProcessID child0,
319  const ProcessID child1, const keyT& key, const opT& op) const {
320  typedef ProcessKey<keyT, tagT> key_type;
321 
322  // Get signals from parent and children.
323  madness::Future<ProcessID> child0_signal = (child0 != -1 ?
324  recv_internal<ProcessID>(key_type(key, child0)) :
326  madness::Future<ProcessID> child1_signal = (child1 != -1 ?
327  recv_internal<ProcessID>(key_type(key, child1)) :
329  madness::Future<ProcessID> parent_signal = (parent != -1 ?
330  recv_internal<ProcessID>(key_type(key, parent)) :
332 
333  // Construct the task that notifies children to run the operation
334  key_type my_key(key, world_.rank());
335  typedef void (WorldGopInterface::*lazy_sync_childrenT)(const ProcessID, const ProcessID,
336  const key_type&, opT&, const ProcessID) const;
337  world_.taskq.add(*this, lazy_sync_childrenT(& WorldGopInterface::template lazy_sync_children<key_type, opT>),
338  child0_signal, child1_signal, my_key, op, parent_signal,
340 
341  // Send signal to parent
342  if(parent != -1) {
343  if(child0_signal.probe() && child1_signal.probe())
344  send_internal(parent, my_key, world_.rank());
345  else {
346  typedef void (WorldGopInterface::*lazy_sync_parentT)(const ProcessID, const key_type&,
347  const ProcessID, const ProcessID) const;
348  world_.taskq.add(*this, lazy_sync_parentT(& WorldGopInterface::template lazy_sync_parent<key_type>),
349  parent, my_key, child0_signal, child1_signal,
351  }
352  }
353  }
354 
355 
356  template <typename keyT, typename valueT, typename taskfnT>
357  static void bcast_handler(const AmArg& arg) {
358  // Deserialize message arguments
359  taskfnT taskfn;
360  keyT key;
361  valueT value;
362  ProcessID root;
363 
364  arg & taskfn & key & value & root;
365 
366  // Add task to queue
367  arg.get_world()->taskq.add(arg.get_world()->gop, taskfn, key,
368  value, root, TaskAttributes::hipri());
369  }
370 
371  template <typename keyT, typename valueT, typename taskfnT>
372  static void group_bcast_handler(const AmArg& arg) {
373  // Deserialize message arguments
374  taskfnT taskfn;
375  keyT key;
376  valueT value;
377  ProcessID group_root;
378  DistributedID group_key;
379 
380  arg & taskfn & key & value & group_root & group_key;
381 
382  // Get the local group
383  const Future<Group> group = Group::get_group(group_key);
384 
385  // Add task to queue
386  arg.get_world()->taskq.add(arg.get_world()->gop, taskfn, key, value,
387  group_root, group, TaskAttributes::hipri());
388  }
389 
390 
392 
395  template <typename keyT, typename valueT>
396  void bcast_task(const keyT& key, const valueT& value, const ProcessID root) const {
397  typedef void (WorldGopInterface::*taskfnT)(const keyT&, const valueT&,
398  const ProcessID) const;
399 
400  // Compute binary tree data
401  ProcessID parent = -1, child0 = -1, child1 = -1;
402  world_.mpi.binary_tree_info(root, parent, child0, child1);
403 
404  // Set the local data, except on the root process
405  if(parent != -1)
407 
408  // Precompute send checks
409  const bool send0 = (child0 != -1);
410  const bool send1 = (child1 != -1);
411 
412  // Send active messages to children
413  if(send0 || send1) { // Check that this process has children in the binary tree
414 
415  // Get handler function and arguments
416  void (*handler)(const AmArg&) =
417  & WorldGopInterface::template bcast_handler<keyT, valueT, taskfnT>;
418  AmArg* const args = new_am_arg(
419  & WorldGopInterface::template bcast_task<keyT, valueT>,
420  key, value, root);
421 
422  // Send active message to children
423  // Note: Only the last message is "managed" so the args buffer
424  // can be reused.
425  if(send0)
426  world_.am.send(child0, handler, args, RMI::ATTR_ORDERED, !send1);
427  if(send1)
428  world_.am.send(child1, handler, args, RMI::ATTR_ORDERED, true);
429  }
430  }
431 
432  template <typename keyT, typename valueT>
433  void group_bcast_task(const keyT& key, const valueT& value,
434  const ProcessID group_root, const Group& group) const
435  {
436  typedef void (WorldGopInterface::*taskfnT)(const keyT&, const valueT&,
437  const ProcessID, const Group&) const;
438 
439  // Set the local data, except on the root process
440  ProcessID parent = -1, child0 = -1, child1 = -1;
441  group.make_tree(group_root, parent, child0, child1);
442 
443  // Set the local data
444  if(parent != -1) {
446  group.remote_update();
447  }
448 
449  // Precompute send checks
450  const bool send0 = (child0 != -1);
451  const bool send1 = (child1 != -1);
452 
453  if(send0 || send1) { // Check that this process has children in the binary tree
454 
455  // Get handler function and arguments
456  void (*handler)(const AmArg&) =
457  & WorldGopInterface::template group_bcast_handler<keyT, valueT, taskfnT>;
458  AmArg* const args = new_am_arg(
459  & WorldGopInterface::template group_bcast_task<keyT, valueT>,
460  key, value, group_root, group.id());
461 
462  // Send active message to children
463  if(send0)
464  world_.am.send(child0, handler, args, RMI::ATTR_ORDERED, !send1);
465  if(send1)
466  world_.am.send(child1, handler, args, RMI::ATTR_ORDERED, true);
467  }
468  }
469 
471 
484  template <typename tagT, typename keyT, typename valueT>
485  void bcast_internal(const keyT& key, Future<valueT>& value, const ProcessID root) const {
486  MADNESS_ASSERT((root >= 0) && (root < world_.size()));
487  MADNESS_ASSERT((world_.rank() == root) || (! value.probe()));
488 
489  // Add operation tag to key
490  typedef TaggedKey<keyT, tagT> key_type;
491  const key_type tagged_key(key);
492 
493  if(world_.size() > 1) { // Do nothing for the trivial case
494  if(world_.rank() == root) {
495  // This process owns the data to be broadcast.
496 
497  // Spawn remote tasks that will set the local cache for this
498  // broadcast on other nodes.
499  if(value.probe())
500  // The value is ready so send it now
501  bcast_task(tagged_key, value.get(), root);
502  else {
503  // The value is not ready so spawn a task to send the
504  // data when it is ready.
505  typedef void (WorldGopInterface::*bcast_taskT)(const key_type&,
506  const valueT&, const ProcessID root) const;
507  world_.taskq.add(*this, bcast_taskT(& WorldGopInterface::template bcast_task<key_type, valueT>),
508  tagged_key, value, root, TaskAttributes::hipri());
509  }
510  } else {
511  MADNESS_ASSERT(! value.probe());
512 
513  // Get the broadcast value from local cache
515  }
516  }
517  }
518 
520 
535  template <typename tagT, typename keyT, typename valueT>
536  void bcast_internal(const keyT& key, Future<valueT>& value,
537  const ProcessID group_root, const Group& group) const
538  {
539  // Construct the internal broadcast key
540  typedef TaggedKey<keyT, tagT> key_type;
541  const key_type tagged_key(key);
542 
543  if(group.rank() == group_root) {
544  // This process owns the data to be broadcast.
545  if(value.probe())
546  group_bcast_task(tagged_key, value.get(), group_root, group);
547  else {
548  typedef void (WorldGopInterface::*group_bcast_taskT)(const key_type&, const valueT&,
549  const ProcessID, const Group&) const;
550  world_.taskq.add(this, group_bcast_taskT(& WorldGopInterface::template group_bcast_task<key_type, valueT>),
551  tagged_key, value, group_root, group,
553  }
554  } else {
555  MADNESS_ASSERT(! value.probe());
556 
557  // This is not the root process, so retrieve the broadcast data
559 
560  // Increment local use counter for group
561  group.local_update();
562  }
563  }
564 
565  template <typename valueT, typename opT>
566  static typename detail::result_of<opT>::type
567  reduce_task(const valueT& value, const opT& op) {
568  typename detail::result_of<opT>::type result = op();
569  op(result, value);
570  return result;
571  }
572 
573  template <typename opT>
574  static typename detail::result_of<opT>::type
575  reduce_result_task(const std::vector<Future<typename detail::result_of<opT>::type> >& results,
576  const opT& op)
577  {
578  MADNESS_ASSERT(results.size() != 0ul);
579  Future<typename detail::result_of<opT>::type> result = results.front();
580  for(std::size_t i = 1ul; i < results.size(); ++i)
581  op(result.get(), results[i].get());
582  return result.get();
583  }
584 
586 
597  template <typename tagT, typename keyT, typename valueT, typename opT>
598  Future<typename detail::result_of<opT>::type>
599  reduce_internal(const ProcessID parent, const ProcessID child0,
600  const ProcessID child1, const ProcessID root, const keyT& key,
601  const valueT& value, const opT& op)
602  {
603  // Create tagged key
604  typedef ProcessKey<keyT, tagT> key_type;
605  typedef typename detail::result_of<opT>::type result_type;
606  typedef typename remove_future<valueT>::type value_type;
607  std::vector<Future<result_type> > results;
608  results.reserve(3);
609 
610  // Add local data to vector of values to reduce
611  results.push_back(world_.taskq.add(WorldGopInterface::template reduce_task<valueT, opT>,
612  value, op, TaskAttributes::hipri()));
613 
614  // Reduce child data
615  if(child0 != -1)
616  results.push_back(recv_internal<result_type>(key_type(key, child0)));
617  if(child1 != -1)
618  results.push_back(recv_internal<result_type>(key_type(key, child1)));
619 
620  // Submit the local reduction task
621  Future<result_type> local_result =
622  world_.taskq.add(WorldGopInterface::template reduce_result_task<opT>,
623  results, op, TaskAttributes::hipri());
624 
625  // Send reduced value to parent or, if this is the root process, set the
626  // result future.
627  if(parent == -1)
628  return local_result;
629  else
630  send_internal(parent, key_type(key, world_.rank()), local_result);
631 
633  }
634 
635 
636  public:
637 
638  // In the World constructor can ONLY rely on MPI and MPI being initialized
640  world_(world), deferred_(new detail::DeferredCleanup()), debug_(false)
641  { }
642 
644  deferred_->destroy(true);
645  deferred_->do_cleanup();
646  }
647 
648 
650  bool set_debug(bool value) {
651  bool status = debug_;
652  debug_ = value;
653  return status;
654  }
655 
657  void barrier() {
658  long i = world_.rank();
659  sum(i);
660  if (i != world_.size()*(world_.size()-1)/2) error("bad value after sum in barrier");
661  }
662 
663 
665 
673  void fence();
674 
675 
677 
679  void broadcast(void* buf, size_t nbyte, ProcessID root, bool dowork = true);
680 
681 
683 
685  template <typename T>
686  inline void broadcast(T* buf, size_t nelem, ProcessID root) {
687  broadcast((void *) buf, nelem*sizeof(T), root);
688  }
689 
691  template <typename T>
692  void broadcast(T& t) {
693  broadcast(&t, 1, 0);
694  }
695 
697  template <typename T>
698  void broadcast(T& t, ProcessID root) {
699  broadcast(&t, 1, root);
700  }
701 
703 
706  template <typename objT>
707  void broadcast_serializable(objT& obj, ProcessID root) {
708  size_t BUFLEN;
709  if (world_.rank() == root) {
711  count & obj;
712  BUFLEN = count.size();
713  }
714  broadcast(BUFLEN, root);
715 
716  unsigned char* buf = new unsigned char[BUFLEN];
717  if (world_.rank() == root) {
718  archive::BufferOutputArchive ar(buf,BUFLEN);
719  ar & obj;
720  }
721  broadcast(buf, BUFLEN, root);
722  if (world_.rank() != root) {
723  archive::BufferInputArchive ar(buf,BUFLEN);
724  ar & obj;
725  }
726  delete [] buf;
727  }
728 
730 
732  template <typename T, class opT>
733  void reduce(T* buf, size_t nelem, opT op) {
734  SafeMPI::Request req0, req1;
735  ProcessID parent, child0, child1;
736  world_.mpi.binary_tree_info(0, parent, child0, child1);
737  Tag gsum_tag = world_.mpi.unique_tag();
738 
739  T* buf0 = new T[nelem];
740  T* buf1 = new T[nelem];
741 
742  if (child0 != -1) req0 = world_.mpi.Irecv(buf0, nelem*sizeof(T), MPI_BYTE, child0, gsum_tag);
743  if (child1 != -1) req1 = world_.mpi.Irecv(buf1, nelem*sizeof(T), MPI_BYTE, child1, gsum_tag);
744 
745  if (child0 != -1) {
746  World::await(req0);
747  for (long i=0; i<(long)nelem; ++i) buf[i] = op(buf[i],buf0[i]);
748  }
749  if (child1 != -1) {
750  World::await(req1);
751  for (long i=0; i<(long)nelem; ++i) buf[i] = op(buf[i],buf1[i]);
752  }
753 
754  delete [] buf0;
755  delete [] buf1;
756 
757  if (parent != -1) {
758  req0 = world_.mpi.Isend(buf, nelem*sizeof(T), MPI_BYTE, parent, gsum_tag);
759  World::await(req0);
760  }
761 
762  broadcast(buf, nelem, 0);
763  }
764 
766  template <typename T>
767  inline void sum(T* buf, size_t nelem) {
768  reduce< T, WorldSumOp<T> >(buf, nelem, WorldSumOp<T>());
769  }
770 
772  template <typename T>
773  inline void min(T* buf, size_t nelem) {
774  reduce< T, WorldMinOp<T> >(buf, nelem, WorldMinOp<T>());
775  }
776 
778  template <typename T>
779  inline void max(T* buf, size_t nelem) {
780  reduce< T, WorldMaxOp<T> >(buf, nelem, WorldMaxOp<T>());
781  }
782 
784  template <typename T>
785  inline void absmin(T* buf, size_t nelem) {
786  reduce< T, WorldAbsMinOp<T> >(buf, nelem, WorldAbsMinOp<T>());
787  }
788 
790  template <typename T>
791  inline void absmax(T* buf, size_t nelem) {
792  reduce< T, WorldAbsMaxOp<T> >(buf, nelem, WorldAbsMaxOp<T>());
793  }
794 
796  template <typename T>
797  inline void product(T* buf, size_t nelem) {
798  reduce< T, WorldMultOp<T> >(buf, nelem, WorldMultOp<T>());
799  }
800 
801  template <typename T>
802  inline void bit_and(T* buf, size_t nelem) {
803  reduce< T, WorldBitAndOp<T> >(buf, nelem, WorldBitAndOp<T>());
804  }
805 
806  template <typename T>
807  inline void bit_or(T* buf, size_t nelem) {
808  reduce< T, WorldBitOrOp<T> >(buf, nelem, WorldBitOrOp<T>());
809  }
810 
811  template <typename T>
812  inline void bit_xor(T* buf, size_t nelem) {
813  reduce< T, WorldBitXorOp<T> >(buf, nelem, WorldBitXorOp<T>());
814  }
815 
816  template <typename T>
817  inline void logic_and(T* buf, size_t nelem) {
818  reduce< T, WorldLogicAndOp<T> >(buf, nelem, WorldLogicAndOp<T>());
819  }
820 
821  template <typename T>
822  inline void logic_or(T* buf, size_t nelem) {
823  reduce< T, WorldLogicOrOp<T> >(buf, nelem, WorldLogicOrOp<T>());
824  }
825 
/// Global sum of a scalar while still processing AM & tasks.
template <typename T>
void sum(T& a) {
    T* const p = &a;
    sum(p, 1);
}

/// Global max of a scalar while still processing AM & tasks.
template <typename T>
void max(T& a) {
    T* const p = &a;
    max(p, 1);
}

/// Global min of a scalar while still processing AM & tasks.
template <typename T>
void min(T& a) {
    T* const p = &a;
    min(p, 1);
}
843 
845  template <typename T>
846  std::vector<T> concat0(const std::vector<T>& v, size_t bufsz=1024*1024) {
847  SafeMPI::Request req0, req1;
848  ProcessID parent, child0, child1;
849  world_.mpi.binary_tree_info(0, parent, child0, child1);
850  Tag gsum_tag = world_.mpi.unique_tag();
851 
852  unsigned char* buf0 = new unsigned char[bufsz];
853  unsigned char* buf1 = new unsigned char[bufsz];
854 
855  if (child0 != -1) req0 = world_.mpi.Irecv(buf0, bufsz, MPI_BYTE, child0, gsum_tag);
856  if (child1 != -1) req1 = world_.mpi.Irecv(buf1, bufsz, MPI_BYTE, child1, gsum_tag);
857 
858  std::vector<T> left, right;
859  if (child0 != -1) {
860  World::await(req0);
861  archive::BufferInputArchive ar(buf0, bufsz);
862  ar & left;
863  }
864  if (child1 != -1) {
865  World::await(req1);
866  archive::BufferInputArchive ar(buf1, bufsz);
867  ar & right;
868  for (unsigned int i=0; i<right.size(); ++i) left.push_back(right[i]);
869  }
870 
871  for (unsigned int i=0; i<v.size(); ++i) left.push_back(v[i]);
872 
873  if (parent != -1) {
874  archive::BufferOutputArchive ar(buf0, bufsz);
875  ar & left;
876  req0 = world_.mpi.Isend(buf0, ar.size(), MPI_BYTE, parent, gsum_tag);
877  World::await(req0);
878  }
879 
880  delete [] buf0;
881  delete [] buf1;
882 
883  if (parent == -1) return left;
884  else return std::vector<T>();
885  }
886 
888 
897  template <typename valueT, typename keyT>
898  static Future<valueT> recv(const ProcessID source, const keyT& key) {
899  return recv_internal<valueT>(ProcessKey<keyT, PointToPointTag>(key, source));
900  }
901 
903 
912  template <typename keyT, typename valueT>
913  void send(const ProcessID dest, const keyT& key, const valueT& value) const {
914  send_internal(dest, ProcessKey<keyT, PointToPointTag>(key, world_.rank()), value);
915  }
916 
918 
944  template <typename keyT, typename opT>
945  void lazy_sync(const keyT& key, const opT& op) const {
946  if(world_.size() > 1) { // Do nothing for the trivial case
947  // Get the binary tree data
948  Hash<keyT> hasher;
949  const ProcessID root = hasher(key) % world_.size();
950  ProcessID parent = -1, child0 = -1, child1 = -1;
951  world_.mpi.binary_tree_info(root, parent, child0, child1);
952 
953  lazy_sync_internal<LazySyncTag>(parent, child0, child1, key, op);
954  } else {
955  typedef void (WorldGopInterface::*lazy_sync_childrenT)(const ProcessID, const ProcessID,
956  const keyT&, opT&, const ProcessID) const;
957  // There is only one process, so run the sync operation now.
958  world_.taskq.add(*this, lazy_sync_childrenT(& WorldGopInterface::template lazy_sync_children<keyT, opT>),
959  -1, -1, key, op, -1, TaskAttributes::hipri());
960  }
961  }
962 
963 
965 
991  template <typename keyT, typename opT>
992  void lazy_sync(const keyT& key, const opT& op, const Group& group) const {
993  MADNESS_ASSERT(! group.empty());
994  MADNESS_ASSERT(group.get_world().id() == world_.id());
995 
996  if(group.size() > 1) { // Do nothing for the trivial case
997  // Get the binary tree data
998  Hash<keyT> hasher;
999  const ProcessID group_root = hasher(key) % group.size();
1000  ProcessID parent = -1, child0 = -1, child1 = -1;
1001  group.make_tree(group_root, parent, child0, child1);
1002 
1003  lazy_sync_internal<GroupLazySyncTag>(parent, child0, child1, key, op);
1004  } else {
1005  typedef void (WorldGopInterface::*lazy_sync_children)(const ProcessID, const ProcessID,
1006  const keyT&, opT&, const ProcessID) const;
1007  world_.taskq.add(*this, lazy_sync_children(& WorldGopInterface::template lazy_sync_children<keyT, opT>),
1008  -1, -1, key, op, -1, TaskAttributes::hipri());
1009  }
1010  }
1011 
1013 
1028  template <typename keyT, typename valueT>
1029  void bcast(const keyT& key, Future<valueT>& value, const ProcessID root) const {
1030  MADNESS_ASSERT((root >= 0) && (root < world_.size()));
1031  MADNESS_ASSERT((world_.rank() == root) || (! value.probe()));
1032 
1033  if(world_.size() > 1) // Do nothing for the trivial case
1034  bcast_internal<BcastTag>(key, value, root);
1035  }
1036 
1038 
1060  template <typename keyT, typename valueT>
1061  void bcast(const keyT& key, Future<valueT>& value,
1062  const ProcessID group_root, const Group& group) const
1063  {
1064  MADNESS_ASSERT(! group.empty());
1065  MADNESS_ASSERT(group.get_world().id() == world_.id());
1066  MADNESS_ASSERT((group_root >= 0) && (group_root < group.size()));
1067  MADNESS_ASSERT((group.rank() == group_root) || (! value.probe()));
1068 
1069  if(group.size() > 1) // Do nothing for the trivial case
1070  bcast_internal<GroupBcastTag>(key, value, group_root, group);
1071  }
1072 
1074 
1108  template <typename keyT, typename valueT, typename opT>
1110  reduce(const keyT& key, const valueT& value, const opT& op, const ProcessID root) {
1111  MADNESS_ASSERT((root >= 0) && (root < world_.size()));
1112 
1113  // Get the binary tree data
1114  ProcessID parent = -1, child0 = -1, child1 = -1;
1115  world_.mpi.binary_tree_info(root, parent, child0, child1);
1116 
1117  return reduce_internal<ReduceTag>(parent, child0, child1, root, key,
1118  value, op);
1119  }
1120 
1122 
1164  template <typename keyT, typename valueT, typename opT>
1166  reduce(const keyT& key, const valueT& value, const opT& op,
1167  const ProcessID group_root, const Group& group)
1168  {
1169  MADNESS_ASSERT(! group.empty());
1170  MADNESS_ASSERT(group.get_world().id() == world_.id());
1171  MADNESS_ASSERT((group_root >= 0) && (group_root < group.size()));
1172 
1173  // Get the binary tree data
1174  ProcessID parent = -1, child0 = -1, child1 = -1;
1175  group.make_tree(group_root, parent, child0, child1);
1176 
1177  return reduce_internal<ReduceTag>(parent, child0, child1, group_root,
1178  key, value, op);
1179  }
1180 
1182 
1216  template <typename keyT, typename valueT, typename opT>
1218  all_reduce(const keyT& key, const valueT& value, const opT& op) {
1219  // Compute the parent and child processes of this process in a binary tree.
1220  Hash<keyT> hasher;
1221  const ProcessID root = hasher(key) % world_.size();
1222  ProcessID parent = -1, child0 = -1, child1 = -1;
1223  world_.mpi.binary_tree_info(root, parent, child0, child1);
1224 
1225  // Reduce the data
1227  reduce_internal<AllReduceTag>(parent, child0, child1, root,
1228  key, value, op);
1229 
1230  if(world_.rank() != root)
1231  reduce_result = Future<typename detail::result_of<opT>::type>();
1232 
1233  // Broadcast the result of the reduction to all processes
1234  bcast_internal<AllReduceTag>(key, reduce_result, root);
1235 
1236  return reduce_result;
1237  }
1238 
1240 
1280  template <typename keyT, typename valueT, typename opT>
1282  all_reduce(const keyT& key, const valueT& value, const opT& op, const Group& group) {
1283  MADNESS_ASSERT(! group.empty());
1284  MADNESS_ASSERT(group.get_world().id() == world_.id());
1285 
1286  // Compute the parent and child processes of this process in a binary tree.
1287  Hash<keyT> hasher;
1288  const ProcessID group_root = hasher(key) % group.size();
1289  ProcessID parent = -1, child0 = -1, child1 = -1;
1290  group.make_tree(group_root, parent, child0, child1);
1291 
1292  // Reduce the data
1294  reduce_internal<GroupAllReduceTag>(parent, child0, child1,
1295  group_root, key, value, op);
1296 
1297 
1298  if(group.rank() != group_root)
1299  reduce_result = Future<typename detail::result_of<opT>::type>();
1300 
1301  // Broadcast the result of the reduction to all processes in the group
1302  bcast_internal<GroupAllReduceTag>(key, reduce_result, 0, group);
1303 
1304  return reduce_result;
1305  }
1306  }; // class WorldGopInterface
1307 
1308 } // namespace madness
1309 
1310 #endif // MADNESS_WORLD_WORLDGOP_H__INCLUDED
T operator()(const T &a, const T &b) const
Definition: worldgop.h:131
void error(const char *msg)
Definition: world.cc:128
static void await(SafeMPI::Request &request, bool dowork=true)
Wait for MPI request to complete.
Definition: worldfwd.h:650
Future< typename detail::result_of< opT >::type > all_reduce(const keyT &key, const valueT &value, const opT &op)
Distributed all reduce.
Definition: worldgop.h:1218
static TaskAttributes hipri()
Definition: worldthread.h:277
Definition: worldgop.h:109
WorldGopInterface(World &world)
Definition: worldgop.h:639
Future< typename detail::result_of< opT >::type > all_reduce(const keyT &key, const valueT &value, const opT &op, const Group &group)
Distributed, group all reduce.
Definition: worldgop.h:1282
void min(T &a)
Global min of a scalar while still processing AM & tasks.
Definition: worldgop.h:840
AmArg * new_am_arg(const A &a, const B &b, const C &c, const D &d, const E &e, const F &f, const G &g, const H &h, const I &i, const J &j)
Convenience template for serializing arguments into a new AmArg.
Definition: worldam.h:185
bool probe() const
Query whether this future has been assigned.
Definition: worldfut.h:527
void min(T *buf, size_t nelem)
Inplace global min while still processing AM & tasks.
Definition: worldgop.h:773
Definition: worldgop.h:88
unsigned long id() const
Returns the system-wide unique integer ID of this world.
Definition: worldfwd.h:523
Definition: worldgop.h:116
Implements an archive wrapping a memory buffer.
int Tag
Used to clearly identify message tag/type.
Definition: worldtypes.h:38
void absmin(T *buf, size_t nelem)
Inplace global absmin while still processing AM & tasks.
Definition: worldgop.h:785
Definition: worldgop.h:102
void bit_and(T *buf, size_t nelem)
Definition: worldgop.h:802
Definition: worldgop.h:81
Definition: worldgop.h:67
ProcessID rank() const
Group rank accessor.
Definition: group.h:415
static void get_cache_value(const keyT &key, madness::Future< valueT > &value)
Get the cache value associated with key.
Definition: dist_cache.h:188
#define MPI_BYTE
Definition: stubmpi.h:66
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only) ...
Definition: tensor.h:2429
void bit_xor(T *buf, size_t nelem)
Definition: worldgop.h:812
std::size_t size() const
Definition: bufar.h:97
void sum(T &a)
Global sum of a scalar while still processing AM & tasks.
Definition: worldgop.h:828
Definition: worldgop.h:95
void bcast(const keyT &key, Future< valueT > &value, const ProcessID root) const
Broadcast.
Definition: worldgop.h:1029
bool empty() const
Query whether the group is empty.
Definition: group.h:394
Deferred cleanup of shared_ptr's.
Definition: deferred_cleanup.h:57
Key object that included the process information.
Definition: dist_keys.h:71
T operator()(const T &a, const T &b) const
Definition: worldgop.h:110
Wraps an archive around a memory buffer for output.
Definition: bufar.h:58
T operator()(const T &a, const T &b) const
Definition: worldgop.h:96
void bit_or(T *buf, size_t nelem)
Definition: worldgop.h:807
Provides collectives that interoperate with the AM and task interfaces.
Definition: worldgop.h:147
void max(T *buf, size_t nelem)
Inplace global max while still processing AM & tasks.
Definition: worldgop.h:779
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition: worldgop.h:707
const T1 &f1 return GTEST_2_TUPLE_() T(f0, f1)
T operator()(const T &a, const T &b) const
Definition: worldgop.h:89
Definition: worldgop.h:130
MUP_BASETYPE value_type
The numeric datatype used by the parser.
Definition: muParserDef.h:222
FLOAT a(int j, FLOAT z)
Definition: y1.cc:86
void product(T *buf, size_t nelem)
Inplace global product while still processing AM & tasks.
Definition: worldgop.h:797
void destroy(bool mode)
Set the destruction mode.
Definition: deferred_cleanup.cc:41
T operator()(const T &a, const T &b) const
Definition: worldgop.h:138
static const attrT ATTR_ORDERED
Definition: worldrmi.h:139
Future< typename detail::result_of< opT >::type > reduce(const keyT &key, const valueT &value, const opT &op, const ProcessID root)
Distributed reduce.
Definition: worldgop.h:1110
ProcessID size() const
Group size accessor.
Definition: group.h:432
T operator()(const T &a, const T &b) const
Definition: worldgop.h:68
void broadcast(T *buf, size_t nelem, ProcessID root)
Broadcasts typed contiguous data from process root while still processing AM & tasks.
Definition: worldgop.h:686
World & get_world() const
Parent world accessor.
Definition: group.h:407
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition: worldgop.h:767
void send(const ProcessID dest, const keyT &key, const valueT &value) const
Send value to dest.
Definition: worldgop.h:913
A parallel world with full functionality wrapping an MPI communicator.
Definition: worldfwd.h:416
void fence()
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition: worldgop.cc:52
void make_tree(const ProcessID group_root, ProcessID &parent, ProcessID &child1, ProcessID &child2) const
Compute the binary tree parents and children.
Definition: group.h:452
void lazy_sync(const keyT &key, const opT &op, const Group &group) const
Group lazy sync.
Definition: worldgop.h:992
T operator()(const T &a, const T &b) const
Definition: worldgop.h:117
static const Future< T > default_initializer()
See Gotchas on the documentation mainpage about why this exists and how to use it.
Definition: worldfut.h:439
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:37
void barrier()
Synchronizes all processes in communicator ... does NOT fence pending AM or tasks.
Definition: worldgop.h:657
bool set_debug(bool value)
Set debug flag to new value and return old value.
Definition: worldgop.h:650
std::pair< uniqueidT, std::size_t > DistributedID
Distributed ID which is used to identify objects.
Definition: dist_keys.h:44
Wraps an archive around a memory buffer for input.
Definition: bufar.h:206
static Future< valueT > recv(const ProcessID source, const keyT &key)
Receive data from source.
Definition: worldgop.h:898
void do_cleanup()
Deletes/frees any pointers that are in the list.
Definition: deferred_cleanup.cc:62
Definition: safempi.h:243
double abs(double x)
Definition: complexfun.h:48
T operator()(const T &a, const T &b) const
Definition: worldgop.h:82
void reduce(T *buf, size_t nelem, opT op)
Inplace global reduction (like MPI all_reduce) while still processing AM & tasks. ...
Definition: worldgop.h:733
void bcast(const keyT &key, Future< valueT > &value, const ProcessID group_root, const Group &group) const
Group broadcast.
Definition: worldgop.h:1061
std::vector< T > concat0(const std::vector< T > &v, size_t bufsz=1024 *1024)
Concatenate an STL vector of serializable stuff onto node 0.
Definition: worldgop.h:846
static madness::Future< Group > get_group(const DistributedID &did)
Get group from the registry.
Definition: group.cc:89
A collection of processes.
Definition: group.h:52
static void set_cache_value(const keyT &key, const valueT &value)
Set the cache value associated with key.
Definition: dist_cache.h:149
Defines TaskInterface and implements WorldTaskQueue and associated stuff.
void broadcast(void *buf, size_t nbyte, ProcessID root, bool dowork=true)
Broadcasts bytes from process root while still processing AM & tasks.
Definition: worldgop.cc:145
Definition: worldgop.h:123
A future is a possibly yet unevaluated value.
Definition: ref.h:210
Definition: worldgop.h:137
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
void logic_and(T *buf, size_t nelem)
Definition: worldgop.h:817
T operator()(const T &a, const T &b) const
Definition: worldgop.h:103
void absmax(T *buf, size_t nelem)
Inplace global absmax while still processing AM & tasks.
Definition: worldgop.h:791
void broadcast(T &t)
Broadcast of a scalar from node 0 to all other nodes.
Definition: worldgop.h:692
Implements World.
void broadcast(T &t, ProcessID root)
Broadcast of a scalar from node root to all other nodes.
Definition: worldgop.h:698
void logic_or(T *buf, size_t nelem)
Definition: worldgop.h:822
T operator()(const T &a, const T &b) const
Definition: worldgop.h:124
Holds machinery to set up Functions/FuncImpls using various Factories and Interfaces.
Definition: chem/atomutil.cc:45
~WorldGopInterface()
Definition: worldgop.h:643
T operator()(const T &a, const T &b) const
Definition: worldgop.h:75
Definition: worldgop.h:74
FLOAT b(int j, FLOAT z)
Definition: y1.cc:79
Hash functor.
Definition: shared_ptr_bits.h:32
const mpreal root(const mpreal &v, unsigned long int k, mp_rnd_t rnd_mode)
Definition: mpreal.h:2180
Future< typename detail::result_of< opT >::type > reduce(const keyT &key, const valueT &value, const opT &op, const ProcessID group_root, const Group &group)
Distributed group reduce.
Definition: worldgop.h:1166
void max(T &a)
Global max of a scalar while still processing AM & tasks.
Definition: worldgop.h:834
void lazy_sync(const keyT &key, const opT &op) const
Lazy sync.
Definition: worldgop.h:945