Raven Core  3.0.0
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2015 The Bitcoin Core developers
2 // Copyright (c) 2017-2019 The Raven Core developers
3 // Distributed under the MIT software license, see the accompanying
4 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 
6 #ifndef RAVEN_CHECKQUEUE_H
7 #define RAVEN_CHECKQUEUE_H
8 
9 #include "sync.h"
10 
11 #include <algorithm>
12 #include <vector>
13 
14 #include <boost/thread/condition_variable.hpp>
15 #include <boost/thread/mutex.hpp>
16 
17 template <typename T>
18 class CCheckQueueControl;
19 
30 template <typename T>
32 {
33 private:
35  boost::mutex mutex;
36 
38  boost::condition_variable condWorker;
39 
41  boost::condition_variable condMaster;
42 
45  std::vector<T> queue;
46 
48  int nIdle;
49 
51  int nTotal;
52 
54  bool fAllOk;
55 
61  unsigned int nTodo;
62 
64  bool fQuit;
65 
67  unsigned int nBatchSize;
68 
70  bool Loop(bool fMaster = false)
71  {
72  boost::condition_variable& cond = fMaster ? condMaster : condWorker;
73  std::vector<T> vChecks;
74  vChecks.reserve(nBatchSize);
75  unsigned int nNow = 0;
76  bool fOk = true;
77  do {
78  {
79  boost::unique_lock<boost::mutex> lock(mutex);
80  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
81  if (nNow) {
82  fAllOk &= fOk;
83  nTodo -= nNow;
84  if (nTodo == 0 && !fMaster)
85  // We processed the last element; inform the master it can exit and return the result
86  condMaster.notify_one();
87  } else {
88  // first iteration
89  nTotal++;
90  }
91  // logically, the do loop starts here
92  while (queue.empty()) {
93  if ((fMaster || fQuit) && nTodo == 0) {
94  nTotal--;
95  bool fRet = fAllOk;
96  // reset the status for new work later
97  if (fMaster)
98  fAllOk = true;
99  // return the current status
100  return fRet;
101  }
102  nIdle++;
103  cond.wait(lock); // wait
104  nIdle--;
105  }
106  // Decide how many work units to process now.
107  // * Do not try to do everything at once, but aim for increasingly smaller batches so
108  // all workers finish approximately simultaneously.
109  // * Try to account for idle jobs which will instantly start helping.
110  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
111  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
112  vChecks.resize(nNow);
113  for (unsigned int i = 0; i < nNow; i++) {
114  // We want the lock on the mutex to be as short as possible, so swap jobs from the global
115  // queue to the local batch vector instead of copying.
116  vChecks[i].swap(queue.back());
117  queue.pop_back();
118  }
119  // Check whether we need to do work at all
120  fOk = fAllOk;
121  }
122  // execute work
123  for (T& check : vChecks)
124  if (fOk)
125  fOk = check();
126  vChecks.clear();
127  } while (true);
128  }
129 
130 public:
132  boost::mutex ControlMutex;
133 
135  explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
136 
138  void Thread()
139  {
140  Loop();
141  }
142 
144  bool Wait()
145  {
146  return Loop(true);
147  }
148 
150  void Add(std::vector<T>& vChecks)
151  {
152  boost::unique_lock<boost::mutex> lock(mutex);
153  for (T& check : vChecks) {
154  queue.push_back(T());
155  check.swap(queue.back());
156  }
157  nTodo += vChecks.size();
158  if (vChecks.size() == 1)
159  condWorker.notify_one();
160  else if (vChecks.size() > 1)
161  condWorker.notify_all();
162  }
163 
165  {
166  }
167 
168 };
169 
170 /**
171  * RAII-style controller object for a CCheckQueue that guarantees the passed
172  * queue is finished before continuing.
173  */
174 template <typename T>
175 class CCheckQueueControl
176 {
177 private:
178  CCheckQueue<T> * const pqueue;
179  bool fDone;
180 
181 public:
182  CCheckQueueControl() = delete;
183  CCheckQueueControl(const CCheckQueueControl&) = delete;
184  CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
185  explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
186  {
187  // passed queue is supposed to be unused, or nullptr
188  if (pqueue != nullptr) {
189  ENTER_CRITICAL_SECTION(pqueue->ControlMutex);
190  }
191  }
192 
193  bool Wait()
194  {
195  if (pqueue == nullptr)
196  return true;
197  bool fRet = pqueue->Wait();
198  fDone = true;
199  return fRet;
200  }
201 
202  void Add(std::vector<T>& vChecks)
203  {
204  if (pqueue != nullptr)
205  pqueue->Add(vChecks);
206  }
207 
208  ~CCheckQueueControl()
209  {
210  if (!fDone)
211  Wait();
212  if (pqueue != nullptr) {
213  LEAVE_CRITICAL_SECTION(pqueue->ControlMutex);
214  }
215  }
216 };
217 
218 #endif // RAVEN_CHECKQUEUE_H
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:202
boost::condition_variable condWorker
Worker threads block on this when out of work.
Definition: checkqueue.h:38
boost::mutex mutex
Mutex to protect the inner state.
Definition: checkqueue.h:35
boost::condition_variable condMaster
Master thread blocks on this when out of work.
Definition: checkqueue.h:41
bool Loop(bool fMaster=false)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:70
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:185
void Thread()
Worker thread.
Definition: checkqueue.h:138
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:18
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:135
std::vector< T > queue
The queue of elements to be processed.
Definition: checkqueue.h:45
bool fAllOk
The temporary evaluation result.
Definition: checkqueue.h:54
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:186
int nTotal
The total number of workers (including the master).
Definition: checkqueue.h:51
Queue for verifications that have to be performed.
Definition: checkqueue.h:31
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:178
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:180
bool Wait()
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:144
int nIdle
The number of workers (including the master) that are idle.
Definition: checkqueue.h:48
bool fQuit
Whether we're shutting down.
Definition: checkqueue.h:64
unsigned int nTodo
Number of verifications that haven't completed yet.
Definition: checkqueue.h:61
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:150
unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:67
boost::mutex ControlMutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:132