Raven Core  3.0.0
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Copyright (c) 2017-2019 The Raven Core developers
3 // Distributed under the MIT software license, see the accompanying
4 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 
6 #include "scheduler.h"
7 
8 #include "random.h"
9 #include "reverselock.h"
10 
11 #include <assert.h>
12 #include <boost/bind.hpp>
13 #include <utility>
14 
15 CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false)
16 {
17 }
18 
20 {
21  assert(nThreadsServicingQueue == 0);
22 }
23 
24 
25 #if BOOST_VERSION < 105000
26 static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
27 {
28  // Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t,
29  // start with a posix_time at the epoch (0) and add the milliseconds that have passed since then.
30  return boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(boost::chrono::duration_cast<boost::chrono::milliseconds>(t.time_since_epoch()).count());
31 }
32 #endif
33 
35 {
36  boost::unique_lock<boost::mutex> lock(newTaskMutex);
38 
39  // newTaskMutex is locked throughout this loop EXCEPT
40  // when the thread is waiting or when the user's function
41  // is called.
42  while (!shouldStop()) {
43  try {
44  if (!shouldStop() && taskQueue.empty()) {
46  // Use this chance to get a tiny bit more entropy
48  }
49  while (!shouldStop() && taskQueue.empty()) {
50  // Wait until there is something to do.
51  newTaskScheduled.wait(lock);
52  }
53 
54  // Wait until either there is a new task, or until
55  // the time of the first item on the queue:
56 
57 // wait_until needs boost 1.50 or later; older versions have timed_wait:
58 #if BOOST_VERSION < 105000
59  while (!shouldStop() && !taskQueue.empty() &&
60  newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
61  // Keep waiting until timeout
62  }
63 #else
64  // Some boost versions have a conflicting overload of wait_until that returns void.
65  // Explicitly use a template here to avoid hitting that overload.
66  while (!shouldStop() && !taskQueue.empty()) {
67  boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
68  if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout)
69  break; // Exit loop after timeout, it means we reached the time of the event
70  }
71 #endif
72  // If there are multiple threads, the queue can empty while we're waiting (another
73  // thread may service the task we were waiting on).
74  if (shouldStop() || taskQueue.empty())
75  continue;
76 
77  Function f = taskQueue.begin()->second;
78  taskQueue.erase(taskQueue.begin());
79 
80  {
81  // Unlock before calling f, so it can reschedule itself or another task
82  // without deadlocking:
84  f();
85  }
86  } catch (...) {
88  throw;
89  }
90  }
92  newTaskScheduled.notify_one();
93 }
94 
95 void CScheduler::stop(bool drain)
96 {
97  {
98  boost::unique_lock<boost::mutex> lock(newTaskMutex);
99  if (drain)
100  stopWhenEmpty = true;
101  else
102  stopRequested = true;
103  }
104  newTaskScheduled.notify_all();
105 }
106 
107 void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
108 {
109  {
110  boost::unique_lock<boost::mutex> lock(newTaskMutex);
111  taskQueue.insert(std::make_pair(t, f));
112  }
113  newTaskScheduled.notify_one();
114 }
115 
116 void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
117 {
118  schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds));
119 }
120 
121 static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds)
122 {
123  f();
124  s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaMilliSeconds), deltaMilliSeconds);
125 }
126 
127 void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds)
128 {
129  scheduleFromNow(boost::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
130 }
131 
132 size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
133  boost::chrono::system_clock::time_point &last) const
134 {
135  boost::unique_lock<boost::mutex> lock(newTaskMutex);
136  size_t result = taskQueue.size();
137  if (!taskQueue.empty()) {
138  first = taskQueue.begin()->first;
139  last = taskQueue.rbegin()->first;
140  }
141  return result;
142 }
143 
145  boost::unique_lock<boost::mutex> lock(newTaskMutex);
146  return nThreadsServicingQueue;
147 }
148 
149 
151  {
152  LOCK(m_cs_callbacks_pending);
153 // Try to avoid scheduling too many copies here, but if we
154 // accidentally have two ProcessQueue calls scheduled at once it's
155 // not a big deal.
156  if (m_are_callbacks_running) return;
157  if (m_callbacks_pending.empty()) return;
158  }
159  m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
160 }
161 
163  std::function<void (void)> callback;
164  {
165  LOCK(m_cs_callbacks_pending);
166  if (m_are_callbacks_running) return;
167  if (m_callbacks_pending.empty()) return;
168  m_are_callbacks_running = true;
169 
170  callback = std::move(m_callbacks_pending.front());
171  m_callbacks_pending.pop_front();
172  }
173 
174 // Use RAII to reset m_are_callbacks_running and call MaybeScheduleProcessQueue,
175 // so that both happen safely even if callback() throws.
176  struct RAIICallbacksRunning {
178  explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
179  ~RAIICallbacksRunning() {
180  {
181  LOCK(instance->m_cs_callbacks_pending);
182  instance->m_are_callbacks_running = false;
183  }
184  instance->MaybeScheduleProcessQueue();
185  }
186  } raiicallbacksrunning(this);
187 
188  callback();
189 }
190 
191 void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
192  assert(m_pscheduler);
193 
194  {
195  LOCK(m_cs_callbacks_pending);
196  m_callbacks_pending.emplace_back(std::move(func));
197  }
198  MaybeScheduleProcessQueue();
199 }
200 
202  assert(!m_pscheduler->AreThreadsServicingQueue());
203  bool should_continue = true;
204  while (should_continue) {
205  ProcessQueue();
206  LOCK(m_cs_callbacks_pending);
207  should_continue = !m_callbacks_pending.empty();
208  }
209 }
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serially.
Definition: scheduler.h:94
bool stopWhenEmpty
Definition: scheduler.h:84
void RandAddSeedSleep()
Add a little bit of randomness to the output of GetStrongRandBytes.
Definition: random.cpp:283
std::multimap< boost::chrono::system_clock::time_point, Function > taskQueue
Definition: scheduler.h:79
bool shouldStop() const
Definition: scheduler.h:85
void scheduleEvery(Function f, int64_t deltaMilliSeconds)
Definition: scheduler.cpp:127
void scheduleFromNow(Function f, int64_t deltaMilliSeconds)
Definition: scheduler.cpp:116
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now())
Definition: scheduler.cpp:107
void stop(bool drain=false)
Definition: scheduler.cpp:95
#define LOCK(cs)
Definition: sync.h:176
std::function< void(void)> Function
Definition: scheduler.h:44
boost::condition_variable newTaskScheduled
Definition: scheduler.h:80
void serviceQueue()
Definition: scheduler.cpp:34
An RAII-style reverse lock.
Definition: reverselock.h:13
boost::mutex newTaskMutex
Definition: scheduler.h:81
bool stopRequested
Definition: scheduler.h:83
CCriticalSection m_cs_callbacks_pending
Definition: scheduler.h:98
bool AreThreadsServicingQueue() const
Definition: scheduler.cpp:144
int nThreadsServicingQueue
Definition: scheduler.h:82
size_t getQueueInfo(boost::chrono::system_clock::time_point &first, boost::chrono::system_clock::time_point &last) const
Definition: scheduler.cpp:132
void AddToProcessQueue(std::function< void(void)> func)
Definition: scheduler.cpp:191