Compare commits

...

5 Commits

Author SHA1 Message Date
furszy 3c4c3cbe09
txindex: enable parallel sync 2025-10-08 12:04:03 -04:00
furszy 84fa86574a
index: enable block filter index parallel sync
It also adds coverage for initial sync from a particular block,
mimicking a node restart.
2025-10-08 12:04:03 -04:00
furszy 89ff1b6406
index: implement index parallel sync
This introduces parallel sync for the indexes' initial sync,
distributing the workload across multiple threads.

When enabled, the chain is divided into fixed-size block ranges called
"tasks". Worker threads consume tasks concurrently, calling to the
CustomProcessBlock over their assigned range, storing results in a
shared context while opportunistically batching and dumping the collected
information to disk sequentially (when needed).

Since large reorgs are improbable during initial sync (headers-chain PoW
dictates the valid chain), reorgs are detected only before syncing begins
and once it completes. Any new blocks connected during the process are
caught up sequentially at the end. This, together with the fact that we
no longer depend on an intermediate "index best block" that might be out
of sync with the index m_best_block_index, allows us to remove the
index_reorg_crash test, as it is no longer possible to call Rewind on a
block that is not the index tip.
2025-10-08 12:04:03 -04:00
furszy 30e59ce958
init: provide thread pool to indexes
And add option to customize thread pool workers count
2025-10-08 12:04:02 -04:00
furszy 701bc9ee31
util: introduce general purpose thread pool 2025-10-08 12:04:02 -04:00
12 changed files with 915 additions and 146 deletions

src/index/base.cpp
View File

@@ -13,6 +13,7 @@
#include <node/context.h>
#include <node/database_args.h>
#include <node/interface_ui.h>
#include <util/threadpool.h>
#include <tinyformat.h>
#include <undo.h>
#include <util/string.h>
@@ -20,6 +21,7 @@
#include <util/translation.h>
#include <validation.h>
#include <algorithm>
#include <chrono>
#include <memory>
#include <optional>
@@ -149,7 +151,7 @@ static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev, CChain&
return chain.Next(chain.FindFork(pindex_prev));
}
bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data)
std::any BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data)
{
interfaces::BlockInfo block_info = kernel::MakeBlockInfo(pindex, block_data);
@@ -158,7 +160,7 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data
if (!m_chainstate->m_blockman.ReadBlock(block, *pindex)) {
FatalErrorf("Failed to read block %s from disk",
pindex->GetBlockHash().ToString());
return false;
return {};
}
block_info.data = &block;
}
@@ -168,87 +170,292 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data
if (pindex->nHeight > 0 && !m_chainstate->m_blockman.ReadBlockUndo(block_undo, *pindex)) {
FatalErrorf("Failed to read undo block data %s from disk",
pindex->GetBlockHash().ToString());
return false;
return {};
}
block_info.undo_data = &block_undo;
}
if (!CustomAppend(block_info)) {
FatalErrorf("Failed to write block %s to index database",
pindex->GetBlockHash().ToString());
return false;
const auto& any_obj = CustomProcessBlock(block_info);
if (!any_obj.has_value()) {
FatalErrorf("Failed to process block %s for index %s",
pindex->GetBlockHash().GetHex(), GetName());
return {};
}
return true;
return any_obj;
}
std::vector<std::any> BaseIndex::ProcessBlocks(bool process_in_order, const CBlockIndex* start, const CBlockIndex* end)
{
std::vector<std::any> results;
if (process_in_order) {
// When ordering is required, first collect all block indexes walking back from end to start
std::vector<const CBlockIndex*> ordered_blocks;
for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) {
ordered_blocks.emplace_back(block);
}
// And process blocks in forward order: from start to end
for (auto it = ordered_blocks.rbegin(); it != ordered_blocks.rend(); ++it) {
auto op_res = ProcessBlock(*it);
if (!op_res.has_value()) return {};
results.emplace_back(std::move(op_res));
}
return results;
}
// If ordering is not required, process blocks directly from end to start
for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) {
auto op_res = ProcessBlock(block);
if (!op_res.has_value()) return {};
results.emplace_back(std::move(op_res));
}
return results;
}
struct Task {
int id;
const CBlockIndex* start_index;
const CBlockIndex* end_index;
std::vector<std::any> result;
Task(int task_id, const CBlockIndex* start, const CBlockIndex* end)
: id(task_id), start_index(start), end_index(end) {}
// Disallow copy
Task(const Task&) = delete;
Task& operator=(const Task&) = delete;
Task(Task&&) noexcept = default;
};
// Context shared across the initial sync workers
struct SyncContext {
Mutex mutex_pending_tasks;
std::queue<Task> pending_tasks GUARDED_BY(mutex_pending_tasks);
Mutex mutex_processed_tasks;
std::map<int, Task> processed_tasks GUARDED_BY(mutex_processed_tasks);
std::atomic<int> next_id_to_process{0};
};
// Synchronizes the index with the active chain.
//
// If parallel sync is enabled, this method uses WorkersCount()+1 threads (including the current thread)
// to process block ranges concurrently. Each worker handles up to 'm_blocks_per_worker' blocks each time
// (this is called a "task"), which are processed via CustomProcessBlock calls. Results are stored in the
// SyncContext's 'processed_tasks' map so they can be sequentially post-processed later.
//
// After completing a task, workers opportunistically post-process completed tasks *in order* using
// CustomPostProcessBlocks. This continues until all blocks have been fully processed and committed.
//
// Reorgs are detected and handled before syncing begins, ensuring the index starts aligned with the active chain.
void BaseIndex::Sync()
{
if (m_synced) return; // we are synced, nothing to do
// Before anything, verify we are in the active chain
const CBlockIndex* pindex = m_best_block_index.load();
if (!m_synced) {
auto last_log_time{NodeClock::now()};
auto last_locator_write_time{last_log_time};
while (true) {
if (m_interrupt) {
LogInfo("%s: m_interrupt set; exiting ThreadSync", GetName());
const int tip_height = WITH_LOCK(cs_main, return m_chainstate->m_chain.Height());
// Note: be careful, this could return null if there is no more work to do or if pindex is not found (erased blocks dir).
const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain));
if (!pindex_next) {
m_synced = true;
return;
}
SetBestBlockIndex(pindex);
// No need to handle errors in Commit. If it fails, the error will already be
// logged. The best way to recover is to continue, as index cannot be corrupted by
// a missed commit to disk for an advanced index state.
Commit();
return;
}
// Handle potential reorgs; if the next block's parent doesn't match our current tip,
// rewind our index state to match the chain and resume from there.
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName());
return;
}
const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain));
// If pindex_next is null, it means pindex is the chain tip, so
// commit data indexed so far.
if (!pindex_next) {
SetBestBlockIndex(pindex);
// No need to handle errors in Commit. See rationale above.
Commit();
// Compute task ranges
const int blocks_to_sync = tip_height - pindex_next->nHeight;
const int num_tasks = blocks_to_sync / m_blocks_per_worker;
const int remaining_blocks = blocks_to_sync % m_blocks_per_worker;
const bool process_in_order = !AllowParallelSync();
// If pindex is still the chain tip after committing, exit the
// sync loop. It is important for cs_main to be locked while
// setting m_synced = true, otherwise a new block could be
// attached while m_synced is still false, and it would not be
// indexed.
LOCK(::cs_main);
pindex_next = NextSyncBlock(pindex, m_chainstate->m_chain);
if (!pindex_next) {
m_synced = true;
break;
}
}
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
FatalErrorf("Failed to rewind %s to a previous chain tip", GetName());
return;
}
pindex = pindex_next;
std::shared_ptr<SyncContext> ctx = std::make_shared<SyncContext>();
{
LOCK2(ctx->mutex_pending_tasks, ::cs_main);
// Create fixed-size tasks
const CBlockIndex* it_start = pindex;
const CBlockIndex* it_end;
for (int id = 0; id < num_tasks; ++id) {
it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain));
it_end = m_chainstate->m_chain[it_start->nHeight + m_blocks_per_worker - 1];
ctx->pending_tasks.emplace(id, it_start, it_end);
it_start = it_end;
}
if (!ProcessBlock(pindex)) return; // error logged internally
auto current_time{NodeClock::now()};
if (current_time - last_log_time >= SYNC_LOG_INTERVAL) {
LogInfo("Syncing %s with block chain from height %d", GetName(), pindex->nHeight);
last_log_time = current_time;
}
if (current_time - last_locator_write_time >= SYNC_LOCATOR_WRITE_INTERVAL) {
SetBestBlockIndex(pindex);
last_locator_write_time = current_time;
// No need to handle errors in Commit. See rationale above.
Commit();
}
// Add final task with the remaining blocks, if any
if (remaining_blocks > 0) {
it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain));
it_end = m_chainstate->m_chain[it_start->nHeight + remaining_blocks];
ctx->pending_tasks.emplace(/*task_id=*/num_tasks, it_start, it_end);
}
}
if (pindex) {
LogInfo("%s is enabled at height %d", GetName(), pindex->nHeight);
} else {
LogInfo("%s is enabled", GetName());
// Returns nullopt only when there are no pending tasks
const auto& try_get_task = [](auto& ctx) -> std::optional<Task> {
LOCK(ctx->mutex_pending_tasks);
if (ctx->pending_tasks.empty()) return std::nullopt;
Task t = std::move(ctx->pending_tasks.front());
ctx->pending_tasks.pop();
return t;
};
enum class WorkerStatus { ABORT, PROCESSING, FINISHED };
const auto& func_worker = [this, try_get_task, process_in_order](auto& ctx) -> WorkerStatus {
if (m_interrupt) return WorkerStatus::FINISHED;
// Try to obtain a task and process it
if (std::optional<Task> maybe_task = try_get_task(ctx)) {
Task task = std::move(*maybe_task);
task.result = ProcessBlocks(process_in_order, task.start_index, task.end_index);
if (task.result.empty()) {
// Empty result indicates an internal error (logged internally).
m_interrupt(); // notify other workers and abort.
return WorkerStatus::ABORT;
}
LOCK(ctx->mutex_processed_tasks);
ctx->processed_tasks.emplace(task.id, std::move(task));
} else {
// No pending tasks — might be finished
// If we still have processed tasks to consume, proceed to finalize them.
LOCK(ctx->mutex_processed_tasks);
if (ctx->processed_tasks.empty()) return WorkerStatus::FINISHED;
}
// Post-process completed tasks opportunistically
std::vector<Task> to_process;
{
TRY_LOCK(ctx->mutex_processed_tasks, locked);
if (!locked) return WorkerStatus::PROCESSING;
// Collect ready-to-process tasks in order
int next_id = ctx->next_id_to_process.load();
while (true) {
auto it = ctx->processed_tasks.find(next_id);
if (it == ctx->processed_tasks.end()) break;
to_process.push_back(std::move(it->second));
ctx->processed_tasks.erase(it);
++next_id;
}
// Nothing ready to post-process right now; keep working
if (to_process.empty()) return WorkerStatus::PROCESSING;
}
// Post-process tasks
for (const Task& task : to_process) {
for (auto it = task.result.rbegin(); it != task.result.rend(); ++it) {
if (!CustomPostProcessBlocks(*it)) { // error logged internally
m_interrupt();
FatalErrorf("Index %s: Failed to post process blocks", GetName());
return WorkerStatus::ABORT;
}
}
// Update progress
SetBestBlockIndex(task.end_index);
ctx->next_id_to_process.store(task.id + 1);
}
// Check if there's anything left to do
LOCK2(ctx->mutex_pending_tasks, ctx->mutex_processed_tasks);
if (ctx->pending_tasks.empty() && ctx->processed_tasks.empty()) {
return WorkerStatus::FINISHED;
}
return WorkerStatus::PROCESSING;
};
// Process tasks in parallel if enabled
std::vector<std::future<void>> workers_job;
if (m_thread_pool) {
for (size_t i = 0; i < m_thread_pool->WorkersCount(); ++i) {
workers_job.emplace_back(m_thread_pool->Submit([this, ctx, func_worker]() {
WorkerStatus status{WorkerStatus::PROCESSING};
while (!m_synced && status == WorkerStatus::PROCESSING) {
status = func_worker(ctx);
if (m_interrupt) return;
}
}));
}
}
// Main index thread
// Active-wait: we process blocks in this thread too.
auto last_log_time{NodeClock::now()};
auto last_locator_write_time{last_log_time};
while (!m_synced) {
const WorkerStatus status{func_worker(ctx)};
if (m_interrupt || status == WorkerStatus::ABORT) {
m_interrupt();
// Ensure all workers are interrupted before returning.
// This avoids accessing any local variable post-destruction.
for (const auto& job : workers_job) job.wait();
return;
}
if (status == WorkerStatus::FINISHED) {
// No more tasks to process; wait for all workers to finish their current tasks
for (const auto& job : workers_job) job.wait();
// No need to handle errors in Commit. If it fails, the error will already be
// logged. The best way to recover is to continue, as index cannot be corrupted by
// a missed commit to disk for an advanced index state.
Commit();
// Before finishing, check if any new blocks were connected while we were syncing.
// If so, process them synchronously before exiting.
//
// Note: it is important for cs_main to be locked while setting m_synced = true,
// otherwise a new block could be attached while m_synced is still false, and
// it would not be indexed.
LOCK2(ctx->mutex_pending_tasks, ::cs_main);
const CBlockIndex* curr_tip{m_best_block_index.load()};
pindex_next = NextSyncBlock(curr_tip, m_chainstate->m_chain);
// If the next block is null, it means we are done!
if (!pindex_next) {
m_synced = true;
break;
}
// New blocks arrived during sync.
// Handle potential reorgs; if the next block's parent doesn't match our tip,
// rewind index state to the correct chain, then resume.
if (pindex_next->pprev != curr_tip && !Rewind(curr_tip, pindex_next->pprev)) {
FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName());
return;
}
// Queue the final range of blocks to process.
ctx->pending_tasks.emplace(ctx->next_id_to_process.load(),
/*start_index=*/pindex_next,
/*end_index=*/m_chainstate->m_chain.Tip());
}
auto current_time{NodeClock::now()};
// Log progress every so often
if (current_time - last_log_time >= SYNC_LOG_INTERVAL) {
LogInfo("Syncing %s with block chain from height %d\n",
GetName(), m_best_block_index.load()->nHeight);
last_log_time = current_time;
}
// Commit changes every so often
if (current_time - last_locator_write_time >= SYNC_LOCATOR_WRITE_INTERVAL) {
Commit(); // No need to handle errors in Commit. See rationale above.
last_locator_write_time = current_time;
}
}
LogInfo("%s is enabled at height %d\n", GetName(), (m_best_block_index) ? m_best_block_index.load()->nHeight : 0);
}
bool BaseIndex::Commit()
@@ -273,6 +480,7 @@ bool BaseIndex::Commit()
bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip)
{
assert(current_tip == m_best_block_index);
assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip);
CBlock block;
@@ -354,7 +562,7 @@ void BaseIndex::BlockConnected(ChainstateRole role, const std::shared_ptr<const
}
// Dispatch block to child class; errors are logged internally and abort the node.
if (ProcessBlock(pindex, block.get())) {
if (CustomPostProcessBlocks(ProcessBlock(pindex, block.get()))) {
// Setting the best block index is intentionally the last step of this
// function, so BlockUntilSyncedToCurrentChain callers waiting for the
// best block index to be updated can rely on the block being fully
@@ -402,7 +610,7 @@ void BaseIndex::ChainStateFlushed(ChainstateRole role, const CBlockLocator& loca
return;
}
// No need to handle errors in Commit. If it fails, the error will be already be logged. The
// No need to handle errors in Commit. If it fails, the error will already be logged. The
// best way to recover is to continue, as index cannot be corrupted by a missed commit to disk
// for an advanced index state.
Commit();
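
The scheme above is easier to see stripped of index detail. Below is a minimal, self-contained C++ sketch of the same idea, not Bitcoin Core code: fixed-size ranges are claimed from a shared queue, processed concurrently, and committed strictly in task-id order, mirroring what pending_tasks, processed_tasks and next_id_to_process do in Sync(). ProcessRange and the std::cout "commit" are placeholders standing in for CustomProcessBlock and CustomPostProcessBlocks.

#include <iostream>
#include <map>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// One unit of work: a fixed-size range of block heights plus its result.
struct Task { int id; int start; int end; std::string result; };

std::mutex g_mutex;
std::queue<Task> g_pending;      // tasks not yet claimed by a worker
std::map<int, Task> g_processed; // done, waiting for in-order commit
int g_next_to_commit = 0;        // id of the next task to commit

// Placeholder for running CustomProcessBlock over a block range.
std::string ProcessRange(int start, int end)
{
    return "blocks [" + std::to_string(start) + ", " + std::to_string(end) + ")";
}

void Worker()
{
    for (;;) {
        Task task;
        {
            std::lock_guard<std::mutex> lock(g_mutex);
            if (g_pending.empty()) return;
            task = std::move(g_pending.front());
            g_pending.pop();
        }
        task.result = ProcessRange(task.start, task.end); // concurrent part
        std::lock_guard<std::mutex> lock(g_mutex);
        g_processed.emplace(task.id, std::move(task));
        // Opportunistically commit any ready prefix, strictly in id order.
        for (auto it = g_processed.find(g_next_to_commit); it != g_processed.end();
             it = g_processed.find(++g_next_to_commit)) {
            std::cout << "commit task " << it->first << ": " << it->second.result << '\n';
            g_processed.erase(it);
        }
    }
}

int main()
{
    const int blocks_per_task = 250; // analogous to m_blocks_per_worker
    for (int id = 0; id < 4; ++id) {
        g_pending.push({id, id * blocks_per_task, (id + 1) * blocks_per_task, {}});
    }
    std::vector<std::thread> workers;
    for (int i = 0; i < 3; ++i) workers.emplace_back(Worker);
    for (auto& w : workers) w.join();
}

Whichever worker finishes last always finds the full remaining prefix in the map, so every task is committed exactly once and in order, with no dedicated coordinator thread.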

src/index/base.h
View File

@@ -12,16 +12,25 @@
#include <util/threadinterrupt.h>
#include <validationinterface.h>
#include <any>
#include <string>
class CBlock;
class CBlockIndex;
class Chainstate;
class ChainstateManager;
class ThreadPool;
namespace interfaces {
class Chain;
} // namespace interfaces
/** Maximum number of threads a single thread pool instance can have */
static constexpr int16_t MAX_INDEX_WORKERS_COUNT = 100;
/** Default number of worker threads for the initial sync process (0 disables parallel sync) */
static constexpr int16_t INDEX_WORKERS_COUNT = 0;
/** Number of blocks processed per task by each worker */
static constexpr int16_t INDEX_WORK_PER_CHUNK = 1000;
struct IndexSummary {
std::string name;
bool synced{false};
@@ -80,6 +89,9 @@ private:
std::thread m_thread_sync;
CThreadInterrupt m_interrupt;
ThreadPool* m_thread_pool{nullptr};
int m_blocks_per_worker{INDEX_WORK_PER_CHUNK};
/// Write the current index state (eg. chain block locator and subclass-specific items) to disk.
///
/// Recommendations for error handling:
@@ -93,7 +105,8 @@ private:
/// Loop over disconnected blocks and call CustomRemove.
bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip);
bool ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr);
std::any ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr);
std::vector<std::any> ProcessBlocks(bool process_in_order, const CBlockIndex* start, const CBlockIndex* end);
virtual bool AllowPrune() const = 0;
@@ -130,6 +143,26 @@ protected:
/// Update the internal best block index as well as the prune lock.
void SetBestBlockIndex(const CBlockIndex* block);
/// If 'AllowParallelSync()' returns true, 'ProcessBlock()' will run concurrently in batches.
/// The 'std::any' result will be passed to 'CustomPostProcessBlocks()' so the index can process
/// async result batches in a synchronous fashion (if required).
[[nodiscard]] virtual std::any CustomProcessBlock(const interfaces::BlockInfo& block_info) {
// If parallel sync is enabled, the child class must implement this method.
if (AllowParallelSync()) return std::any();
// Default, synchronous write
if (!CustomAppend(block_info)) {
throw std::runtime_error(strprintf("%s: Failed to write block %s to index database",
__func__, block_info.hash.ToString()));
}
return true;
}
/// 'CustomPostProcessBlocks()' is called in a synchronous manner after a batch of async 'ProcessBlock()'
/// calls have completed.
/// Here the index usually links and dumps information that cannot be processed in an asynchronous fashion.
[[nodiscard]] virtual bool CustomPostProcessBlocks(const std::any& obj) { return true; };
public:
BaseIndex(std::unique_ptr<interfaces::Chain> chain, std::string name);
/// Destructor interrupts sync thread if running and blocks until it exits.
@@ -138,6 +171,8 @@ public:
/// Get the name of the index for display in logs.
const std::string& GetName() const LIFETIMEBOUND { return m_name; }
void SetThreadPool(ThreadPool& thread_pool) { m_thread_pool = &thread_pool; }
/// Blocks the current thread until the index is caught up to the current
/// state of the block chain. This only blocks if the index has gotten in
/// sync once and only needs to process blocks in the ValidationInterface
@@ -164,6 +199,12 @@ public:
/// Stops the instance from staying in sync with blockchain updates.
void Stop();
/// Number of blocks each worker thread will process at a time
void SetBlocksPerWorker(int count) { m_blocks_per_worker = count; }
/// True if the child class allows concurrent sync.
virtual bool AllowParallelSync() { return false; }
/// Get a summary of the index and its state.
IndexSummary GetSummary() const;
};
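
Because CustomProcessBlock returns std::any, each subclass can hand an intermediate value of its own choosing to CustomPostProcessBlocks. A small stand-alone illustration of that round trip follows; the payload string and height are invented for the example, and the cast must name the exact stored type or std::bad_any_cast is thrown.

#include <any>
#include <iostream>
#include <string>
#include <utility>

int main()
{
    // Producer side: store an index-specific intermediate result.
    std::any obj = std::make_pair(std::string("filter-bytes"), /*height=*/42);

    // Consumer side: the cast must match the stored type exactly,
    // here std::pair<std::string, int>.
    const auto& [payload, height] = std::any_cast<std::pair<std::string, int>>(obj);
    std::cout << payload << " at height " << height << '\n';

    // An empty std::any signals failure, mirroring ProcessBlock's `return {};`.
    std::any empty;
    std::cout << std::boolalpha << empty.has_value() << '\n'; // false
}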

src/index/blockfilterindex.cpp
View File

@@ -299,6 +299,23 @@ bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, c
return true;
}
std::any BlockFilterIndex::CustomProcessBlock(const interfaces::BlockInfo& block_info)
{
return std::make_pair(BlockFilter(m_filter_type, *block_info.data, *block_info.undo_data), block_info.height);
}
bool BlockFilterIndex::CustomPostProcessBlocks(const std::any& obj)
{
const auto& [filter, height] = std::any_cast<std::pair<BlockFilter, int>>(obj);
const uint256& header = filter.ComputeHeader(m_last_header);
if (!Write(filter, height, header)) {
LogError("Error writing filters, shutting down block filters index\n");
return false;
}
m_last_header = header;
return true;
}
[[nodiscard]] static bool CopyHeightIndexToHashIndex(CDBIterator& db_it, CDBBatch& batch,
const std::string& index_name, int height)
{
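
The split between the two hooks is dictated by the filter-header chain: filters for distinct blocks can be computed independently, but each header commits to the previous one via ComputeHeader(m_last_header), so the linking step in CustomPostProcessBlocks must run in block order. A toy sketch of that dependency, with std::hash standing in for the real BIP157 header derivation:

#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for BlockFilter::ComputeHeader: header_i depends on header_{i-1}.
std::size_t ChainHeader(std::size_t prev_header, const std::string& filter)
{
    return std::hash<std::string>{}(filter + '|' + std::to_string(prev_header));
}

int main()
{
    // The filters themselves could be built by workers in any order...
    const std::vector<std::string> filters{"filter0", "filter1", "filter2"};

    // ...but the headers must be linked sequentially, oldest block first.
    std::size_t header = 0; // analogous to m_last_header at the sync start
    for (const auto& f : filters) {
        header = ChainHeader(header, f);
        std::cout << f << " -> header " << header << '\n';
    }
}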

src/index/blockfilterindex.h
View File

@@ -64,6 +64,9 @@ protected:
BaseIndex::DB& GetDB() const LIFETIMEBOUND override { return *m_db; }
std::any CustomProcessBlock(const interfaces::BlockInfo& block) override;
bool CustomPostProcessBlocks(const std::any& obj) override;
public:
/** Constructs the index, which becomes available to be queried. */
explicit BlockFilterIndex(std::unique_ptr<interfaces::Chain> chain, BlockFilterType filter_type,
@@ -71,6 +74,8 @@ public:
BlockFilterType GetFilterType() const { return m_filter_type; }
bool AllowParallelSync() override { return true; }
/** Get a single filter by block. */
bool LookupFilter(const CBlockIndex* block_index, BlockFilter& filter_out) const;

src/index/txindex.h
View File

@@ -29,6 +29,10 @@ protected:
BaseIndex::DB& GetDB() const override;
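// Txindex entries are independent of neighboring blocks, so blocks can be
// appended in any order during parallel sync; the default no-op
// CustomPostProcessBlocks hook is sufficient.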
std::any CustomProcessBlock(const interfaces::BlockInfo& block) override {
return CustomAppend(block);
}
public:
/// Constructs the index, which becomes available to be queried.
explicit TxIndex(std::unique_ptr<interfaces::Chain> chain, size_t n_cache_size, bool f_memory = false, bool f_wipe = false);
@@ -36,6 +40,8 @@ public:
// Destructor is declared because this class contains a unique_ptr to an incomplete type.
virtual ~TxIndex() override;
bool AllowParallelSync() override { return true; }
/// Look up a transaction by hash.
///
/// @param[in] tx_hash The hash of the transaction to be returned.

src/init.cpp
View File

@@ -87,6 +87,7 @@
#include <util/syserror.h>
#include <util/thread.h>
#include <util/threadnames.h>
#include <util/threadpool.h>
#include <util/time.h>
#include <util/translation.h>
#include <validation.h>
@@ -359,10 +360,12 @@ void Shutdown(NodeContext& node)
// Stop and delete all indexes only after flushing background callbacks.
for (auto* index : node.indexes) index->Stop();
if (node.m_index_threads) node.m_index_threads->Stop();
if (g_txindex) g_txindex.reset();
if (g_coin_stats_index) g_coin_stats_index.reset();
DestroyAllBlockFilterIndexes();
node.indexes.clear(); // all instances are nullptr now
if (node.m_index_threads) node.m_index_threads.reset();
// Any future callbacks will be dropped. This should absolutely be safe - if
// missing a callback results in an unrecoverable situation, unclean shutdown
@@ -530,6 +533,8 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc)
strprintf("Maintain an index of compact filters by block (default: %s, values: %s).", DEFAULT_BLOCKFILTERINDEX, ListBlockFilterTypes()) +
" If <type> is not supplied or if <type> = 1, indexes for all known types are enabled.",
ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-indexworkers=<n>", strprintf("Number of worker threads spawned for the initial index synchronization (default: %d). These threads are shared across all indexes. "
"Improves indexing speed on fast storage but may slow indexing on HDDs due to additional disk seeks.", INDEX_WORKERS_COUNT), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-addnode=<ip>", strprintf("Add a node to connect to and attempt to keep the connection open (see the addnode RPC help for more info). This option can be specified multiple times to add multiple nodes; connections are limited to %u at a time and are counted separately from the -maxconnections limit.", MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::CONNECTION);
argsman.AddArg("-asmap=<file>", strprintf("Specify asn mapping used for bucketing of the peers (default: %s). Relative paths will be prefixed by the net-specific datadir location.", DEFAULT_ASMAP_FILENAME), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
@@ -2176,6 +2181,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
bool StartIndexBackgroundSync(NodeContext& node)
{
if (node.indexes.empty()) return true;
// Find the oldest block among all indexes.
// This block is used to verify that we have the required blocks' data stored on disk,
// starting from that point up to the current tip.
@@ -2214,7 +2221,19 @@ bool StartIndexBackgroundSync(NodeContext& node)
}
}
if (node.args->IsArgSet("-indexworkers")) {
int index_workers = node.args->GetIntArg("-indexworkers", INDEX_WORKERS_COUNT);
if (index_workers < 0 || index_workers > MAX_INDEX_WORKERS_COUNT) return InitError(Untranslated(strprintf("Invalid -indexworkers arg. Must be a number between 0 and %d", MAX_INDEX_WORKERS_COUNT)));
node.m_index_threads = std::make_unique<ThreadPool>("indexes");
node.m_index_threads->Start(index_workers);
}
// Start threads
for (auto index : node.indexes) if (!index->StartBackgroundSync()) return false;
for (auto index : node.indexes) {
// Provide thread pool to indexes
if (node.m_index_threads && index->AllowParallelSync()) index->SetThreadPool(*node.m_index_threads);
if (!index->StartBackgroundSync()) return false;
}
return true;
}
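
Taken together, this wiring keeps parallel sync strictly opt-in: the pool is only created when -indexworkers is set, and it is only handed to indexes that declare AllowParallelSync(). For example, starting the node with bitcoind -txindex -blockfilterindex -indexworkers=4 (an illustrative invocation using only options that appear in this changeset) would share a four-worker pool across both indexes during their initial sync, while omitting -indexworkers preserves the existing sequential behavior.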

src/node/context.h
View File

@@ -5,6 +5,8 @@
#ifndef BITCOIN_NODE_CONTEXT_H
#define BITCOIN_NODE_CONTEXT_H
#include <util/threadpool.h>
#include <atomic>
#include <cstdlib>
#include <functional>
@@ -90,6 +92,7 @@ struct NodeContext {
//! Manages all the node warnings
std::unique_ptr<node::Warnings> warnings;
std::thread background_init_thread;
std::unique_ptr<ThreadPool> m_index_threads;
//! Declare default constructor and destructor that are not inline, so code
//! instantiating the NodeContext struct doesn't need to #include class

src/test/CMakeLists.txt
View File

@@ -103,6 +103,7 @@ add_executable(test_bitcoin
sync_tests.cpp
system_tests.cpp
testnet4_miner_tests.cpp
threadpool_tests.cpp
timeoffsets_tests.cpp
torcontrol_tests.cpp
transaction_tests.cpp

src/test/blockfilter_index_tests.cpp
View File

@@ -13,6 +13,7 @@
#include <pow.h>
#include <test/util/blockfilter.h>
#include <test/util/setup_common.h>
#include <util/threadpool.h>
#include <validation.h>
#include <boost/test/unit_test.hpp>
@@ -269,6 +270,50 @@ BOOST_FIXTURE_TEST_CASE(blockfilter_index_initial_sync, BuildChainTestingSetup)
filter_index.Stop();
}
BOOST_FIXTURE_TEST_CASE(blockfilter_index_parallel_initial_sync, BuildChainTestingSetup)
{
int tip_height = 100; // pre-mined blocks
const uint16_t MINE_BLOCKS = 650;
for (int round = 0; round < 2; round++) { // two rounds to test sync from genesis and from a higher block
// Generate blocks
mineBlocks(MINE_BLOCKS);
const CBlockIndex* tip = WITH_LOCK(::cs_main, return m_node.chainman->ActiveChain().Tip());
BOOST_REQUIRE(tip->nHeight == MINE_BLOCKS + tip_height);
tip_height = tip->nHeight;
// Init index
BlockFilterIndex filter_index(interfaces::MakeChain(m_node), BlockFilterType::BASIC, 1 << 20, /*f_memory=*/false);
BOOST_REQUIRE(filter_index.Init());
ThreadPool thread_pool("blockfilter");
thread_pool.Start(2);
filter_index.SetThreadPool(thread_pool);
filter_index.SetBlocksPerWorker(200);
// Start sync
BOOST_CHECK(!filter_index.BlockUntilSyncedToCurrentChain());
filter_index.Sync();
const auto& summary{filter_index.GetSummary()};
BOOST_CHECK(summary.synced);
BOOST_CHECK_EQUAL(summary.best_block_height, tip_height);
// Check that filter index has all blocks that were in the chain before it started.
{
uint256 last_header;
LOCK(cs_main);
const CBlockIndex* block_index;
for (block_index = m_node.chainman->ActiveChain().Genesis();
block_index != nullptr;
block_index = m_node.chainman->ActiveChain().Next(block_index)) {
CheckFilterLookups(filter_index, block_index, last_header, m_node.chainman->m_blockman);
}
}
filter_index.Interrupt();
filter_index.Stop();
}
}
BOOST_FIXTURE_TEST_CASE(blockfilter_index_init_destroy, BasicTestingSetup)
{
BlockFilterIndex* filter_index;
@@ -306,81 +351,4 @@ BOOST_FIXTURE_TEST_CASE(blockfilter_index_init_destroy, BasicTestingSetup)
BOOST_CHECK(filter_index == nullptr);
}
class IndexReorgCrash : public BaseIndex
{
private:
std::unique_ptr<BaseIndex::DB> m_db;
std::shared_future<void> m_blocker;
int m_blocking_height;
public:
explicit IndexReorgCrash(std::unique_ptr<interfaces::Chain> chain, std::shared_future<void> blocker,
int blocking_height) : BaseIndex(std::move(chain), "test index"), m_blocker(blocker),
m_blocking_height(blocking_height)
{
const fs::path path = gArgs.GetDataDirNet() / "index";
fs::create_directories(path);
m_db = std::make_unique<BaseIndex::DB>(path / "db", /*n_cache_size=*/0, /*f_memory=*/true, /*f_wipe=*/false);
}
bool AllowPrune() const override { return false; }
BaseIndex::DB& GetDB() const override { return *m_db; }
bool CustomAppend(const interfaces::BlockInfo& block) override
{
// Simulate a delay so new blocks can get connected during the initial sync
if (block.height == m_blocking_height) m_blocker.wait();
// Move mock time forward so the best index gets updated only when we are not at the blocking height
if (block.height == m_blocking_height - 1 || block.height > m_blocking_height) {
SetMockTime(GetTime<std::chrono::seconds>() + 31s);
}
return true;
}
};
BOOST_FIXTURE_TEST_CASE(index_reorg_crash, BuildChainTestingSetup)
{
// Enable mock time
SetMockTime(GetTime<std::chrono::minutes>());
std::promise<void> promise;
std::shared_future<void> blocker(promise.get_future());
int blocking_height = WITH_LOCK(cs_main, return m_node.chainman->ActiveChain().Tip()->nHeight);
IndexReorgCrash index(interfaces::MakeChain(m_node), blocker, blocking_height);
BOOST_REQUIRE(index.Init());
BOOST_REQUIRE(index.StartBackgroundSync());
auto func_wait_until = [&](int height, std::chrono::milliseconds timeout) {
auto deadline = std::chrono::steady_clock::now() + timeout;
while (index.GetSummary().best_block_height < height) {
if (std::chrono::steady_clock::now() > deadline) {
BOOST_FAIL(strprintf("Timeout waiting for index height %d (current: %d)", height, index.GetSummary().best_block_height));
return;
}
std::this_thread::sleep_for(100ms);
}
};
// Wait until the index is one block before the fork point
func_wait_until(blocking_height - 1, /*timeout=*/5s);
// Create a fork to trigger the reorg
std::vector<std::shared_ptr<CBlock>> fork;
const CBlockIndex* prev_tip = WITH_LOCK(cs_main, return m_node.chainman->ActiveChain().Tip()->pprev);
BOOST_REQUIRE(BuildChain(prev_tip, GetScriptForDestination(PKHash(GenerateRandomKey().GetPubKey())), 3, fork));
for (const auto& block : fork) {
BOOST_REQUIRE(m_node.chainman->ProcessNewBlock(block, /*force_processing=*/true, /*min_pow_checked=*/true, nullptr));
}
// Unblock the index thread so it can process the reorg
promise.set_value();
// Wait for the index to reach the new tip
func_wait_until(blocking_height + 2, 5s);
index.Stop();
}
BOOST_AUTO_TEST_SUITE_END()

src/test/threadpool_tests.cpp
View File

@@ -0,0 +1,268 @@
// Copyright (c) 2024-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <util/string.h>
#include <util/threadpool.h>
#include <boost/test/unit_test.hpp>
BOOST_AUTO_TEST_SUITE(threadpool_tests)
constexpr auto TIMEOUT_SECS = std::chrono::seconds(120);
template <typename T>
void WaitFor(std::vector<std::future<T>>& futures, const std::string& context)
{
for (size_t i = 0; i < futures.size(); ++i) {
if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
}
}
}
BOOST_AUTO_TEST_CASE(threadpool_basic)
{
// Test Cases
// 1) Submit tasks and verify completion.
// 2) Maintain all threads busy except one.
// 3) Wait for work to finish.
// 4) Wait for result object.
// 5) The task throws an exception; it must be caught on the consumer side.
// 6) Busy workers, help them by processing tasks from outside.
// 7) Recursive task submission.
// 8) Submit a task when all threads are busy, then stop the pool.
const int NUM_WORKERS_DEFAULT = 3;
const std::string POOL_NAME = "test";
// Test case 1, submit tasks and verify completion.
{
int num_tasks = 50;
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::atomic<int> counter = 0;
// Store futures to ensure completion before checking counter.
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 1; i <= num_tasks; i++) {
futures.emplace_back(threadPool.Submit([&counter, i]() {
counter.fetch_add(i);
}));
}
// Wait for all tasks to finish
WaitFor(futures, /*context=*/"test1 task");
int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
BOOST_CHECK_EQUAL(counter.load(), expected_value);
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
}
// Test case 2, maintain all threads busy except one.
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
// Single blocking future for all threads
std::promise<void> blocker;
std::shared_future<void> blocker_future(blocker.get_future());
// Use per-thread ready promises to ensure all blocked threads have started
std::vector<std::promise<void>> ready_promises(NUM_WORKERS_DEFAULT - 1);
std::vector<std::future<void>> ready_futures;
ready_futures.reserve(NUM_WORKERS_DEFAULT - 1);
for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
// Submit blocking task to all threads except one
std::vector<std::future<void>> blocking_tasks;
blocking_tasks.reserve(NUM_WORKERS_DEFAULT - 1);
for (int i = 0; i < NUM_WORKERS_DEFAULT - 1; i++) {
std::promise<void>& ready = ready_promises[i];
blocking_tasks.emplace_back(threadPool.Submit([&ready, blocker_future]() {
ready.set_value();
blocker_future.wait();
}));
}
// Wait until all blocked threads are actually blocked
WaitFor(ready_futures, /*context=*/"test2 blocking tasks enabled");
// Now execute tasks on the single available worker
// and check that all the tasks are executed.
int num_tasks = 15;
std::atomic<int> counter = 0;
// Store futures to wait on
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 0; i < num_tasks; i++) {
futures.emplace_back(threadPool.Submit([&counter]() {
counter.fetch_add(1);
}));
}
WaitFor(futures, /*context=*/"test2 tasks");
BOOST_CHECK_EQUAL(counter.load(), num_tasks);
blocker.set_value();
WaitFor(blocking_tasks, /*context=*/"test2 blocking tasks disabled");
threadPool.Stop();
BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}
// Test case 3, wait for work to finish.
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::atomic<bool> flag = false;
std::future<void> future = threadPool.Submit([&flag]() {
std::this_thread::sleep_for(std::chrono::milliseconds{200});
flag.store(true);
});
future.wait();
BOOST_CHECK(flag.load());
}
// Test case 4, obtain result object.
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::future<bool> future_bool = threadPool.Submit([]() {
return true;
});
BOOST_CHECK(future_bool.get());
std::future<std::string> future_str = threadPool.Submit([]() {
return std::string("true");
});
std::string result = future_str.get();
BOOST_CHECK_EQUAL(result, "true");
}
// Test case 5, throw exception and catch it on the consumer side.
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
int ROUNDS = 5;
std::string err_msg{"something wrong happened"};
std::vector<std::future<void>> futures;
futures.reserve(ROUNDS);
for (int i = 0; i < ROUNDS; i++) {
futures.emplace_back(threadPool.Submit([err_msg, i]() {
throw std::runtime_error(err_msg + util::ToString(i));
}));
}
for (int i = 0; i < ROUNDS; i++) {
try {
futures.at(i).get();
BOOST_FAIL("Expected exception not thrown");
} catch (const std::runtime_error& e) {
BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
}
}
}
// Test case 6, all workers are busy, help them by processing tasks from outside.
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::promise<void> blocker;
std::shared_future<void> blocker_future(blocker.get_future());
// Submit blocking task
for (int i = 0; i < NUM_WORKERS_DEFAULT; i++) {
threadPool.Submit([blocker_future]() {
blocker_future.wait();
});
}
// Now submit tasks and check that none of them are executed.
int num_tasks = 20;
std::atomic<int> counter = 0;
for (int i = 0; i < num_tasks; i++) {
threadPool.Submit([&counter]() {
counter.fetch_add(1);
});
}
std::this_thread::sleep_for(std::chrono::milliseconds{100});
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20);
// Now process manually
for (int i = 0; i < num_tasks; i++) {
threadPool.ProcessTask();
}
BOOST_CHECK_EQUAL(counter.load(), num_tasks);
blocker.set_value();
threadPool.Stop();
}
// Test case 7, recursive submission of tasks.
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::promise<void> signal;
threadPool.Submit([&]() {
threadPool.Submit([&]() {
signal.set_value();
});
});
signal.get_future().wait();
threadPool.Stop();
}
// Test case 8, submit a task when all threads are busy and then stop the pool.
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::promise<void> blocker;
std::shared_future<void> blocker_future(blocker.get_future());
// Per-thread ready promises to ensure all workers are actually blocked
std::vector<std::promise<void>> ready_promises(NUM_WORKERS_DEFAULT);
std::vector<std::future<void>> ready_futures;
ready_futures.reserve(NUM_WORKERS_DEFAULT);
for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
// Fill all workers with blocking tasks
for (int i = 0; i < NUM_WORKERS_DEFAULT; i++) {
std::promise<void>& ready = ready_promises[i];
threadPool.Submit([blocker_future, &ready]() {
ready.set_value();
blocker_future.wait();
});
}
// Wait until all threads are actually blocked
WaitFor(ready_futures, /*context=*/"test8 blocking tasks enabled");
// Submit an extra task that should execute once a worker is free
std::future<bool> future = threadPool.Submit([]() { return true; });
// At this point, all workers are blocked, and the extra task is queued
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
// Wait a short moment before unblocking the threads to mimic a concurrent shutdown
std::thread thread_unblocker([&blocker]() {
std::this_thread::sleep_for(std::chrono::milliseconds{300});
blocker.set_value();
});
// Stop the pool while the workers are still blocked
threadPool.Stop();
// Expect the submitted task to complete
BOOST_CHECK(future.get());
thread_unblocker.join();
// Pool should be stopped and no workers remaining
BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}
}
BOOST_AUTO_TEST_SUITE_END()

src/test/txindex_tests.cpp
View File

@@ -7,6 +7,7 @@
#include <index/txindex.h>
#include <interfaces/chain.h>
#include <test/util/setup_common.h>
#include <util/threadpool.h>
#include <validation.h>
#include <boost/test/unit_test.hpp>
@@ -73,4 +74,45 @@ BOOST_FIXTURE_TEST_CASE(txindex_initial_sync, TestChain100Setup)
txindex.Stop();
}
BOOST_FIXTURE_TEST_CASE(txindex_parallel_initial_sync, TestChain100Setup)
{
int tip_height = 100; // pre-mined blocks
const uint16_t MINE_BLOCKS = 650;
for (int round = 0; round < 2; round++) { // two rounds to test sync from genesis and from a higher block
// Generate blocks
mineBlocks(MINE_BLOCKS);
const CBlockIndex* tip = WITH_LOCK(::cs_main, return m_node.chainman->ActiveChain().Tip());
BOOST_REQUIRE(tip->nHeight == MINE_BLOCKS + tip_height);
tip_height = tip->nHeight;
// Init and start index
TxIndex txindex(interfaces::MakeChain(m_node), 1 << 20, /*f_memory=*/false);
BOOST_REQUIRE(txindex.Init());
ThreadPool thread_pool("txindex");
thread_pool.Start(2);
txindex.SetThreadPool(thread_pool);
txindex.SetBlocksPerWorker(200);
BOOST_CHECK(!txindex.BlockUntilSyncedToCurrentChain());
txindex.Sync();
const auto& summary{txindex.GetSummary()};
BOOST_CHECK(summary.synced);
BOOST_CHECK_EQUAL(summary.best_block_height, tip_height);
// Check that txindex has all txs that were in the chain before it started.
CTransactionRef tx_disk;
uint256 block_hash;
for (const auto& txn : m_coinbase_txns) {
if (!txindex.FindTx(txn->GetHash(), block_hash, tx_disk)) {
BOOST_ERROR("FindTx failed");
} else if (tx_disk->GetHash() != txn->GetHash()) {
BOOST_ERROR("Read incorrect tx");
}
}
txindex.Interrupt();
txindex.Stop();
}
}
BOOST_AUTO_TEST_SUITE_END()

src/util/threadpool.h (new file, 191 lines)
View File

@@ -0,0 +1,191 @@
// Copyright (c) 2024-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_UTIL_THREADPOOL_H
#define BITCOIN_UTIL_THREADPOOL_H
#include <sync.h>
#include <util/string.h>
#include <util/thread.h>
#include <util/threadinterrupt.h>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <utility>
#include <queue>
#include <thread>
#include <vector>
/**
* @brief Fixed-size thread pool for running arbitrary tasks concurrently.
*
* The thread pool maintains a set of worker threads that consume and execute
* tasks submitted through Submit(). Once started, tasks can be queued and
* processed asynchronously until Stop() is called.
*
* ### Thread-safety and lifecycle
* - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
* Calling `Stop()` from a worker thread will deadlock, as it waits for all
* workers to join, including the current one.
*
* - `Submit()` can be called from any thread, including workers. It safely
* enqueues new work for execution as long as the pool has active workers.
*
* - `Stop()` prevents further task submission and wakes all worker threads.
* Workers finish processing all remaining queued tasks before exiting,
* guaranteeing that no caller waits forever on a pending future.
*/
class ThreadPool {
private:
std::string m_name;
Mutex m_mutex;
std::queue<std::function<void()>> m_work_queue GUARDED_BY(m_mutex);
std::condition_variable m_cv;
// Note: m_interrupt must be modified while holding the same mutex used by threads waiting on the condition variable.
// This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
// Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
bool m_interrupt GUARDED_BY(m_mutex){false};
std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
WAIT_LOCK(m_mutex, wait_lock);
for (;;) {
std::function<void()> task;
{
// Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
if (!m_interrupt && m_work_queue.empty()) {
// Block until the pool is interrupted or a task is available.
m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
}
// If stopped and no work left, exit worker
if (m_interrupt && m_work_queue.empty()) {
return;
}
task = std::move(m_work_queue.front());
m_work_queue.pop();
}
{
// Execute the task without the lock
REVERSE_LOCK(wait_lock, m_mutex);
task();
}
}
}
public:
explicit ThreadPool(const std::string& name) : m_name(name) {}
~ThreadPool()
{
Stop(); // In case it hasn't been stopped.
}
/**
* @brief Start worker threads.
*
* Creates and launches `num_workers` threads that begin executing tasks
* from the queue. If the pool is already started, throws.
*
* Must be called from a controller (non-worker) thread.
*/
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
m_interrupt = false; // Reset
// Create workers
for (int i = 0; i < num_workers; i++) {
m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
}
}
/**
* @brief Stop all worker threads and wait for them to exit.
*
* Sets the interrupt flag, wakes all waiting workers, and joins them.
* Any remaining tasks in the queue will be processed before returning.
*
* Must be called from a controller (non-worker) thread.
*/
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
// Notify workers and join them.
std::vector<std::thread> threads_to_join;
{
LOCK(m_mutex);
m_interrupt = true;
threads_to_join.swap(m_workers);
}
m_cv.notify_all();
for (auto& worker : threads_to_join) worker.join();
// m_interrupt is left true until next Start()
}
/**
* @brief Submit a new task for asynchronous execution.
*
* Enqueues a callable to be executed by one of the worker threads.
* Returns a `std::future` that can be used to retrieve the task's result.
*/
template<class T> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
auto Submit(T task) -> std::future<decltype(task())>
{
using TaskType = std::packaged_task<decltype(task())()>;
auto ptr_task = std::make_shared<TaskType>(std::move(task));
std::future<decltype(task())> future = ptr_task->get_future();
{
LOCK(m_mutex);
if (m_workers.empty() || m_interrupt) {
throw std::runtime_error("No active workers; cannot accept new tasks");
}
m_work_queue.emplace([ptr_task]() {
(*ptr_task)();
});
}
m_cv.notify_one();
return future;
}
/**
* @brief Execute a single queued task synchronously.
* Removes one task from the queue and executes it on the calling thread.
*/
void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
std::function<void()> task;
{
LOCK(m_mutex);
if (m_work_queue.empty()) return;
// Pop the task
task = std::move(m_work_queue.front());
m_work_queue.pop();
}
task();
}
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
return WITH_LOCK(m_mutex, return m_work_queue.size());
}
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
return WITH_LOCK(m_mutex, return m_workers.size());
}
};
#endif // BITCOIN_UTIL_THREADPOOL_H
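
To round out the interface above, here is a minimal usage sketch. It assumes compilation inside the Bitcoin Core tree, since the header depends on sync.h and util/thread.h; it exercises only the public methods shown in this file.

#include <util/threadpool.h>

#include <future>
#include <iostream>

int main()
{
    ThreadPool pool("example");
    pool.Start(/*num_workers=*/2);

    // Submit() deduces the future's type from the callable's return type.
    std::future<int> answer = pool.Submit([] { return 6 * 7; });

    // Callers can also drain queued work themselves while workers are busy,
    // as test case 6 above demonstrates.
    pool.ProcessTask();

    std::cout << answer.get() << '\n'; // prints 42

    // Stop() joins the workers; any queued tasks are finished first.
    pool.Stop();
}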