You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

scheduler.cpp 7.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. // Copyright (c) 2015-2016 The Starwels developers
  2. // Distributed under the MIT software license, see the accompanying
  3. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4. #include "scheduler.h"
  5. #include "random.h"
  6. #include "reverselock.h"
  7. #include <assert.h>
  8. #include <boost/bind.hpp>
  9. #include <utility>
  10. CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false)
  11. {
  12. }
  13. CScheduler::~CScheduler()
  14. {
  15. assert(nThreadsServicingQueue == 0);
  16. }
  17. #if BOOST_VERSION < 105000
  18. static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
  19. {
  20. // Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t,
  21. // start with a posix_time at the epoch (0) and add the milliseconds that have passed since then.
  22. return boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(boost::chrono::duration_cast<boost::chrono::milliseconds>(t.time_since_epoch()).count());
  23. }
  24. #endif
  25. void CScheduler::serviceQueue()
  26. {
  27. boost::unique_lock<boost::mutex> lock(newTaskMutex);
  28. ++nThreadsServicingQueue;
  29. // newTaskMutex is locked throughout this loop EXCEPT
  30. // when the thread is waiting or when the user's function
  31. // is called.
  32. while (!shouldStop()) {
  33. try {
  34. if (!shouldStop() && taskQueue.empty()) {
  35. reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
  36. // Use this chance to get a tiny bit more entropy
  37. RandAddSeedSleep();
  38. }
  39. while (!shouldStop() && taskQueue.empty()) {
  40. // Wait until there is something to do.
  41. newTaskScheduled.wait(lock);
  42. }
  43. // Wait until either there is a new task, or until
  44. // the time of the first item on the queue:
  45. // wait_until needs boost 1.50 or later; older versions have timed_wait:
  46. #if BOOST_VERSION < 105000
  47. while (!shouldStop() && !taskQueue.empty() &&
  48. newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
  49. // Keep waiting until timeout
  50. }
  51. #else
  52. // Some boost versions have a conflicting overload of wait_until that returns void.
  53. // Explicitly use a template here to avoid hitting that overload.
  54. while (!shouldStop() && !taskQueue.empty()) {
  55. boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
  56. if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout)
  57. break; // Exit loop after timeout, it means we reached the time of the event
  58. }
  59. #endif
  60. // If there are multiple threads, the queue can empty while we're waiting (another
  61. // thread may service the task we were waiting on).
  62. if (shouldStop() || taskQueue.empty())
  63. continue;
  64. Function f = taskQueue.begin()->second;
  65. taskQueue.erase(taskQueue.begin());
  66. {
  67. // Unlock before calling f, so it can reschedule itself or another task
  68. // without deadlocking:
  69. reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
  70. f();
  71. }
  72. } catch (...) {
  73. --nThreadsServicingQueue;
  74. throw;
  75. }
  76. }
  77. --nThreadsServicingQueue;
  78. newTaskScheduled.notify_one();
  79. }
  80. void CScheduler::stop(bool drain)
  81. {
  82. {
  83. boost::unique_lock<boost::mutex> lock(newTaskMutex);
  84. if (drain)
  85. stopWhenEmpty = true;
  86. else
  87. stopRequested = true;
  88. }
  89. newTaskScheduled.notify_all();
  90. }
  91. void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
  92. {
  93. {
  94. boost::unique_lock<boost::mutex> lock(newTaskMutex);
  95. taskQueue.insert(std::make_pair(t, f));
  96. }
  97. newTaskScheduled.notify_one();
  98. }
  99. void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
  100. {
  101. schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds));
  102. }
  103. static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds)
  104. {
  105. f();
  106. s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaMilliSeconds), deltaMilliSeconds);
  107. }
  108. void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds)
  109. {
  110. scheduleFromNow(boost::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
  111. }
  112. size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
  113. boost::chrono::system_clock::time_point &last) const
  114. {
  115. boost::unique_lock<boost::mutex> lock(newTaskMutex);
  116. size_t result = taskQueue.size();
  117. if (!taskQueue.empty()) {
  118. first = taskQueue.begin()->first;
  119. last = taskQueue.rbegin()->first;
  120. }
  121. return result;
  122. }
  123. bool CScheduler::AreThreadsServicingQueue() const {
  124. boost::unique_lock<boost::mutex> lock(newTaskMutex);
  125. return nThreadsServicingQueue;
  126. }
  127. void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
  128. {
  129. LOCK(m_cs_callbacks_pending);
  130. // Try to avoid scheduling too many copies here, but if we
  131. // accidentally have two ProcessQueue's scheduled at once its
  132. // not a big deal.
  133. if (m_are_callbacks_running) return;
  134. if (m_callbacks_pending.empty()) return;
  135. }
  136. m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
  137. }
  138. void SingleThreadedSchedulerClient::ProcessQueue() {
  139. std::function<void (void)> callback;
  140. {
  141. LOCK(m_cs_callbacks_pending);
  142. if (m_are_callbacks_running) return;
  143. if (m_callbacks_pending.empty()) return;
  144. m_are_callbacks_running = true;
  145. callback = std::move(m_callbacks_pending.front());
  146. m_callbacks_pending.pop_front();
  147. }
  148. // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
  149. // to ensure both happen safely even if callback() throws.
  150. struct RAIICallbacksRunning {
  151. SingleThreadedSchedulerClient* instance;
  152. RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
  153. ~RAIICallbacksRunning() {
  154. {
  155. LOCK(instance->m_cs_callbacks_pending);
  156. instance->m_are_callbacks_running = false;
  157. }
  158. instance->MaybeScheduleProcessQueue();
  159. }
  160. } raiicallbacksrunning(this);
  161. callback();
  162. }
  163. void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
  164. assert(m_pscheduler);
  165. {
  166. LOCK(m_cs_callbacks_pending);
  167. m_callbacks_pending.emplace_back(std::move(func));
  168. }
  169. MaybeScheduleProcessQueue();
  170. }
  171. void SingleThreadedSchedulerClient::EmptyQueue() {
  172. assert(!m_pscheduler->AreThreadsServicingQueue());
  173. bool should_continue = true;
  174. while (should_continue) {
  175. ProcessQueue();
  176. LOCK(m_cs_callbacks_pending);
  177. should_continue = !m_callbacks_pending.empty();
  178. }
  179. }