diff --git a/src/index/base.h b/src/index/base.h index 4131b06cad9..390c1a43cfc 100644 --- a/src/index/base.h +++ b/src/index/base.h @@ -18,10 +18,16 @@ class CBlock; class CBlockIndex; class Chainstate; class ChainstateManager; +class ThreadPool; namespace interfaces { class Chain; } // namespace interfaces +/** Maximum number of threads a single thread pool instance can have */ +static constexpr int16_t MAX_INDEX_WORKERS_COUNT = 100; +/** Number of concurrent jobs during the initial sync process */ +static constexpr int16_t INDEX_WORKERS_COUNT = 0; + struct IndexSummary { std::string name; bool synced{false}; @@ -80,6 +86,8 @@ private: std::thread m_thread_sync; CThreadInterrupt m_interrupt; + ThreadPool* m_thread_pool{nullptr}; + /// Write the current index state (eg. chain block locator and subclass-specific items) to disk. /// /// Recommendations for error handling: @@ -138,6 +146,8 @@ public: /// Get the name of the index for display in logs. const std::string& GetName() const LIFETIMEBOUND { return m_name; } + void SetThreadPool(ThreadPool& thread_pool) { m_thread_pool = &thread_pool; } + /// Blocks the current thread until the index is caught up to the current /// state of the block chain. This only blocks if the index has gotten in /// sync once and only needs to process blocks in the ValidationInterface @@ -164,6 +174,9 @@ public: /// Stops the instance from staying in sync with blockchain updates. void Stop(); + /// True if the child class allows concurrent sync. + virtual bool AllowParallelSync() { return false; } + /// Get a summary of the index and its state. IndexSummary GetSummary() const; }; diff --git a/src/init.cpp b/src/init.cpp index b6b52e2cea5..a8a28a045c6 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -87,6 +87,7 @@ #include #include #include +#include #include #include #include @@ -359,10 +360,12 @@ void Shutdown(NodeContext& node) // Stop and delete all indexes only after flushing background callbacks. for (auto* index : node.indexes) index->Stop(); + if (node.m_index_threads) node.m_index_threads->Stop(); if (g_txindex) g_txindex.reset(); if (g_coin_stats_index) g_coin_stats_index.reset(); DestroyAllBlockFilterIndexes(); node.indexes.clear(); // all instances are nullptr now + if (node.m_index_threads) node.m_index_threads.reset(); // Any future callbacks will be dropped. This should absolutely be safe - if // missing a callback results in an unrecoverable situation, unclean shutdown @@ -530,6 +533,8 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc) strprintf("Maintain an index of compact filters by block (default: %s, values: %s).", DEFAULT_BLOCKFILTERINDEX, ListBlockFilterTypes()) + " If is not supplied or if = 1, indexes for all known types are enabled.", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS); + argsman.AddArg("-indexworkers=", strprintf("Number of worker threads spawned for the initial index synchronization (default: %d). These threads are shared across all indexes. " + "Improves indexing speed on fast storage but may slow indexing on HDDs due to additional disk seeks.", INDEX_WORKERS_COUNT), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS); argsman.AddArg("-addnode=", strprintf("Add a node to connect to and attempt to keep the connection open (see the addnode RPC help for more info). This option can be specified multiple times to add multiple nodes; connections are limited to %u at a time and are counted separately from the -maxconnections limit.", MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::CONNECTION); argsman.AddArg("-asmap=", strprintf("Specify asn mapping used for bucketing of the peers (default: %s). Relative paths will be prefixed by the net-specific datadir location.", DEFAULT_ASMAP_FILENAME), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); @@ -2176,6 +2181,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) bool StartIndexBackgroundSync(NodeContext& node) { + if (node.indexes.empty()) return true; + // Find the oldest block among all indexes. // This block is used to verify that we have the required blocks' data stored on disk, // starting from that point up to the current tip. @@ -2214,7 +2221,19 @@ bool StartIndexBackgroundSync(NodeContext& node) } } + if (node.args->IsArgSet("-indexworkers")) { + int index_workers = node.args->GetIntArg("-indexworkers", INDEX_WORKERS_COUNT); + if (index_workers < 0 || index_workers > MAX_INDEX_WORKERS_COUNT) return InitError(Untranslated(strprintf("Invalid -indexworkers arg. Must be a number between 0 and %d", MAX_INDEX_WORKERS_COUNT))); + + node.m_index_threads = std::make_unique("indexes"); + node.m_index_threads->Start(index_workers); + } + // Start threads - for (auto index : node.indexes) if (!index->StartBackgroundSync()) return false; + for (auto index : node.indexes) { + // Provide thread pool to indexes + if (node.m_index_threads && index->AllowParallelSync()) index->SetThreadPool(*node.m_index_threads); + if (!index->StartBackgroundSync()) return false; + } return true; } diff --git a/src/node/context.h b/src/node/context.h index debc1221206..e808f1950bc 100644 --- a/src/node/context.h +++ b/src/node/context.h @@ -5,6 +5,8 @@ #ifndef BITCOIN_NODE_CONTEXT_H #define BITCOIN_NODE_CONTEXT_H +#include + #include #include #include @@ -90,6 +92,7 @@ struct NodeContext { //! Manages all the node warnings std::unique_ptr warnings; std::thread background_init_thread; + std::unique_ptr m_index_threads; //! Declare default constructor and destructor that are not inline, so code //! instantiating the NodeContext struct doesn't need to #include class