MADNESS  version 0.9
dqueue.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30 
31 
32  $Id$
33 */
34 
35 
36 #ifndef MADNESS_WORLD_DQUEUE_H__INCLUDED
37 #define MADNESS_WORLD_DQUEUE_H__INCLUDED
38 
40 #include <cstddef>
41 #include <utility>
42 #include <algorithm>
43 #include <iostream>
44 #include <stdint.h>
45 
48 
49 namespace madness {
50 
/// Usage counters maintained by a DQueue.
///
/// All counters start at zero and are only ever incremented (or, for nmax,
/// raised) by the queue that owns the instance.
struct DQStats { // Dilly bar, blizzard, ...
    uint64_t npush_back;    ///< #calls to push_back
    uint64_t npush_front;   ///< #calls to push_front
    uint64_t npop_front;    ///< #calls to pop_front
    uint64_t ngrow;         ///< #calls to grow
    uint64_t nmax;          ///< Lifetime max. entries in the queue

    /// Zero-initialize every counter.
    DQStats()
        : npush_back(0), npush_front(0), npop_front(0), ngrow(0), nmax(0) {}
};
61 
62 
64 
71  template <typename T>
72  class DQueue : private CONDITION_VARIABLE_TYPE {
73  char pad[64];
74  volatile size_t n __attribute__((aligned(64)));
75  volatile size_t sz;
76  volatile T* volatile buf;
77  volatile int _front;
78  volatile int _back;
79  DQStats stats;
80 
81  void grow() {
82  // ASSUME WE ALREADY HAVE THE MUTEX WHEN IN HERE
83  ++(stats.ngrow);
84  if (sz != n) MADNESS_EXCEPTION("assertion failure in dqueue::grow", static_cast<int>(sz));
85  size_t oldsz = sz;
86  if (sz < 32768)
87  sz = 65536;
88  else if (sz <= 1048576)
89  sz *= 2;
90  else
91  sz += 1048576;
92  volatile T* volatile nbuf = new T[sz];
93  int lo = sz/2 - oldsz/2;
94  for (int i=_front; i<int(oldsz); ++i,++lo) {
95  nbuf[lo] = buf[i];
96  }
97  if (_front > 0) {
98  for (int i=0; i<=_back; ++i,++lo) {
99  nbuf[lo] = buf[i];
100  }
101  }
102  _front = sz/2 - oldsz/2;
103  _back = _front + n - 1;
104  buf = nbuf;
105  //sanity_check();
106  }
107 
108  void sanity_check() const {
109  // ASSUME WE ALREADY HAVE THE MUTEX WHEN IN HERE
110  int num = _back - _front + 1;
111  if (num < 0) num += sz;
112  if (num==int(sz) && n==0) num=0;
113  if (num==0 && n==sz) num=sz;
114  //if (long(n) != num) print("size",sz,"front",_front,"back",_back,"n",n,"num",num);
115  MADNESS_ASSERT(long(n) == num);
116  }
117 
118  void push_back_with_lock(const T& value) {
119  size_t nn = n;
120  size_t ss = sz;
121  if (nn == ss) {
122  grow();
123  ss = sz;
124  }
125  ++nn;
126  if (nn > stats.nmax) stats.nmax = nn;
127  n = nn;
128 
129  int b = _back + 1;
130  if (b >= int(ss)) b = 0;
131  buf[b] = value;
132  _back = b;
133  ++(stats.npush_back);
134 
135  signal();
136  }
137 
138 
139  public:
140  DQueue(size_t hint=200000) // was 32768
141  : n(0)
142  , sz(hint>2 ? hint : 2)
143  , buf(new T[sz])
144  , _front(sz/2)
145  , _back(_front-1) {}
146 
147  virtual ~DQueue() {
148  delete buf;
149  }
150 
152  void push_front(const T& value) {
154  //sanity_check();
155 
156  size_t nn = n;
157  size_t ss = sz;
158  if (nn == ss) {
159  grow();
160  ss = sz;
161  }
162  ++nn;
163  if (nn > stats.nmax) stats.nmax = nn;
164  n = nn;
165 
166  int f = _front - 1;
167  if (f < 0) f = ss - 1;
168  buf[f] = value;
169  _front = f;
170  ++(stats.npush_front);
171 
172  //sanity_check();
173  signal();
174  //broadcast();
175  }
176 
178  void push_back(const T& value, int ncopy=1) {
180  //sanity_check();
181  while (ncopy--)
182  push_back_with_lock(value);
183  //sanity_check();
184  //broadcast();
185  }
186 
187  template <typename opT>
188  void scan(opT& op) {
190 
191  int f = _front;
192  size_t nn = n;
193  int size = int(sz);
194  std::cout << "IN Q " << nn << std::endl;
195 
196  while (nn--) {
197  T* p = const_cast<T*>(buf + f);
198  if (!op(p)) break;
199  ++f;
200  if (f >= size) f = 0;
201  }
202  }
203 
205 
210  int pop_front(int nmax, T* r, bool wait) {
212 
213  size_t nn = n;
214 
215  if (nn==0 && wait) {
216  while (n == 0) // !!! Must be n (memory) not nn (local copy)
218 
219  nn = n;
220  }
221 
222  ++(stats.npop_front);
223  if (nn) {
224  size_t thesize = sz;
225  //sanity_check();
226 
227  nmax = std::min(nmax,std::max(int(nn>>6),1));
228  int retval; // Will return the number of items taken
229 
230 
231  int f = _front;
232 
233  // Original loop was this
234  //retval = nmax;
235  //while (nmax--) {
236  // *r++ = buf[f++];
237  // if (f >= int(sz)) f = 0;
238  //}
239 
240  // New loop includes checking for replicated multi-threaded task
241  // ... take one task and then check that subsequent tasks differ
242  nmax--;
243  *r++ = buf[f++];
244  if (f >= int(thesize)) f = 0;
245  retval=1;
246  while (nmax--) {
247  T ptr = buf[f];
248  if (ptr == *(r-1)) {
249  break;
250  }
251  else if (ptr) { // Null pointer indicates stolen task
252  *r++ = ptr;
253  ++f;
254  if (f >= int(thesize)) f = 0;
255  ++retval;
256  }
257  }
258 
259  n = nn - retval;
260  _front = f;
261 
262  //sanity_check();
263  return retval;
264  }
265  else {
266  return 0;
267  }
268  }
269 
271  std::pair<T,bool> pop_front(bool wait) {
272  T r;
273  int ngot = pop_front(1, &r, wait);
274  return std::pair<T,bool>(r,ngot==1);
275  }
276 
277  size_t size() const {
278  return n;
279  }
280 
281  bool empty() const {
282  return n==0;
283  }
284 
285  const DQStats& get_stats() const {
286  return stats;
287  }
288  };
289 
290 }
291 
292 #endif // MADNESS_WORLD_DQUEUE_H__INCLUDED
Mutex that is applied/released at start/end of a scope.
Definition: worldmutex.h:186
virtual ~DQueue()
Definition: dqueue.h:147
uint64_t npop_front
#calls to pop_front
Definition: dqueue.h:54
const DQStats & get_stats() const
Definition: dqueue.h:285
Simple wrapper for Pthread condition variable with its own mutex.
Definition: worldmutex.h:566
int pop_front(int nmax, T *r, bool wait)
Pop multiple values off the front of queue ... returns number popped ... might be zero...
Definition: dqueue.h:210
void signal() const
Definition: worldmutex.h:596
std::pair< T, bool > pop_front(bool wait)
Pop value off the front of queue.
Definition: dqueue.h:271
DQueue(size_t hint=200000)
Definition: dqueue.h:140
void push_back(const T &value, int ncopy=1)
Insert element at back of queue (default is just one copy)
Definition: dqueue.h:178
bool empty() const
Definition: dqueue.h:281
madness::DQStats
Definition: dqueue.h:51
NDIM & f
Definition: mra.h:2179
void push_front(const T &value)
Insert value at front of queue.
Definition: dqueue.h:152
void scan(opT &op)
Definition: dqueue.h:188
void wait() const
You should have acquired the mutex before entering here.
Definition: worldmutex.h:592
DQStats()
Definition: dqueue.h:58
const T1 &f1 return GTEST_2_TUPLE_() T(f0, f1)
#define max(a, b)
Definition: lda.h:53
A thread safe, fast but simple doubled-ended queue.
Definition: dqueue.h:72
const mpreal min(const mpreal &x, const mpreal &y)
Definition: mpreal.h:2675
Implements Mutex, MutexFair, Spinlock, ConditionVariable.
uint64_t ngrow
#calls to grow
Definition: dqueue.h:55
uint64_t npush_front
#calls to push_front
Definition: dqueue.h:53
size_t size() const
Definition: dqueue.h:277
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
uint64_t npush_back
#calls to push_back
Definition: dqueue.h:52
#define MADNESS_EXCEPTION(msg, value)
Definition: worldexc.h:88
uint64_t nmax
Lifetime max. entries in the queue.
Definition: dqueue.h:56
Holds machinery to set up Functions/FuncImpls using various Factories and Interfaces.
Definition: chem/atomutil.cc:45
FLOAT b(int j, FLOAT z)
Definition: y1.cc:79