MADNESS version 0.9
madness/tensor/systolic.h
#ifndef MADNESS_SYSTOLIC_H
#define MADNESS_SYSTOLIC_H

/*
  This file is part of MADNESS.

  Copyright (C) 2007,2010 Oak Ridge National Laboratory

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

  For more information please contact:

  Robert J. Harrison
  Oak Ridge National Laboratory
  One Bethel Valley Road
  P.O. Box 2008, MS-6367

  email: harrisonrj@ornl.gov
  tel:   865-241-3937
  fax:   865-572-0680

  $Id$
*/

#include <madness/world/world.h>
#include <utility>
#include <madness/tensor/tensor.h>
#include <madness/tensor/distributed_matrix.h>

namespace madness {

    /// Base class for parallel algorithms that employ a systolic loop to generate all row pairs in parallel
    template <typename T>
    class SystolicMatrixAlgorithm : public TaskInterface {
    private:
        DistributedMatrix<T>& A;
        const int64_t nproc;        //< No. of processes with rows of the matrix (not size of world)
        const int64_t coldim;       //< A(coldim,rowdim)
        const int64_t rowdim;       //< A(coldim,rowdim)
        const int64_t nlocal;       //< No. of local pairs
        const ProcessID rank;       //< Rank of current process
        const int tag;              //< MPI tag to be used for messages
        std::vector<T*> iptr, jptr; //< Indirection for implementing cyclic buffer !! SHOULD BE VOLATILE ?????
        std::vector<int64_t> map;   //< Used to keep track of actual row indices

        void iteration(const TaskThreadEnv& env) {

            env.barrier();
            start_iteration_hook(env);
            env.barrier();

            if (nlocal > 0) {
                int64_t ilo, ihi;
                A.local_colrange(ilo, ihi);

                int neven = coldim + (coldim&0x1); // pad to an even number of rows for pairing

                int pairlo = rank*A.coltile()/2;

                int threadid = env.id();
                int nthread = env.nthread();

                for (int loop=0; loop<(neven-1); ++loop) {

                    // This loop is parallelized over threads
                    for (int pair=env.id(); pair<nlocal; pair+=nthread) {

                        // Logical positions of the two rows in this pair (position neven-1 stays fixed)
                        int rp = neven/2-1-(pair+pairlo);
                        int iii = (rp+loop)%(neven-1);
                        int jjj = (2*neven-2-rp+loop)%(neven-1);
                        if (rp == 0) jjj = neven-1;

                        iii = map[iii];
                        jjj = map[jjj];

                        if (jptr[pair]) {
                            kernel(iii, jjj, iptr[pair], jptr[pair]);
                        }
                    }
                    env.barrier();

                    if (threadid == 0) cycle();

                    env.barrier();
                }
            }

            end_iteration_hook(env);

            env.barrier();
        }

        /// Call this after iterating to restore the matrix to its original (unshuffled) row order
        void unshuffle() {
            if (nlocal <= 0) return;
            Tensor<T>& t = A.data();
            Tensor<T> tmp(2L, t.dims(), false);
            T* tp = tmp.ptr();
            for (int64_t i=0; i<nlocal; ++i) {
                memcpy(tp+i*rowdim, iptr[i], rowdim*sizeof(T));
                if (jptr[i]) {
                    memcpy(tp+(i+nlocal)*rowdim, jptr[i], rowdim*sizeof(T));
                }
                iptr[i] = &t(i,0);
                jptr[i] = &t(i+nlocal,0);
            }
            memcpy(t.ptr(), tmp.ptr(), t.size()*sizeof(T));

            if (rank==(nproc-1) && (coldim&0x1)) jptr[nlocal-1] = 0;
        }

        /// Cycles the row data around the loop so that the next set of pairs is formed (invoked by one thread only)
        void cycle() {
            if (coldim <= 2) return; // No cycling necessary
            if (nlocal <= 0) {       // Nothing local
                MADNESS_ASSERT(rank >= nproc);
                return;
            }

            // Check assumption that tiling put incomplete tile at the end
            MADNESS_ASSERT(A.local_coldim() == A.coltile() || rank == (nproc-1));

            const ProcessID left = rank-1; // Invalid values are not used
            const ProcessID right = rank+1;

            /*
              Consider matrix (10,*) distributed with coltile=4 over
              three processors.

              .   0 1 2 3    4 5 6 7    8 9

              This is divided up as follows into this initial
              configuration for the loop

              .          P=0          P=1         P=2
              .                   msg          msg
              .   i   -->0-->1  -->   4-->5  -->    8  -->
              .       ^                                 |   msg
              .       |  <---------
              .   j   <--2<--3  <--   6<--7  <--|   9
              .                   msg          msg

              The first and last processes in the loop have to wrap ... others
              just pass left and right.  Note that 9 stays put.

              Note that the algorithm is assuming distribution puts equal
              amount of data on all nodes except the last.

              The i data is considered as flowing to the right.
              The j data is considered as flowing to the left.


              Hence, we should explore the pairs in this order
              (n-1 sets of n/2 pairs)

              .          P=0        P=1       P=2
              .          0  1       4  5      8
              .          2  3       6  7      9

              .          2  0       1  4      5
              .          3  6       7  8      9

              .          3  2       0  1      4
              .          6  7       8  5      9

              .          6  3       2  0      1
              .          7  8       5  4      9

              .          7  6       3  2      0
              .          8  5       4  1      9

              .          8  7       6  3      2
              .          5  4       1  0      9

              .          5  8       7  6      3
              .          4  1       0  2      9

              .          4  5       8  7      6
              .          1  0       2  3      9

              .          1  4       5  8      7
              .          0  2       3  6      9
            */

            // Copy end elements before they are overwritten
            T* ilast  = iptr[nlocal-1];
            T* jfirst = jptr[0];

            // Cycle local pointers
            for (int64_t i=0; i<nlocal-1; ++i) {
                iptr[nlocal-i-1] = iptr[nlocal-i-2];
                jptr[i] = jptr[i+1];
            }

            World& world = A.get_world();

            if (nproc == 1) {
                iptr[0] = jfirst;
                jptr[nlocal-2] = ilast;
            }
            else if (rank == 0) {
                iptr[0] = jfirst;
                world.mpi.Send(ilast, rowdim, right, tag);
                jptr[nlocal-1] = ilast;
                world.mpi.Recv(ilast, rowdim, right, tag);
            }
            else if (rank == (nproc-1)) {
                if (nlocal > 1) {
                    iptr[0] = jfirst;
                    jptr[nlocal-2] = ilast;
                }
                std::vector<T> buf(rowdim);
                SafeMPI::Request req = world.mpi.Irecv(&buf[0], rowdim, left, tag);
                world.mpi.Send(iptr[0], rowdim, left, tag);
                world.await(req,false);
                std::memcpy(iptr[0], &buf[0], rowdim*sizeof(T));
            }
            else {
                std::vector<T> buf1(rowdim);
                std::vector<T> buf2(rowdim);
                SafeMPI::Request req1 = world.mpi.Irecv(&buf1[0], rowdim, left, tag);
                SafeMPI::Request req2 = world.mpi.Irecv(&buf2[0], rowdim, right, tag);
                world.mpi.Send( ilast, rowdim, right, tag);
                world.mpi.Send(jfirst, rowdim,  left, tag);
                world.await(req1,false);
                world.await(req2,false);
                std::memcpy( ilast, &buf2[0], rowdim*sizeof(T)); //world.mpi.Recv( ilast, rowdim, right, tag);
                std::memcpy(jfirst, &buf1[0], rowdim*sizeof(T)); //world.mpi.Recv(jfirst, rowdim,  left, tag);

                iptr[0] = jfirst;
                jptr[nlocal-1] = ilast;
            }
        }

        /// Used by the task queue to identify this task
        virtual void get_id(std::pair<void*,unsigned short>& id) const {
            PoolTaskInterface::make_id(id, *this);
        }

    public:
        /// A must be a column distributed matrix with an even column tile >= 2
        SystolicMatrixAlgorithm(DistributedMatrix<T>& A, int tag, int nthread=ThreadPool::size()+1)
            : A(A)
            , nproc(A.process_coldim()*A.process_rowdim())
            , coldim(A.coldim())
            , rowdim(A.rowdim())
            , nlocal((A.local_coldim()+1)/2)
            , rank(A.get_world().rank())
            , tag(tag)
            , iptr(nlocal)
            , jptr(nlocal)
            , map(coldim+(coldim&0x1))
        {
            TaskInterface::set_nthread(nthread);

            MADNESS_ASSERT(A.is_column_distributed() && (nproc==1 || (A.coltile()&0x1)==0));

            // Initialize vectors of pointers to matrix rows
            Tensor<T>& t = A.data();

            //madness::print(nproc, coldim, rowdim, nlocal, rank, tag);

            for (int64_t i=0; i<nlocal; ++i) {
                iptr[i] = &t(i,0);
                jptr[i] = &t(i+nlocal,0);
            }

            // If no. of rows is odd, last process should have an empty last row
            if (rank==(nproc-1) && (coldim&0x1)) jptr[nlocal-1] = 0;

            // Initialize map from logical index order to actual index order

            int neven = (coldim+1)/2;
            int ii=0;
            for (ProcessID p=0; p<nproc; ++p) {
                int64_t lo, hi;
                A.get_colrange(p, lo, hi);
                int p_nlocal = (hi - lo + 2)/2;
                //print("I think process",p,"has",lo,hi,p_nlocal);
                for (int i=0; i<p_nlocal; ++i) {
                    map[ii+i] = lo+i;
                    //map[coldim-ii-nlocal+i] = lo+i+nlocal;
                    map[ii+i+neven] = lo+i+p_nlocal;
                }
                ii += p_nlocal;
            }

            std::reverse(map.begin(),map.begin()+neven);

            //print("MAP", map);
        }

        virtual ~SystolicMatrixAlgorithm() {}

        /// Threadsafe routine to apply the operation to rows i and j of the matrix
        virtual void kernel(int i, int j, T* rowi, T* rowj) = 0;

        /// Invoked simultaneously by all threads after each sweep to test for convergence
        virtual bool converged(const TaskThreadEnv& env) const = 0;

        /// Invoked by all threads at the start of each iteration
        virtual void start_iteration_hook(const TaskThreadEnv& env) {}

        /// Invoked by all threads at the end of each iteration
        virtual void end_iteration_hook(const TaskThreadEnv& env) {}

        /// Invoked by the task queue to run the algorithm with multiple threads
        void run(World& world, const TaskThreadEnv& env) {
            do {
                iteration(env);
            } while (!converged(env));

            if (env.id() == 0) unshuffle();

            env.barrier();
        }

        /// Invoked by the user to run the algorithm with one thread, mostly for debugging
        void solve_sequential() {
            run(A.get_world(), TaskThreadEnv(1,0,0));
        }

        /// Returns the length of a row
        int64_t get_rowdim() const {return rowdim;}

        /// Returns the length of a column
        int64_t get_coldim() const {return coldim;}

        /// Returns a reference to the world
        World& get_world() const {
            return A.get_world();
        }

        /// Returns the rank of this process in the world
        ProcessID get_rank() const {
            return rank;
        }
    };
}

#endif
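
Example usage (not part of the header above): a minimal sketch of a concrete subclass, assuming that only kernel() and converged() must be supplied and that the iteration hooks may be overridden as needed. The class name PairwiseDots, the tag value 3333, and the column_distributed_matrix factory in the commented-out driver are illustrative assumptions rather than APIs defined in this file.

#include <madness/tensor/systolic.h>

using namespace madness;

// Hypothetical subclass: for every distinct pair of rows generated during one
// systolic sweep it prints the dot product of the two rows, then declares
// convergence so that run() performs exactly one sweep.
template <typename T>
class PairwiseDots : public SystolicMatrixAlgorithm<T> {
    int nsweep; // updated only by thread 0 in end_iteration_hook()

public:
    PairwiseDots(DistributedMatrix<T>& A, int tag)
        : SystolicMatrixAlgorithm<T>(A, tag), nsweep(0) {}

    // Called concurrently by several threads, each on a different pair, so it
    // must not touch shared state without synchronization.
    void kernel(int i, int j, T* rowi, T* rowj) {
        T sum = 0;
        for (int64_t k=0; k<this->get_rowdim(); ++k) sum += rowi[k]*rowj[k];
        madness::print("rows", i, j, "dot", sum); // debug output, as in the commented-out print calls above
    }

    // Invoked simultaneously by all threads; they all read the same counter.
    bool converged(const TaskThreadEnv& env) const { return nsweep > 0; }

    void end_iteration_hook(const TaskThreadEnv& env) {
        if (env.id() == 0) ++nsweep; // a barrier inside iteration() follows this call
    }
};

// Sketch of a driver (the matrix factory below is an assumption about the
// DistributedMatrix API, not something defined in this header):
//
//   DistributedMatrix<double> A = column_distributed_matrix<double>(world, n, m);
//   PairwiseDots<double> alg(A, 3333);
//   alg.solve_sequential();   // single-threaded run, mostly for debugging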
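
The pair ordering worked through in the long comment inside cycle() can be replayed directly from the index arithmetic in iteration(). The standalone sketch below (plain C++, not part of MADNESS) prints, for a 10-row matrix on a single process (so pairlo = 0), the logical pairs visited in each of the neven-1 rounds of one sweep; position neven-1 stays put, matching the "9 stays put" remark in the diagram. In the header these logical positions are additionally translated through map[] into actual row indices.

#include <cstdio>

int main() {
    const int coldim = 10;                       // same size as the worked example in cycle()
    const int neven  = coldim + (coldim & 0x1);  // pad to an even number of rows
    const int npairs = neven / 2;                // pairs processed per round

    for (int loop = 0; loop < neven - 1; ++loop) {    // neven-1 rounds make one sweep
        for (int pair = 0; pair < npairs; ++pair) {   // single process, so pairlo = 0
            int rp  = neven/2 - 1 - pair;
            int iii = (rp + loop) % (neven - 1);
            int jjj = (2*neven - 2 - rp + loop) % (neven - 1);
            if (rp == 0) jjj = neven - 1;             // this position never moves
            std::printf("(%d,%d) ", iii, jjj);
        }
        std::printf("\n");
    }
    return 0;
}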