MADNESS  version 0.9
parar.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30 
31  $Id$
32 */
33 #ifndef MADNESS_WORLD_PARAR_H__INCLUDED
34 #define MADNESS_WORLD_PARAR_H__INCLUDED
35 
38 
39 #include <madness/world/archive.h>
40 #include <madness/world/binfsar.h>
41 #include <madness/world/worldfwd.h>
42 #include <madness/world/worldgop.h>
43 
44 #include <unistd.h>
45 #include <cstring>
46 #include <cstdio>
47 
48 namespace madness {
49  namespace archive {
50 
53 
55 
57  template <typename Archive>
59  World* world;
60  mutable Archive ar;
61  int nio;
62  bool do_fence;
63  char fname[256];
64  int nclient;
65 
66  public:
67  static const bool is_parallel_archive = true;
68 
69  BaseParallelArchive() : world(0), ar(), nio(0), do_fence(true) {}
70 
72 
75  ProcessID io_node(ProcessID rank) const {
76  return rank%nio;
77  }
78 
81  MADNESS_ASSERT(world);
82  return io_node(world->rank());
83  }
84 
86  int num_io_clients() const {
87  MADNESS_ASSERT(world);
88  return nclient;
89  }
90 
92  bool is_io_node() const {
93  MADNESS_ASSERT(world);
94  return world->rank() == my_io_node();
95  }
96 
98  World* get_world() const {
99  MADNESS_ASSERT(world);
100  return world;
101  }
102 
104 
117  void open(World& world, const char* filename, int nwriter=1) {
118  this->world = &world;
119  nio = nwriter;
120 #if defined(HAVE_IBMBGP) || defined(HAVE_IBMBGQ)
121  /* Jeff believes that BG is designed to handle up to *
122  * one file per node and I assume no more than 8 ppn */
123  int maxio = world.size()/8;
124 #else
125  int maxio = 50;
126 #endif
127  if (nio > maxio) nio = maxio; // Sanity?
128  if (nio > world.size()) nio = world.size();
129 
130  MADNESS_ASSERT(filename);
131  MADNESS_ASSERT(strlen(filename)-1<sizeof(fname));
132  strcpy(fname,filename); // Save the filename for later
133  char buf[256];
134  MADNESS_ASSERT(strlen(filename)+7 <= sizeof(buf));
135  sprintf(buf, "%s.%5.5d", filename, world.rank());
136 
137  if (world.rank() == 0) {
138  ar.open(buf);
139  ar & nio; // read/write nio from/to the archive
140  MADNESS_ASSERT(nio <= world.size());
141  }
142 
143  // Ensure all agree on value of nio that may also have changed if reading
144  world.gop.broadcast(nio, 0);
145 
146  // Other reader/writers can now open the local archive
147  if (is_io_node() && world.rank()) {
148  ar.open(buf);
149  }
150 
151  // Count #client
152  ProcessID me = world.rank();
153  nclient=0;
154  for (ProcessID p=0; p<world.size(); ++p) if (io_node(p) == me) ++nclient;
155 
156 // if (is_io_node()) {
157 // madness::print("I am an IO node with",nclient,"clients and file",buf);
158 // }
159 // else {
160 // madness::print("I am a client served by",my_io_node(),fname);
161 // }
162  }
163 
165  static bool exists(World& world, const char* filename) {
166  char buf[256];
167  MADNESS_ASSERT(strlen(filename)+7 <= sizeof(buf));
168  sprintf(buf, "%s.%5.5d", filename, world.rank());
169  bool status;
170  if (world.rank() == 0)
171  status = (access(buf, F_OK|R_OK) == 0);
172 
173  world.gop.broadcast(status);
174 
175  return status;
176  }
177 
179  void close() {
180  MADNESS_ASSERT(world);
181  if (is_io_node()) ar.close();
182  }
183 
185  Archive& local_archive() const {
186  MADNESS_ASSERT(world);
187  MADNESS_ASSERT(is_io_node());
188  return ar;
189  }
190 
192  template <typename objT>
193  void broadcast(objT& obj, ProcessID root) const {
194  get_world()->gop.broadcast_serializable(obj, root);
195  }
196 
198 
201  static void remove(World& world, const char* filename) {
202  if (world.rank() == 0) {
203  char buf[256];
204  MADNESS_ASSERT(strlen(filename)+7 <= sizeof(buf));
205  for (ProcessID p=0; p<world.size(); ++p) {
206  sprintf(buf, "%s.%5.5d", filename, p);
207  if (::remove(buf)) break;
208  }
209  }
210  }
211 
213  void remove() {
214  MADNESS_ASSERT(world);
215  remove(*world, fname);
216  }
217 
218 
219  bool dofence() const {
220  return this->do_fence;
221  }
222 
223  void set_dofence(bool dofence) {
224  do_fence = dofence;
225  }
226  };
227 
228 
230 
241  class ParallelOutputArchive : public BaseParallelArchive<BinaryFstreamOutputArchive>, public BaseOutputArchive {
242  public:
244 
246  ParallelOutputArchive(World& world, const char* filename, int nio=1) {
247  open(world, filename, nio);
248  }
249 
250  void flush() {
251  if (is_io_node()) local_archive().flush();
252  }
253  };
254 
256 
266  class ParallelInputArchive : public BaseParallelArchive<BinaryFstreamInputArchive>, public BaseInputArchive {
267  public:
269 
271  ParallelInputArchive(World& world, const char* filename, int nio=1) {
272  open(world, filename, nio);
273  }
274  };
275 
276 
278  template <class T>
280  static void preamble_store(const ParallelOutputArchive& ar) {};
281  static inline void postamble_store(const ParallelOutputArchive& ar) {};
282  };
283 
285  template <class T>
287  static inline void preamble_load(const ParallelInputArchive& ar) {};
288  static inline void postamble_load(const ParallelInputArchive& ar) {};
289  };
290 
291 
292  template <class T>
295  template <typename Q>
296  static inline
298  wrap_store(const ParallelOutputArchive& ar, const Q& t) {
300  return ar;
301  }
302 
304  template <typename Q>
305  static inline
307  wrap_store(const ParallelOutputArchive& ar, const Q& t) {
308  if (ar.get_world()->rank()==0) {
309  ar.local_archive() & t;
310  }
311  return ar;
312  }
313  };
314 
315  template <class T>
318  template <typename Q>
319  static inline
321  wrap_load(const ParallelInputArchive& ar, const Q& t) {
322  ArchiveLoadImpl<ParallelInputArchive,T>::load(ar,const_cast<T&>(t));
323  return ar;
324  }
325 
327  template <typename Q>
328  static inline
330  wrap_load(const ParallelInputArchive& ar, const Q& t) {
331  if (ar.get_world()->rank()==0) {
332  ar.local_archive() & t;
333  }
334  ar.broadcast(const_cast<T&>(t), 0);
335  return ar;
336  }
337  };
338 
339 
341  template <class T>
343  static inline const ParallelOutputArchive& wrap_store(const ParallelOutputArchive& ar, const archive_array<T>& t) {
344  if (ar.get_world()->rank() == 0) ar.local_archive() & t;
345  return ar;
346  }
347  };
348 
350  template <class T>
352  static inline const ParallelInputArchive& wrap_load(const ParallelInputArchive& ar, const archive_array<T>& t) {
353  if (ar.get_world()->rank() == 0) ar.local_archive() & t;
354  ar.broadcast(t, 0);
355  return ar;
356  }
357  };
358 
360  template <class T, std::size_t n>
362  static inline const ParallelOutputArchive& wrap_store(const ParallelOutputArchive& ar, const T(&t)[n]) {
363  ar << wrap(&t[0],n);
364  return ar;
365  }
366  };
367 
369  template <class T, std::size_t n>
371  static inline const ParallelInputArchive& wrap_load(const ParallelInputArchive& ar, const T(&t)[n]) {
372  ar >> wrap(&t[0],n);
373  return ar;
374  }
375  };
376  }
377 }
378 
379 #endif // MADNESS_WORLD_PARAR_H__INCLUDED
WorldGopInterface & gop
Global operations.
Definition: worldfwd.h:462
bool is_io_node() const
Returns true if this node is doing physical IO.
Definition: parar.h:92
ParallelInputArchive()
Definition: parar.h:268
An archive for storing local or parallel data wrapping BinaryFstreamOutputArchive.
Definition: parar.h:241
BaseParallelArchive()
Definition: parar.h:69
static const ParallelInputArchive & wrap_load(const ParallelInputArchive &ar, const archive_array< T > &t)
Definition: parar.h:352
archive_array< T > wrap(const T *, unsigned int)
Factory function to wrap dynamically allocated pointer as typed archive_array.
Definition: archive.h:820
void flush()
Definition: binfsar.cc:65
Objects that implement their own parallel archive interface should derive from this.
Definition: parar.h:52
Interface templates for the archives (serialization)
ProcessID io_node(ProcessID rank) const
Returns the process doing IO for given node.
Definition: parar.h:75
bool dofence() const
Definition: parar.h:219
void broadcast(objT &obj, ProcessID root) const
Same as world.gop.broadcast_serializable(obj, root)
Definition: parar.h:193
static madness::enable_if< is_derived_from< Q, ParallelSerializableObject >, const ParallelInputArchive & >::type wrap_load(const ParallelInputArchive &ar, const Q &t)
Parallel objects are forwarded to their implementation of parallel load.
Definition: parar.h:321
static void store(const Archive &ar, const T &t)
Definition: archive.h:709
ProcessID size() const
Returns the number of processes in this world (same as MPI_Comm_size())
Definition: worldfwd.h:533
Default implementation of pre/postamble.
Definition: archive.h:641
POD holding excitation energy and response vector for a single excitation.
Definition: tdhf_CIS.h:134
disable_if from Boost for conditionally instantiating templates based on type
Definition: enable_if.h:78
static const ParallelInputArchive & wrap_load(const ParallelInputArchive &ar, const T(&t)[n])
Definition: parar.h:371
Implements archive wrapping a binary filestream.
ParallelOutputArchive()
Definition: parar.h:243
static void postamble_load(const ParallelInputArchive &ar)
Definition: parar.h:288
ParallelOutputArchive(World &world, const char *filename, int nio=1)
Creates a parallel archive for output with given base filename and number of IO nodes.
Definition: parar.h:246
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition: worldgop.h:707
const T1 &f1 return GTEST_2_TUPLE_() T(f0, f1)
void close()
Closes the parallel archive.
Definition: parar.h:179
static madness::disable_if< is_derived_from< Q, ParallelSerializableObject >, const ParallelInputArchive & >::type wrap_load(const ParallelInputArchive &ar, const Q &t)
Serial objects read only from process 0 and broadcast results.
Definition: parar.h:330
Base class for output archives classes.
Definition: archive.h:583
static const ParallelOutputArchive & wrap_store(const ParallelOutputArchive &ar, const archive_array< T > &t)
Definition: parar.h:343
static void load(const Archive &ar, const T &t)
Definition: archive.h:719
Base class for input archives classes.
Definition: archive.h:576
Base class for input and output parallel archives.
Definition: parar.h:58
ParallelInputArchive(World &world, const char *filename, int nio=1)
Creates a parallel archive for input.
Definition: parar.h:271
A parallel world with full functionality wrapping an MPI communicator.
Definition: worldfwd.h:416
Default implementation of wrap_store and wrap_load.
Definition: archive.h:728
void open(World &world, const char *filename, int nwriter=1)
Opens the parallel archive.
Definition: parar.h:117
Wrapper for dynamic arrays and pointers.
Definition: archive.h:451
static madness::enable_if< is_derived_from< Q, ParallelSerializableObject >, const ParallelOutputArchive & >::type wrap_store(const ParallelOutputArchive &ar, const Q &t)
Parallel objects are forwarded to their implementation of parallel store.
Definition: parar.h:298
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:37
int num_io_clients() const
Returns the number of IO clients for this node including self (zero if not an IO node) ...
Definition: parar.h:86
static const ParallelOutputArchive & wrap_store(const ParallelOutputArchive &ar, const T(&t)[n])
Definition: parar.h:362
Archive & local_archive() const
Returns a reference to local archive ... throws if not an IO node.
Definition: parar.h:185
ProcessID rank() const
Returns the process rank in this world (same as MPI_Comm_rank()))
Definition: worldfwd.h:526
World * get_world() const
Returns pointer to the world.
Definition: parar.h:98
static void preamble_load(const ParallelInputArchive &ar)
Definition: parar.h:287
static const bool is_parallel_archive
Definition: parar.h:67
static void preamble_store(const ParallelOutputArchive &ar)
Definition: parar.h:280
static void postamble_store(const ParallelOutputArchive &ar)
Definition: parar.h:281
void broadcast(void *buf, size_t nbyte, ProcessID root, bool dowork=true)
Broadcasts bytes from process root while still processing AM & tasks.
Definition: worldgop.cc:145
void set_dofence(bool dofence)
Definition: parar.h:223
Implements global operations.
void flush()
Definition: parar.h:250
Implements World.
enable_if from Boost for conditionally instantiating templates based on type
Definition: enable_if.h:60
Holds machinery to set up Functions/FuncImpls using various Factories and Interfaces.
Definition: chem/atomutil.cc:45
An archive for storing local or parallel data wrapping BinaryFstreamInputArchive. ...
Definition: parar.h:266
static bool exists(World &world, const char *filename)
Returns true if the named, unopened archive exists on disk with read access ... collective.
Definition: parar.h:165
static madness::disable_if< is_derived_from< Q, ParallelSerializableObject >, const ParallelOutputArchive & >::type wrap_store(const ParallelOutputArchive &ar, const Q &t)
Serial objects write only from process 0.
Definition: parar.h:307
ProcessID my_io_node() const
Returns the process doing IO for this node.
Definition: parar.h:80