MADNESS  version 0.9
binsorter.h
Go to the documentation of this file.
1 #include <madness/world/world.h>
4 #include <vector>
5 
6 namespace madness {
7 
9  template <typename T, typename inserterT>
10  class BinSorter : public WorldObject< BinSorter<T,inserterT> > {
11  World* pworld;
12  inserterT inserter;
13  std::size_t bufsize;
14  std::vector<T>* bins;
15 
16  void flush(int owner) {
17  if (bins[owner].size())
18  this->send(owner, &BinSorter<T,inserterT>::sorter, bins[owner]);
19  bins[owner].clear();
20  }
21 
22  void sorter(const std::vector<T>& v) {
23  for(typename std::vector<T>::const_iterator it = v.begin(); it != v.end(); ++it) {
24  inserter(*it);
25  }
26  }
27 
28  public:
30 
34  BinSorter(World& world, inserterT inserter, int bufsize=0)
35  : WorldObject<BinSorter>(world)
36  , pworld(&world)
37  , inserter(inserter)
38  , bufsize(bufsize)
39  , bins(new std::vector<T>[world.size()])
40  {
41  // bufsize ... max from AM buffer size is about 512K/sizeof(T)
42  // bufsize ... max from total buffer use is about 1GB/sizeof(T)/P
43  if (bufsize <= 0) this->bufsize = std::min((RMI::max_msg_len()-1024)/sizeof(T),(1<<30)/(world.size()*sizeof(T)));
44 
45  // for (int i=0; i<world.size(); i++) {
46  // bins[i].reserve(bufsize); // Not a good idea on large process counts unless truly all to all?
47  // }
48 
49  //print("binsorter bufsize is", this->bufsize, this->bufsize*sizeof(T));
51  }
52 
53  virtual ~BinSorter() {
54  for (int i=0; i<pworld->size(); i++) {
55  MADNESS_ASSERT(bins[i].size() == 0);
56  }
57  delete [] bins;
58  }
59 
61  void finish() {
62  for (int i=0; i<pworld->size(); i++) {
63  flush(i);
64  }
65  pworld->gop.fence();
66  }
67 
69  void insert(ProcessID p, const T& value) {
70  bins[p].push_back(value);
71 
72  // More intelligent buffer management would look at total use and flush
73  // largest buffers rather than using a fixed buffersize per process
74  if (bins[p].size() >= bufsize) flush(p);
75  }
76  };
77 }
78 
79 
80 
WorldGopInterface & gop
Global operations.
Definition: worldfwd.h:462
virtual ~BinSorter()
Definition: binsorter.h:53
void process_pending()
To be called from derived constructor to process pending messages.
Definition: worldobj.h:330
A parallel bin sort across MPI processes.
Definition: binsorter.h:10
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition: worldobj.h:388
void finish()
Invoke to complete the sort, flush all buffers, and ensure communication/processing is complete.
Definition: binsorter.h:61
Definition: mpreal.h:3066
ProcessID size() const
Returns the number of processes in this world (same as MPI_Comm_size())
Definition: worldfwd.h:533
This header should include pretty much everything needed for the parallel runtime.
const T1 &f1 return GTEST_2_TUPLE_() T(f0, f1)
const mpreal min(const mpreal &x, const mpreal &y)
Definition: mpreal.h:2675
Defines and implements WorldObject.
Implements most parts of a globally addressable object (via unique ID)
Definition: worldam.h:74
BinSorter(World &world, inserterT inserter, int bufsize=0)
Constructs the sorter object.
Definition: binsorter.h:34
A parallel world with full functionality wrapping an MPI communicator.
Definition: worldfwd.h:416
void fence()
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition: worldgop.cc:52
static std::size_t max_msg_len()
Definition: worldrmi.h:247
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:37
void insert(ProcessID p, const T &value)
Application calls this to add a value to the bin for process p.
Definition: binsorter.h:69
Implements MadnessException.
Holds machinery to set up Functions/FuncImpls using various Factories and Interfaces.
Definition: chem/atomutil.cc:45
World & world
Think globally act locally.
Definition: worldobj.h:171