Browse Source

Support more than one CScheduler thread for serial clients

This will be used by CValidationInterface soon.

This requires a bit of work as we need to ensure that most of our
callbacks happen in-order (to avoid synchronization issues in
wallet) - we keep our own internal queue and push things onto it,
scheduling a queue-draining function immediately upon new
callbacks.
tags/v0.15.1
Matt Corallo 4 years ago
parent
commit
08096bbbc6
4 changed files with 90 additions and 10 deletions
  1. 52
    0
      src/scheduler.cpp
  2. 24
    0
      src/scheduler.h
  3. 14
    8
      src/validationinterface.cpp
  4. 0
    2
      src/validationinterface.h

+ 52
- 0
src/scheduler.cpp View File

@@ -139,3 +139,55 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
}
return result;
}


void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
{
LOCK(m_cs_callbacks_pending);
// Try to avoid scheduling too many copies here, but if we
// accidentally have two ProcessQueue's scheduled at once its
// not a big deal.
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
}
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
}

void SingleThreadedSchedulerClient::ProcessQueue() {
std::function<void (void)> callback;
{
LOCK(m_cs_callbacks_pending);
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
m_are_callbacks_running = true;

callback = std::move(m_callbacks_pending.front());
m_callbacks_pending.pop_front();
}

// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
// to ensure both happen safely even if callback() throws.
struct RAIICallbacksRunning {
SingleThreadedSchedulerClient* instance;
RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
~RAIICallbacksRunning() {
{
LOCK(instance->m_cs_callbacks_pending);
instance->m_are_callbacks_running = false;
}
instance->MaybeScheduleProcessQueue();
}
} raiicallbacksrunning(this);

callback();
}

void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
assert(m_pscheduler);

{
LOCK(m_cs_callbacks_pending);
m_callbacks_pending.emplace_back(std::move(func));
}
MaybeScheduleProcessQueue();
}

+ 24
- 0
src/scheduler.h View File

@@ -14,6 +14,8 @@
#include <boost/thread.hpp>
#include <map>

#include "sync.h"

//
// Simple class for background tasks that should be run
// periodically or once "after a while"
@@ -79,4 +81,26 @@ private:
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
};

/**
* Class used by CScheduler clients which may schedule multiple jobs
* which are required to be run serially. Does not require such jobs
* to be executed on the same thread, but no two jobs will be executed
* at the same time.
*/
class SingleThreadedSchedulerClient {
private:
CScheduler *m_pscheduler;

CCriticalSection m_cs_callbacks_pending;
std::list<std::function<void (void)>> m_callbacks_pending;
bool m_are_callbacks_running = false;

void MaybeScheduleProcessQueue();
void ProcessQueue();

public:
SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
void AddToProcessQueue(std::function<void (void)> func);
};

#endif

+ 14
- 8
src/validationinterface.cpp View File

@@ -6,6 +6,11 @@
#include "validationinterface.h"
#include "init.h"
#include "scheduler.h"
#include "sync.h"
#include "util.h"

#include <list>
#include <atomic>

#include <boost/signals2/signal.hpp>

@@ -20,22 +25,23 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;

CScheduler *m_scheduler = NULL;
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SingleThreadedSchedulerClient m_schedulerClient;

MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}
};

static CMainSignals g_signals;

CMainSignals::CMainSignals() {
m_internals.reset(new MainSignalsInstance());
}

void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) {
assert(!m_internals->m_scheduler);
m_internals->m_scheduler = &scheduler;
assert(!m_internals);
m_internals.reset(new MainSignalsInstance(&scheduler));
}

void CMainSignals::UnregisterBackgroundSignalScheduler() {
m_internals->m_scheduler = NULL;
m_internals.reset(nullptr);
}

CMainSignals& GetMainSignals()

+ 0
- 2
src/validationinterface.h View File

@@ -75,8 +75,6 @@ private:
friend void ::UnregisterAllValidationInterfaces();

public:
CMainSignals();

/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
/** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */

Loading…
Cancel
Save