You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

checkqueue.h 6.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. // Copyright (c) 2012-2015 The Starwels developers
  2. // Distributed under the MIT software license, see the accompanying
  3. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4. #ifndef STARWELS_CHECKQUEUE_H
  5. #define STARWELS_CHECKQUEUE_H
  6. #include "sync.h"
  7. #include <algorithm>
  8. #include <vector>
  9. #include <boost/thread/condition_variable.hpp>
  10. #include <boost/thread/mutex.hpp>
  11. template <typename T>
  12. class CCheckQueueControl;
  13. /**
  14. * Queue for verifications that have to be performed.
  15. * The verifications are represented by a type T, which must provide an
  16. * operator(), returning a bool.
  17. *
  18. * One thread (the master) is assumed to push batches of verifications
  19. * onto the queue, where they are processed by N-1 worker threads. When
  20. * the master is done adding work, it temporarily joins the worker pool
  21. * as an N'th worker, until all jobs are done.
  22. */
  23. template <typename T>
  24. class CCheckQueue
  25. {
  26. private:
  27. //! Mutex to protect the inner state
  28. boost::mutex mutex;
  29. //! Worker threads block on this when out of work
  30. boost::condition_variable condWorker;
  31. //! Master thread blocks on this when out of work
  32. boost::condition_variable condMaster;
  33. //! The queue of elements to be processed.
  34. //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
  35. std::vector<T> queue;
  36. //! The number of workers (including the master) that are idle.
  37. int nIdle;
  38. //! The total number of workers (including the master).
  39. int nTotal;
  40. //! The temporary evaluation result.
  41. bool fAllOk;
  42. /**
  43. * Number of verifications that haven't completed yet.
  44. * This includes elements that are no longer queued, but still in the
  45. * worker's own batches.
  46. */
  47. unsigned int nTodo;
  48. //! Whether we're shutting down.
  49. bool fQuit;
  50. //! The maximum number of elements to be processed in one batch
  51. unsigned int nBatchSize;
  52. /** Internal function that does bulk of the verification work. */
  53. bool Loop(bool fMaster = false)
  54. {
  55. boost::condition_variable& cond = fMaster ? condMaster : condWorker;
  56. std::vector<T> vChecks;
  57. vChecks.reserve(nBatchSize);
  58. unsigned int nNow = 0;
  59. bool fOk = true;
  60. do {
  61. {
  62. boost::unique_lock<boost::mutex> lock(mutex);
  63. // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
  64. if (nNow) {
  65. fAllOk &= fOk;
  66. nTodo -= nNow;
  67. if (nTodo == 0 && !fMaster)
  68. // We processed the last element; inform the master it can exit and return the result
  69. condMaster.notify_one();
  70. } else {
  71. // first iteration
  72. nTotal++;
  73. }
  74. // logically, the do loop starts here
  75. while (queue.empty()) {
  76. if ((fMaster || fQuit) && nTodo == 0) {
  77. nTotal--;
  78. bool fRet = fAllOk;
  79. // reset the status for new work later
  80. if (fMaster)
  81. fAllOk = true;
  82. // return the current status
  83. return fRet;
  84. }
  85. nIdle++;
  86. cond.wait(lock); // wait
  87. nIdle--;
  88. }
  89. // Decide how many work units to process now.
  90. // * Do not try to do everything at once, but aim for increasingly smaller batches so
  91. // all workers finish approximately simultaneously.
  92. // * Try to account for idle jobs which will instantly start helping.
  93. // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
  94. nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
  95. vChecks.resize(nNow);
  96. for (unsigned int i = 0; i < nNow; i++) {
  97. // We want the lock on the mutex to be as short as possible, so swap jobs from the global
  98. // queue to the local batch vector instead of copying.
  99. vChecks[i].swap(queue.back());
  100. queue.pop_back();
  101. }
  102. // Check whether we need to do work at all
  103. fOk = fAllOk;
  104. }
  105. // execute work
  106. for (T& check : vChecks)
  107. if (fOk)
  108. fOk = check();
  109. vChecks.clear();
  110. } while (true);
  111. }
  112. public:
  113. //! Mutex to ensure only one concurrent CCheckQueueControl
  114. boost::mutex ControlMutex;
  115. //! Create a new check queue
  116. CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
  117. //! Worker thread
  118. void Thread()
  119. {
  120. Loop();
  121. }
  122. //! Wait until execution finishes, and return whether all evaluations were successful.
  123. bool Wait()
  124. {
  125. return Loop(true);
  126. }
  127. //! Add a batch of checks to the queue
  128. void Add(std::vector<T>& vChecks)
  129. {
  130. boost::unique_lock<boost::mutex> lock(mutex);
  131. for (T& check : vChecks) {
  132. queue.push_back(T());
  133. check.swap(queue.back());
  134. }
  135. nTodo += vChecks.size();
  136. if (vChecks.size() == 1)
  137. condWorker.notify_one();
  138. else if (vChecks.size() > 1)
  139. condWorker.notify_all();
  140. }
  141. ~CCheckQueue()
  142. {
  143. }
  144. };
  145. /**
  146. * RAII-style controller object for a CCheckQueue that guarantees the passed
  147. * queue is finished before continuing.
  148. */
  149. template <typename T>
  150. class CCheckQueueControl
  151. {
  152. private:
  153. CCheckQueue<T> * const pqueue;
  154. bool fDone;
  155. public:
  156. CCheckQueueControl() = delete;
  157. CCheckQueueControl(const CCheckQueueControl&) = delete;
  158. CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
  159. explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
  160. {
  161. // passed queue is supposed to be unused, or nullptr
  162. if (pqueue != nullptr) {
  163. ENTER_CRITICAL_SECTION(pqueue->ControlMutex);
  164. }
  165. }
  166. bool Wait()
  167. {
  168. if (pqueue == nullptr)
  169. return true;
  170. bool fRet = pqueue->Wait();
  171. fDone = true;
  172. return fRet;
  173. }
  174. void Add(std::vector<T>& vChecks)
  175. {
  176. if (pqueue != nullptr)
  177. pqueue->Add(vChecks);
  178. }
  179. ~CCheckQueueControl()
  180. {
  181. if (!fDone)
  182. Wait();
  183. if (pqueue != nullptr) {
  184. LEAVE_CRITICAL_SECTION(pqueue->ControlMutex);
  185. }
  186. }
  187. };
  188. #endif // STARWELS_CHECKQUEUE_H