You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

checkqueue.h 6.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright (c) 2012 The Bitcoin developers
  2. // Distributed under the MIT/X11 software license, see the accompanying
  3. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4. #ifndef CHECKQUEUE_H
  5. #define CHECKQUEUE_H
  6. #include <algorithm>
  7. #include <vector>
  8. #include <boost/foreach.hpp>
  9. #include <boost/thread/condition_variable.hpp>
  10. #include <boost/thread/locks.hpp>
  11. #include <boost/thread/mutex.hpp>
  12. template<typename T> class CCheckQueueControl;
  13. /** Queue for verifications that have to be performed.
  14. * The verifications are represented by a type T, which must provide an
  15. * operator(), returning a bool.
  16. *
  17. * One thread (the master) is assumed to push batches of verifications
  18. * onto the queue, where they are processed by N-1 worker threads. When
  19. * the master is done adding work, it temporarily joins the worker pool
  20. * as an N'th worker, until all jobs are done.
  21. */
  22. template<typename T> class CCheckQueue {
  23. private:
  24. // Mutex to protect the inner state
  25. boost::mutex mutex;
  26. // Worker threads block on this when out of work
  27. boost::condition_variable condWorker;
  28. // Master thread blocks on this when out of work
  29. boost::condition_variable condMaster;
  30. // The queue of elements to be processed.
  31. // As the order of booleans doesn't matter, it is used as a LIFO (stack)
  32. std::vector<T> queue;
  33. // The number of workers (including the master) that are idle.
  34. int nIdle;
  35. // The total number of workers (including the master).
  36. int nTotal;
  37. // The temporary evaluation result.
  38. bool fAllOk;
  39. // Number of verifications that haven't completed yet.
  40. // This includes elements that are not anymore in queue, but still in
  41. // worker's own batches.
  42. unsigned int nTodo;
  43. // Whether we're shutting down.
  44. bool fQuit;
  45. // The maximum number of elements to be processed in one batch
  46. unsigned int nBatchSize;
  47. // Internal function that does bulk of the verification work.
  48. bool Loop(bool fMaster = false) {
  49. boost::condition_variable &cond = fMaster ? condMaster : condWorker;
  50. std::vector<T> vChecks;
  51. vChecks.reserve(nBatchSize);
  52. unsigned int nNow = 0;
  53. bool fOk = true;
  54. do {
  55. {
  56. boost::unique_lock<boost::mutex> lock(mutex);
  57. // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
  58. if (nNow) {
  59. fAllOk &= fOk;
  60. nTodo -= nNow;
  61. if (nTodo == 0 && !fMaster)
  62. // We processed the last element; inform the master he can exit and return the result
  63. condMaster.notify_one();
  64. } else {
  65. // first iteration
  66. nTotal++;
  67. }
  68. // logically, the do loop starts here
  69. while (queue.empty()) {
  70. if ((fMaster || fQuit) && nTodo == 0) {
  71. nTotal--;
  72. bool fRet = fAllOk;
  73. // reset the status for new work later
  74. if (fMaster)
  75. fAllOk = true;
  76. // return the current status
  77. return fRet;
  78. }
  79. nIdle++;
  80. cond.wait(lock); // wait
  81. nIdle--;
  82. }
  83. // Decide how many work units to process now.
  84. // * Do not try to do everything at once, but aim for increasingly smaller batches so
  85. // all workers finish approximately simultaneously.
  86. // * Try to account for idle jobs which will instantly start helping.
  87. // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
  88. nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
  89. vChecks.resize(nNow);
  90. for (unsigned int i = 0; i < nNow; i++) {
  91. // We want the lock on the mutex to be as short as possible, so swap jobs from the global
  92. // queue to the local batch vector instead of copying.
  93. vChecks[i].swap(queue.back());
  94. queue.pop_back();
  95. }
  96. // Check whether we need to do work at all
  97. fOk = fAllOk;
  98. }
  99. // execute work
  100. BOOST_FOREACH(T &check, vChecks)
  101. if (fOk)
  102. fOk = check();
  103. vChecks.clear();
  104. } while(true);
  105. }
  106. public:
  107. // Create a new check queue
  108. CCheckQueue(unsigned int nBatchSizeIn) :
  109. nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
  110. // Worker thread
  111. void Thread() {
  112. Loop();
  113. }
  114. // Wait until execution finishes, and return whether all evaluations where succesful.
  115. bool Wait() {
  116. return Loop(true);
  117. }
  118. // Add a batch of checks to the queue
  119. void Add(std::vector<T> &vChecks) {
  120. boost::unique_lock<boost::mutex> lock(mutex);
  121. BOOST_FOREACH(T &check, vChecks) {
  122. queue.push_back(T());
  123. check.swap(queue.back());
  124. }
  125. nTodo += vChecks.size();
  126. if (vChecks.size() == 1)
  127. condWorker.notify_one();
  128. else if (vChecks.size() > 1)
  129. condWorker.notify_all();
  130. }
  131. ~CCheckQueue() {
  132. }
  133. friend class CCheckQueueControl<T>;
  134. };
  135. /** RAII-style controller object for a CCheckQueue that guarantees the passed
  136. * queue is finished before continuing.
  137. */
  138. template<typename T> class CCheckQueueControl {
  139. private:
  140. CCheckQueue<T> *pqueue;
  141. bool fDone;
  142. public:
  143. CCheckQueueControl(CCheckQueue<T> *pqueueIn) : pqueue(pqueueIn), fDone(false) {
  144. // passed queue is supposed to be unused, or NULL
  145. if (pqueue != NULL) {
  146. assert(pqueue->nTotal == pqueue->nIdle);
  147. assert(pqueue->nTodo == 0);
  148. assert(pqueue->fAllOk == true);
  149. }
  150. }
  151. bool Wait() {
  152. if (pqueue == NULL)
  153. return true;
  154. bool fRet = pqueue->Wait();
  155. fDone = true;
  156. return fRet;
  157. }
  158. void Add(std::vector<T> &vChecks) {
  159. if (pqueue != NULL)
  160. pqueue->Add(vChecks);
  161. }
  162. ~CCheckQueueControl() {
  163. if (!fDone)
  164. Wait();
  165. }
  166. };
  167. #endif