init: provide thread pool to indexes

And add option to customize thread pool workers count
This commit is contained in:
furszy 2023-02-20 22:10:48 -03:00
parent 701bc9ee31
commit 30e59ce958
No known key found for this signature in database
GPG Key ID: 5DD23CCC686AA623
3 changed files with 36 additions and 1 deletions

View File

@ -18,10 +18,16 @@ class CBlock;
class CBlockIndex; class CBlockIndex;
class Chainstate; class Chainstate;
class ChainstateManager; class ChainstateManager;
class ThreadPool;
namespace interfaces { namespace interfaces {
class Chain; class Chain;
} // namespace interfaces } // namespace interfaces
/** Maximum number of threads a single thread pool instance can have */
static constexpr int16_t MAX_INDEX_WORKERS_COUNT = 100;
/** Number of concurrent jobs during the initial sync process */
static constexpr int16_t INDEX_WORKERS_COUNT = 0;
struct IndexSummary { struct IndexSummary {
std::string name; std::string name;
bool synced{false}; bool synced{false};
@ -80,6 +86,8 @@ private:
std::thread m_thread_sync; std::thread m_thread_sync;
CThreadInterrupt m_interrupt; CThreadInterrupt m_interrupt;
ThreadPool* m_thread_pool{nullptr};
/// Write the current index state (eg. chain block locator and subclass-specific items) to disk. /// Write the current index state (eg. chain block locator and subclass-specific items) to disk.
/// ///
/// Recommendations for error handling: /// Recommendations for error handling:
@ -138,6 +146,8 @@ public:
/// Get the name of the index for display in logs. /// Get the name of the index for display in logs.
const std::string& GetName() const LIFETIMEBOUND { return m_name; } const std::string& GetName() const LIFETIMEBOUND { return m_name; }
void SetThreadPool(ThreadPool& thread_pool) { m_thread_pool = &thread_pool; }
/// Blocks the current thread until the index is caught up to the current /// Blocks the current thread until the index is caught up to the current
/// state of the block chain. This only blocks if the index has gotten in /// state of the block chain. This only blocks if the index has gotten in
/// sync once and only needs to process blocks in the ValidationInterface /// sync once and only needs to process blocks in the ValidationInterface
@ -164,6 +174,9 @@ public:
/// Stops the instance from staying in sync with blockchain updates. /// Stops the instance from staying in sync with blockchain updates.
void Stop(); void Stop();
/// True if the child class allows concurrent sync.
virtual bool AllowParallelSync() { return false; }
/// Get a summary of the index and its state. /// Get a summary of the index and its state.
IndexSummary GetSummary() const; IndexSummary GetSummary() const;
}; };

View File

@ -87,6 +87,7 @@
#include <util/syserror.h> #include <util/syserror.h>
#include <util/thread.h> #include <util/thread.h>
#include <util/threadnames.h> #include <util/threadnames.h>
#include <util/threadpool.h>
#include <util/time.h> #include <util/time.h>
#include <util/translation.h> #include <util/translation.h>
#include <validation.h> #include <validation.h>
@ -359,10 +360,12 @@ void Shutdown(NodeContext& node)
// Stop and delete all indexes only after flushing background callbacks. // Stop and delete all indexes only after flushing background callbacks.
for (auto* index : node.indexes) index->Stop(); for (auto* index : node.indexes) index->Stop();
if (node.m_index_threads) node.m_index_threads->Stop();
if (g_txindex) g_txindex.reset(); if (g_txindex) g_txindex.reset();
if (g_coin_stats_index) g_coin_stats_index.reset(); if (g_coin_stats_index) g_coin_stats_index.reset();
DestroyAllBlockFilterIndexes(); DestroyAllBlockFilterIndexes();
node.indexes.clear(); // all instances are nullptr now node.indexes.clear(); // all instances are nullptr now
if (node.m_index_threads) node.m_index_threads.reset();
// Any future callbacks will be dropped. This should absolutely be safe - if // Any future callbacks will be dropped. This should absolutely be safe - if
// missing a callback results in an unrecoverable situation, unclean shutdown // missing a callback results in an unrecoverable situation, unclean shutdown
@ -530,6 +533,8 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc)
strprintf("Maintain an index of compact filters by block (default: %s, values: %s).", DEFAULT_BLOCKFILTERINDEX, ListBlockFilterTypes()) + strprintf("Maintain an index of compact filters by block (default: %s, values: %s).", DEFAULT_BLOCKFILTERINDEX, ListBlockFilterTypes()) +
" If <type> is not supplied or if <type> = 1, indexes for all known types are enabled.", " If <type> is not supplied or if <type> = 1, indexes for all known types are enabled.",
ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS); ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-indexworkers=<n>", strprintf("Number of worker threads spawned for the initial index synchronization (default: %d). These threads are shared across all indexes. "
"Improves indexing speed on fast storage but may slow indexing on HDDs due to additional disk seeks.", INDEX_WORKERS_COUNT), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-addnode=<ip>", strprintf("Add a node to connect to and attempt to keep the connection open (see the addnode RPC help for more info). This option can be specified multiple times to add multiple nodes; connections are limited to %u at a time and are counted separately from the -maxconnections limit.", MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::CONNECTION); argsman.AddArg("-addnode=<ip>", strprintf("Add a node to connect to and attempt to keep the connection open (see the addnode RPC help for more info). This option can be specified multiple times to add multiple nodes; connections are limited to %u at a time and are counted separately from the -maxconnections limit.", MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::CONNECTION);
argsman.AddArg("-asmap=<file>", strprintf("Specify asn mapping used for bucketing of the peers (default: %s). Relative paths will be prefixed by the net-specific datadir location.", DEFAULT_ASMAP_FILENAME), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-asmap=<file>", strprintf("Specify asn mapping used for bucketing of the peers (default: %s). Relative paths will be prefixed by the net-specific datadir location.", DEFAULT_ASMAP_FILENAME), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
@ -2176,6 +2181,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
bool StartIndexBackgroundSync(NodeContext& node) bool StartIndexBackgroundSync(NodeContext& node)
{ {
if (node.indexes.empty()) return true;
// Find the oldest block among all indexes. // Find the oldest block among all indexes.
// This block is used to verify that we have the required blocks' data stored on disk, // This block is used to verify that we have the required blocks' data stored on disk,
// starting from that point up to the current tip. // starting from that point up to the current tip.
@ -2214,7 +2221,19 @@ bool StartIndexBackgroundSync(NodeContext& node)
} }
} }
if (node.args->IsArgSet("-indexworkers")) {
int index_workers = node.args->GetIntArg("-indexworkers", INDEX_WORKERS_COUNT);
if (index_workers < 0 || index_workers > MAX_INDEX_WORKERS_COUNT) return InitError(Untranslated(strprintf("Invalid -indexworkers arg. Must be a number between 0 and %d", MAX_INDEX_WORKERS_COUNT)));
node.m_index_threads = std::make_unique<ThreadPool>("indexes");
node.m_index_threads->Start(index_workers);
}
// Start threads // Start threads
for (auto index : node.indexes) if (!index->StartBackgroundSync()) return false; for (auto index : node.indexes) {
// Provide thread pool to indexes
if (node.m_index_threads && index->AllowParallelSync()) index->SetThreadPool(*node.m_index_threads);
if (!index->StartBackgroundSync()) return false;
}
return true; return true;
} }

View File

@ -5,6 +5,8 @@
#ifndef BITCOIN_NODE_CONTEXT_H #ifndef BITCOIN_NODE_CONTEXT_H
#define BITCOIN_NODE_CONTEXT_H #define BITCOIN_NODE_CONTEXT_H
#include <util/threadpool.h>
#include <atomic> #include <atomic>
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
@ -90,6 +92,7 @@ struct NodeContext {
//! Manages all the node warnings //! Manages all the node warnings
std::unique_ptr<node::Warnings> warnings; std::unique_ptr<node::Warnings> warnings;
std::thread background_init_thread; std::thread background_init_thread;
std::unique_ptr<ThreadPool> m_index_threads;
//! Declare default constructor and destructor that are not inline, so code //! Declare default constructor and destructor that are not inline, so code
//! instantiating the NodeContext struct doesn't need to #include class //! instantiating the NodeContext struct doesn't need to #include class