From 89ff1b6406953d08a1ef7fb0241866ac60107980 Mon Sep 17 00:00:00 2001 From: furszy Date: Mon, 20 Feb 2023 20:17:50 -0300 Subject: [PATCH] index: implement index parallel sync This introduces parallel sync for the indexes' initial sync, distributing the workload across multiple threads. When enabled, the chain is divided into fixed-size block ranges called "tasks". Worker threads consume tasks concurrently, calling CustomProcessBlock over their assigned range and storing the results in a shared context while opportunistically batching and dumping the collected information to disk sequentially (when needed). Since large reorgs are improbable during initial sync (headers-chain PoW dictates the valid chain), reorgs are detected only before syncing begins and once it completes. Any new blocks connected during the process are caught up sequentially at the end. This, together with the fact that we no longer depend on an intermediate "index best block" that might be out of sync with the index m_best_block_index, allows us to remove the index_reorg_crash test, as it is no longer possible to call Rewind on a block that is not the index tip. 
--- src/index/base.cpp | 342 +++++++++++++++++++++------ src/index/base.h | 30 ++- src/test/blockfilter_index_tests.cpp | 77 ------ 3 files changed, 304 insertions(+), 145 deletions(-) diff --git a/src/index/base.cpp b/src/index/base.cpp index 82259ac046a..f34d47b0b1e 100644 --- a/src/index/base.cpp +++ b/src/index/base.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -20,6 +21,7 @@ #include #include +#include #include #include #include @@ -149,7 +151,7 @@ static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev, CChain& return chain.Next(chain.FindFork(pindex_prev)); } -bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data) +std::any BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data) { interfaces::BlockInfo block_info = kernel::MakeBlockInfo(pindex, block_data); @@ -158,7 +160,7 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data if (!m_chainstate->m_blockman.ReadBlock(block, *pindex)) { FatalErrorf("Failed to read block %s from disk", pindex->GetBlockHash().ToString()); - return false; + return {}; } block_info.data = █ } @@ -168,87 +170,292 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data if (pindex->nHeight > 0 && !m_chainstate->m_blockman.ReadBlockUndo(block_undo, *pindex)) { FatalErrorf("Failed to read undo block data %s from disk", pindex->GetBlockHash().ToString()); - return false; + return {}; } block_info.undo_data = &block_undo; } - if (!CustomAppend(block_info)) { - FatalErrorf("Failed to write block %s to index database", - pindex->GetBlockHash().ToString()); - return false; + const auto& any_obj = CustomProcessBlock(block_info); + if (!any_obj.has_value()) { + FatalErrorf("Failed to process block %s for index %s", + pindex->GetBlockHash().GetHex(), GetName()); + return {}; } - - return true; + return any_obj; } +std::vector BaseIndex::ProcessBlocks(bool process_in_order, const 
CBlockIndex* start, const CBlockIndex* end) +{ + std::vector results; + if (process_in_order) { + // When ordering is required, collect all block indexes from [end..start] in order + std::vector ordered_blocks; + for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) { + ordered_blocks.emplace_back(block); + } + + // And process blocks in forward order: from start to end + for (auto it = ordered_blocks.rbegin(); it != ordered_blocks.rend(); ++it) { + auto op_res = ProcessBlock(*it); + if (!op_res.has_value()) return {}; + results.emplace_back(std::move(op_res)); + } + return results; + } + + // If ordering is not required, process blocks directly from end to start + for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) { + auto op_res = ProcessBlock(block); + if (!op_res.has_value()) return {}; + results.emplace_back(std::move(op_res)); + } + + return results; +} + +struct Task { + int id; + const CBlockIndex* start_index; + const CBlockIndex* end_index; + std::vector result; + + Task(int task_id, const CBlockIndex* start, const CBlockIndex* end) + : id(task_id), start_index(start), end_index(end) {} + + // Disallow copy + Task(const Task&) = delete; + Task& operator=(const Task&) = delete; + Task(Task&&) noexcept = default; +}; + +// Context shared across the initial sync workers +struct SyncContext { + Mutex mutex_pending_tasks; + std::queue pending_tasks GUARDED_BY(mutex_pending_tasks); + + Mutex mutex_processed_tasks; + std::map processed_tasks GUARDED_BY(mutex_processed_tasks); + + std::atomic next_id_to_process{0}; +}; + +// Synchronizes the index with the active chain. +// +// If parallel sync is enabled, this method uses WorkersCount()+1 threads (including the current thread) +// to process block ranges concurrently. Each worker handles up to 'm_blocks_per_worker' blocks each time +// (this is called a "task"), which are processed via CustomProcessBlock calls. 
Results are stored in the +// SyncContext's 'processed_tasks' map so they can be sequentially post-processed later. +// +// After completing a task, workers opportunistically post-process completed tasks *in order* using +// CustomPostProcessBlocks. This continues until all blocks have been fully processed and committed. +// +// Reorgs are detected and handled before syncing begins, ensuring the index starts aligned with the active chain. void BaseIndex::Sync() { + if (m_synced) return; // we are synced, nothing to do + + // Before anything, verify we are in the active chain const CBlockIndex* pindex = m_best_block_index.load(); - if (!m_synced) { - auto last_log_time{NodeClock::now()}; - auto last_locator_write_time{last_log_time}; - while (true) { - if (m_interrupt) { - LogInfo("%s: m_interrupt set; exiting ThreadSync", GetName()); + const int tip_height = WITH_LOCK(cs_main, return m_chainstate->m_chain.Height()); + // Note: be careful, could return null if there is no more work to do or if pindex is not found (erased blocks dir). + const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain)); + if (!pindex_next) { + m_synced = true; + return; + } - SetBestBlockIndex(pindex); - // No need to handle errors in Commit. If it fails, the error will be already be - // logged. The best way to recover is to continue, as index cannot be corrupted by - // a missed commit to disk for an advanced index state. - Commit(); - return; - } + // Handle potential reorgs; if the next block's parent doesn't match our current tip, + // rewind our index state to match the chain and resume from there. 
+ if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) { + FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName()); + return; + } - const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain)); - // If pindex_next is null, it means pindex is the chain tip, so - // commit data indexed so far. - if (!pindex_next) { - SetBestBlockIndex(pindex); - // No need to handle errors in Commit. See rationale above. - Commit(); + // Compute tasks ranges + const int blocks_to_sync = tip_height - pindex_next->nHeight; + const int num_tasks = blocks_to_sync / m_blocks_per_worker; + const int remaining_blocks = blocks_to_sync % m_blocks_per_worker; + const bool process_in_order = !AllowParallelSync(); - // If pindex is still the chain tip after committing, exit the - // sync loop. It is important for cs_main to be locked while - // setting m_synced = true, otherwise a new block could be - // attached while m_synced is still false, and it would not be - // indexed. 
- LOCK(::cs_main); - pindex_next = NextSyncBlock(pindex, m_chainstate->m_chain); - if (!pindex_next) { - m_synced = true; - break; - } - } - if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) { - FatalErrorf("Failed to rewind %s to a previous chain tip", GetName()); - return; - } - pindex = pindex_next; + std::shared_ptr ctx = std::make_shared(); + { + LOCK2(ctx->mutex_pending_tasks, ::cs_main); + // Create fixed-size tasks + const CBlockIndex* it_start = pindex; + const CBlockIndex* it_end; + for (int id = 0; id < num_tasks; ++id) { + it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain)); + it_end = m_chainstate->m_chain[it_start->nHeight + m_blocks_per_worker - 1]; + ctx->pending_tasks.emplace(id, it_start, it_end); + it_start = it_end; + } - - if (!ProcessBlock(pindex)) return; // error logged internally - - auto current_time{NodeClock::now()}; - if (current_time - last_log_time >= SYNC_LOG_INTERVAL) { - LogInfo("Syncing %s with block chain from height %d", GetName(), pindex->nHeight); - last_log_time = current_time; - } - - if (current_time - last_locator_write_time >= SYNC_LOCATOR_WRITE_INTERVAL) { - SetBestBlockIndex(pindex); - last_locator_write_time = current_time; - // No need to handle errors in Commit. See rationale above. 
- Commit(); - } + // Add final task with the remaining blocks, if any + if (remaining_blocks > 0) { + it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain)); + it_end = m_chainstate->m_chain[it_start->nHeight + remaining_blocks]; + ctx->pending_tasks.emplace(/*task_id=*/num_tasks, it_start, it_end); } } - if (pindex) { - LogInfo("%s is enabled at height %d", GetName(), pindex->nHeight); - } else { - LogInfo("%s is enabled", GetName()); + // Returns nullopt only when there are no pending tasks + const auto& try_get_task = [](auto& ctx) -> std::optional { + LOCK(ctx->mutex_pending_tasks); + if (ctx->pending_tasks.empty()) return std::nullopt; + Task t = std::move(ctx->pending_tasks.front()); + ctx->pending_tasks.pop(); + return t; + }; + + enum class WorkerStatus { ABORT, PROCESSING, FINISHED }; + + const auto& func_worker = [this, try_get_task, process_in_order](auto& ctx) -> WorkerStatus { + if (m_interrupt) return WorkerStatus::FINISHED; + + // Try to obtain a task and process it + if (std::optional maybe_task = try_get_task(ctx)) { + Task task = std::move(*maybe_task); + task.result = ProcessBlocks(process_in_order, task.start_index, task.end_index); + if (task.result.empty()) { + // Empty result indicates an internal error (logged internally). + m_interrupt(); // notify other workers and abort. + return WorkerStatus::ABORT; + } + + LOCK(ctx->mutex_processed_tasks); + ctx->processed_tasks.emplace(task.id, std::move(task)); + } else { + // No pending tasks — might be finished + // If we still have processed task to consume, proceed to finalize them. 
+ LOCK(ctx->mutex_processed_tasks); + if (ctx->processed_tasks.empty()) return WorkerStatus::FINISHED; + } + + // Post-process completed tasks opportunistically + std::vector to_process; + { + TRY_LOCK(ctx->mutex_processed_tasks, locked); + if (!locked) return WorkerStatus::PROCESSING; + + // Collect ready-to-process tasks in order + int next_id = ctx->next_id_to_process.load(); + while (true) { + auto it = ctx->processed_tasks.find(next_id); + if (it == ctx->processed_tasks.end()) break; + to_process.push_back(std::move(it->second)); + ctx->processed_tasks.erase(it); + ++next_id; + } + + // Nothing to process right now, keep processing + if (to_process.empty()) return WorkerStatus::PROCESSING; + } + + // Post-Process tasks + for (const Task& task : to_process) { + for (auto it = task.result.rbegin(); it != task.result.rend(); ++it) { + if (!CustomPostProcessBlocks(*it)) { // error logged internally + m_interrupt(); + FatalErrorf("Index %s: Failed to post process blocks", GetName()); + return WorkerStatus::ABORT; + } + } + // Update progress + SetBestBlockIndex(task.end_index); + ctx->next_id_to_process.store(task.id + 1); + } + + // Check if there's anything left to do + LOCK2(ctx->mutex_pending_tasks, ctx->mutex_processed_tasks); + if (ctx->pending_tasks.empty() && ctx->processed_tasks.empty()) { + return WorkerStatus::FINISHED; + } + + return WorkerStatus::PROCESSING; + }; + + // Process task in parallel if enabled + std::vector> workers_job; + if (m_thread_pool) { + for (size_t i = 0; i < m_thread_pool->WorkersCount(); ++i) { + workers_job.emplace_back(m_thread_pool->Submit([this, ctx, func_worker]() { + WorkerStatus status{WorkerStatus::PROCESSING}; + while (!m_synced && status == WorkerStatus::PROCESSING) { + status = func_worker(ctx); + if (m_interrupt) return; + } + })); + } } + + // Main index thread + // Active-wait: we process blocks in this thread too. 
+ auto last_log_time{NodeClock::now()}; + auto last_locator_write_time{last_log_time}; + + while (!m_synced) { + const WorkerStatus status{func_worker(ctx)}; + if (m_interrupt || status == WorkerStatus::ABORT) { + m_interrupt(); + // Ensure all workers are interrupted before returning. + // This avoids accessing any local variable post-destruction. + for (const auto& job : workers_job) job.wait(); + return; + } + + if (status == WorkerStatus::FINISHED) { + // No more tasks to process; wait for all workers to finish their current tasks + for (const auto& job : workers_job) job.wait(); + // No need to handle errors in Commit. If it fails, the error will be already be + // logged. The best way to recover is to continue, as index cannot be corrupted by + // a missed commit to disk for an advanced index state. + Commit(); + + // Before finishing, check if any new blocks were connected while we were syncing. + // If so, process them synchronously before exiting. + // + // Note: it is important for cs_main to be locked while setting m_synced = true, + // otherwise a new block could be attached while m_synced is still false, and + // it would not be indexed. + LOCK2(ctx->mutex_pending_tasks, ::cs_main); + const CBlockIndex* curr_tip{m_best_block_index.load()}; + pindex_next = NextSyncBlock(curr_tip, m_chainstate->m_chain); + // If the next block is null, it means we are done! + if (!pindex_next) { + m_synced = true; + break; + } + + // New blocks arrived during sync. + // Handle potential reorgs; if the next block's parent doesn't match our tip, + // rewind index state to the correct chain, then resume. + if (pindex_next->pprev != curr_tip && !Rewind(curr_tip, pindex_next->pprev)) { + FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName()); + return; + } + + // Queue the final range of blocks to process. 
+ ctx->pending_tasks.emplace(ctx->next_id_to_process.load(), + /*start_index=*/pindex_next, + /*end_index=*/m_chainstate->m_chain.Tip()); + } + + auto current_time{NodeClock::now()}; + // Log progress every so often + if (current_time - last_log_time >= SYNC_LOG_INTERVAL) { + LogInfo("Syncing %s with block chain from height %d\n", + GetName(), m_best_block_index.load()->nHeight); + last_log_time = current_time; + } + + // Commit changes every so often + if (current_time - last_locator_write_time >= SYNC_LOCATOR_WRITE_INTERVAL) { + Commit(); // No need to handle errors in Commit. See rationale above. + last_locator_write_time = current_time; + } + } + + LogInfo("%s is enabled at height %d\n", GetName(), (m_best_block_index) ? m_best_block_index.load()->nHeight : 0); } bool BaseIndex::Commit() @@ -273,6 +480,7 @@ bool BaseIndex::Commit() bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip) { + assert(current_tip == m_best_block_index); assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip); CBlock block; @@ -354,7 +562,7 @@ void BaseIndex::BlockConnected(ChainstateRole role, const std::shared_ptr #include +#include #include class CBlock; @@ -27,6 +28,8 @@ class Chain; static constexpr int16_t MAX_INDEX_WORKERS_COUNT = 100; /** Number of concurrent jobs during the initial sync process */ static constexpr int16_t INDEX_WORKERS_COUNT = 0; +/** Number of tasks processed by each worker */ +static constexpr int16_t INDEX_WORK_PER_CHUNK = 1000; struct IndexSummary { std::string name; @@ -87,6 +90,7 @@ private: CThreadInterrupt m_interrupt; ThreadPool* m_thread_pool{nullptr}; + int m_blocks_per_worker{INDEX_WORK_PER_CHUNK}; /// Write the current index state (eg. chain block locator and subclass-specific items) to disk. /// @@ -101,7 +105,8 @@ private: /// Loop over disconnected blocks and call CustomRemove. 
bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip); - bool ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr); + std::any ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr); + std::vector ProcessBlocks(bool process_in_order, const CBlockIndex* start, const CBlockIndex* end); virtual bool AllowPrune() const = 0; @@ -138,6 +143,26 @@ protected: /// Update the internal best block index as well as the prune lock. void SetBestBlockIndex(const CBlockIndex* block); + /// If 'AllowParallelSync()' returns true, 'ProcessBlock()' will run concurrently in batches. + /// The 'std::any' result will be passed to 'CustomPostProcessBlocks()' so the index can process + /// async result batches in a synchronous fashion (if required). + [[nodiscard]] virtual std::any CustomProcessBlock(const interfaces::BlockInfo& block_info) { + // If parallel sync is enabled, the child class must implement this method. + if (AllowParallelSync()) return std::any(); + + // Default, synchronous write + if (!CustomAppend(block_info)) { + throw std::runtime_error(strprintf("%s: Failed to write block %s to index database", + __func__, block_info.hash.ToString())); + } + return true; + } + + /// 'CustomPostProcessBlocks()' is called in a synchronous manner after a batch of async 'ProcessBlock()' + /// calls have completed. + /// Here the index usually links and dump information that cannot be processed in an asynchronous fashion. + [[nodiscard]] virtual bool CustomPostProcessBlocks(const std::any& obj) { return true; }; + public: BaseIndex(std::unique_ptr chain, std::string name); /// Destructor interrupts sync thread if running and blocks until it exits. @@ -174,6 +199,9 @@ public: /// Stops the instance from staying in sync with blockchain updates. 
void Stop(); + /// Number of blocks each worker thread will process at a time + void SetBlocksPerWorker(int count) { m_blocks_per_worker = count; } + /// True if the child class allows concurrent sync. virtual bool AllowParallelSync() { return false; } diff --git a/src/test/blockfilter_index_tests.cpp b/src/test/blockfilter_index_tests.cpp index 224acb8b079..fb3cb561b7c 100644 --- a/src/test/blockfilter_index_tests.cpp +++ b/src/test/blockfilter_index_tests.cpp @@ -306,81 +306,4 @@ BOOST_FIXTURE_TEST_CASE(blockfilter_index_init_destroy, BasicTestingSetup) BOOST_CHECK(filter_index == nullptr); } -class IndexReorgCrash : public BaseIndex -{ -private: - std::unique_ptr m_db; - std::shared_future m_blocker; - int m_blocking_height; - -public: - explicit IndexReorgCrash(std::unique_ptr chain, std::shared_future blocker, - int blocking_height) : BaseIndex(std::move(chain), "test index"), m_blocker(blocker), - m_blocking_height(blocking_height) - { - const fs::path path = gArgs.GetDataDirNet() / "index"; - fs::create_directories(path); - m_db = std::make_unique(path / "db", /*n_cache_size=*/0, /*f_memory=*/true, /*f_wipe=*/false); - } - - bool AllowPrune() const override { return false; } - BaseIndex::DB& GetDB() const override { return *m_db; } - - bool CustomAppend(const interfaces::BlockInfo& block) override - { - // Simulate a delay so new blocks can get connected during the initial sync - if (block.height == m_blocking_height) m_blocker.wait(); - - // Move mock time forward so the best index gets updated only when we are not at the blocking height - if (block.height == m_blocking_height - 1 || block.height > m_blocking_height) { - SetMockTime(GetTime() + 31s); - } - - return true; - } -}; - -BOOST_FIXTURE_TEST_CASE(index_reorg_crash, BuildChainTestingSetup) -{ - // Enable mock time - SetMockTime(GetTime()); - - std::promise promise; - std::shared_future blocker(promise.get_future()); - int blocking_height = WITH_LOCK(cs_main, return 
m_node.chainman->ActiveChain().Tip()->nHeight); - - IndexReorgCrash index(interfaces::MakeChain(m_node), blocker, blocking_height); - BOOST_REQUIRE(index.Init()); - BOOST_REQUIRE(index.StartBackgroundSync()); - - auto func_wait_until = [&](int height, std::chrono::milliseconds timeout) { - auto deadline = std::chrono::steady_clock::now() + timeout; - while (index.GetSummary().best_block_height < height) { - if (std::chrono::steady_clock::now() > deadline) { - BOOST_FAIL(strprintf("Timeout waiting for index height %d (current: %d)", height, index.GetSummary().best_block_height)); - return; - } - std::this_thread::sleep_for(100ms); - } - }; - - // Wait until the index is one block before the fork point - func_wait_until(blocking_height - 1, /*timeout=*/5s); - - // Create a fork to trigger the reorg - std::vector> fork; - const CBlockIndex* prev_tip = WITH_LOCK(cs_main, return m_node.chainman->ActiveChain().Tip()->pprev); - BOOST_REQUIRE(BuildChain(prev_tip, GetScriptForDestination(PKHash(GenerateRandomKey().GetPubKey())), 3, fork)); - - for (const auto& block : fork) { - BOOST_REQUIRE(m_node.chainman->ProcessNewBlock(block, /*force_processing=*/true, /*min_pow_checked=*/true, nullptr)); - } - - // Unblock the index thread so it can process the reorg - promise.set_value(); - // Wait for the index to reach the new tip - func_wait_until(blocking_height + 2, 5s); - index.Stop(); -} - BOOST_AUTO_TEST_SUITE_END()