index: implement index parallel sync

This introduces parallel sync for the indexes' initial sync,
distributing the workload across multiple threads.

When enabled, the chain is divided into fixed-size block ranges called
"tasks". Worker threads consume tasks concurrently, calling
CustomProcessBlock over their assigned range and storing the results in a
shared context, while opportunistically batching and dumping the collected
information to disk sequentially (when needed).

Since large reorgs are improbable during initial sync (headers-chain PoW
dictates the valid chain), reorgs are detected only before syncing begins
and once it completes. Any new blocks connected during the process are
caught up sequentially at the end. This, together with the fact that we
no longer depend on an intermediate "index best block" that might be out
of sync with the index m_best_block_index, allows us to remove the
index_reorg_crash test, as it is no longer possible to call Rewind on a
block that is not the index tip.
This commit is contained in:
furszy 2023-02-20 20:17:50 -03:00
parent c932ccd8d6
commit 4156cd6ee8
No known key found for this signature in database
GPG Key ID: 5DD23CCC686AA623
3 changed files with 304 additions and 145 deletions

View File

@ -13,6 +13,7 @@
#include <node/context.h>
#include <node/database_args.h>
#include <node/interface_ui.h>
#include <util/threadpool.h>
#include <tinyformat.h>
#include <undo.h>
#include <util/string.h>
@ -20,6 +21,7 @@
#include <util/translation.h>
#include <validation.h>
#include <algorithm>
#include <chrono>
#include <memory>
#include <optional>
@ -149,7 +151,7 @@ static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev, CChain&
return chain.Next(chain.FindFork(pindex_prev));
}
bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data)
std::any BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data)
{
interfaces::BlockInfo block_info = kernel::MakeBlockInfo(pindex, block_data);
@ -158,7 +160,7 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data
if (!m_chainstate->m_blockman.ReadBlock(block, *pindex)) {
FatalErrorf("Failed to read block %s from disk",
pindex->GetBlockHash().ToString());
return false;
return {};
}
block_info.data = &block;
}
@ -168,87 +170,292 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data
if (pindex->nHeight > 0 && !m_chainstate->m_blockman.ReadBlockUndo(block_undo, *pindex)) {
FatalErrorf("Failed to read undo block data %s from disk",
pindex->GetBlockHash().ToString());
return false;
return {};
}
block_info.undo_data = &block_undo;
}
if (!CustomAppend(block_info)) {
FatalErrorf("Failed to write block %s to index database",
pindex->GetBlockHash().ToString());
return false;
const auto& any_obj = CustomProcessBlock(block_info);
if (!any_obj.has_value()) {
FatalErrorf("Failed to process block %s for index %s",
pindex->GetBlockHash().GetHex(), GetName());
return {};
}
return true;
return any_obj;
}
std::vector<std::any> BaseIndex::ProcessBlocks(bool process_in_order, const CBlockIndex* start, const CBlockIndex* end)
{
std::vector<std::any> results;
if (process_in_order) {
// When ordering is required, collect all block indexes from [end..start] in order
std::vector<const CBlockIndex*> ordered_blocks;
for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) {
ordered_blocks.emplace_back(block);
}
// And process blocks in forward order: from start to end
for (auto it = ordered_blocks.rbegin(); it != ordered_blocks.rend(); ++it) {
auto op_res = ProcessBlock(*it);
if (!op_res.has_value()) return {};
results.emplace_back(std::move(op_res));
}
return results;
}
// If ordering is not required, process blocks directly from end to start
for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) {
auto op_res = ProcessBlock(block);
if (!op_res.has_value()) return {};
results.emplace_back(std::move(op_res));
}
return results;
}
// A unit of work for the initial sync: the block range [start_index, end_index]
// together with the per-block results produced for it.
//
// Task is a move-only type: it is handed from the pending queue to a worker and
// then into the processed map by moving, never by copying.
struct Task {
    // Sequential identifier; tasks are post-processed in ascending id order.
    int id;
    // First block of the range.
    const CBlockIndex* start_index;
    // Last block of the range.
    const CBlockIndex* end_index;
    // One entry per processed block, filled in once the range has been processed.
    std::vector<std::any> result;

    Task(int task_id, const CBlockIndex* start, const CBlockIndex* end)
        : id(task_id), start_index(start), end_index(end) {}

    // Disallow copy
    Task(const Task&) = delete;
    Task& operator=(const Task&) = delete;

    // Allow move. The move assignment is declared explicitly: with user-declared
    // copy operations it would otherwise not be generated at all, leaving the
    // type move-constructible but not assignable.
    Task(Task&&) noexcept = default;
    Task& operator=(Task&&) noexcept = default;
};
// Context shared across the initial sync workers.
//
// Holds the two task containers the workers exchange work through, each
// protected by its own mutex, plus the id of the next task to post-process.
struct SyncContext {
// Protects 'pending_tasks'.
Mutex mutex_pending_tasks;
// Tasks that have not been picked up yet; consumed from the front.
std::queue<Task> pending_tasks GUARDED_BY(mutex_pending_tasks);
// Protects 'processed_tasks'.
Mutex mutex_processed_tasks;
// Tasks whose blocks have been processed and that are waiting to be
// post-processed, keyed by task id so they can be looked up in id order.
std::map<int, Task> processed_tasks GUARDED_BY(mutex_processed_tasks);
// Id of the next task to post-process; advanced to 'task.id + 1' after a
// task has been post-processed.
std::atomic<int> next_id_to_process{0};
};
// Synchronizes the index with the active chain.
//
// If parallel sync is enabled, this method uses WorkersCount()+1 threads (including the current thread)
// to process block ranges concurrently. Each worker handles up to 'm_blocks_per_worker' blocks each time
// (this is called a "task"), which are processed via CustomProcessBlock calls. Results are stored in the
// SyncContext's 'processed_tasks' map so they can be sequentially post-processed later.
//
// After completing a task, workers opportunistically post-process completed tasks *in order* using
// CustomPostProcessBlocks. This continues until all blocks have been fully processed and committed.
//
// Reorgs are detected and handled before syncing begins, ensuring the index starts aligned with the active chain.
void BaseIndex::Sync()
{
if (m_synced) return; // we are synced, nothing to do
// Before anything, verify we are in the active chain
const CBlockIndex* pindex = m_best_block_index.load();
if (!m_synced) {
const int tip_height = WITH_LOCK(cs_main, return m_chainstate->m_chain.Height());
// Note: be careful, could return null if there is no more work to do or if pindex is not found (erased blocks dir).
const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain));
if (!pindex_next) {
m_synced = true;
return;
}
// Handle potential reorgs; if the next block's parent doesn't match our current tip,
// rewind our index state to match the chain and resume from there.
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName());
return;
}
// Compute tasks ranges
const int blocks_to_sync = tip_height - pindex_next->nHeight;
const int num_tasks = blocks_to_sync / m_blocks_per_worker;
const int remaining_blocks = blocks_to_sync % m_blocks_per_worker;
const bool process_in_order = !AllowParallelSync();
std::shared_ptr<SyncContext> ctx = std::make_shared<SyncContext>();
{
LOCK2(ctx->mutex_pending_tasks, ::cs_main);
// Create fixed-size tasks
const CBlockIndex* it_start = pindex;
const CBlockIndex* it_end;
for (int id = 0; id < num_tasks; ++id) {
it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain));
it_end = m_chainstate->m_chain[it_start->nHeight + m_blocks_per_worker - 1];
ctx->pending_tasks.emplace(id, it_start, it_end);
it_start = it_end;
}
// Add final task with the remaining blocks, if any
if (remaining_blocks > 0) {
it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain));
it_end = m_chainstate->m_chain[it_start->nHeight + remaining_blocks];
ctx->pending_tasks.emplace(/*task_id=*/num_tasks, it_start, it_end);
}
}
// Returns nullopt only when there are no pending tasks
const auto& try_get_task = [](auto& ctx) -> std::optional<Task> {
LOCK(ctx->mutex_pending_tasks);
if (ctx->pending_tasks.empty()) return std::nullopt;
Task t = std::move(ctx->pending_tasks.front());
ctx->pending_tasks.pop();
return t;
};
enum class WorkerStatus { ABORT, PROCESSING, FINISHED };
const auto& func_worker = [this, try_get_task, process_in_order](auto& ctx) -> WorkerStatus {
if (m_interrupt) return WorkerStatus::FINISHED;
// Try to obtain a task and process it
if (std::optional<Task> maybe_task = try_get_task(ctx)) {
Task task = std::move(*maybe_task);
task.result = ProcessBlocks(process_in_order, task.start_index, task.end_index);
if (task.result.empty()) {
// Empty result indicates an internal error (logged internally).
m_interrupt(); // notify other workers and abort.
return WorkerStatus::ABORT;
}
LOCK(ctx->mutex_processed_tasks);
ctx->processed_tasks.emplace(task.id, std::move(task));
} else {
// No pending tasks — we might be finished.
// If there are still processed tasks to consume, proceed to finalize them.
LOCK(ctx->mutex_processed_tasks);
if (ctx->processed_tasks.empty()) return WorkerStatus::FINISHED;
}
// Post-process completed tasks opportunistically
std::vector<Task> to_process;
{
TRY_LOCK(ctx->mutex_processed_tasks, locked);
if (!locked) return WorkerStatus::PROCESSING;
// Collect ready-to-process tasks in order
int next_id = ctx->next_id_to_process.load();
while (true) {
auto it = ctx->processed_tasks.find(next_id);
if (it == ctx->processed_tasks.end()) break;
to_process.push_back(std::move(it->second));
ctx->processed_tasks.erase(it);
++next_id;
}
// Nothing to process right now, keep processing
if (to_process.empty()) return WorkerStatus::PROCESSING;
}
// Post-Process tasks
for (const Task& task : to_process) {
for (auto it = task.result.rbegin(); it != task.result.rend(); ++it) {
if (!CustomPostProcessBlocks(*it)) { // error logged internally
m_interrupt();
FatalErrorf("Index %s: Failed to post process blocks", GetName());
return WorkerStatus::ABORT;
}
}
// Update progress
SetBestBlockIndex(task.end_index);
ctx->next_id_to_process.store(task.id + 1);
}
// Check if there's anything left to do
LOCK2(ctx->mutex_pending_tasks, ctx->mutex_processed_tasks);
if (ctx->pending_tasks.empty() && ctx->processed_tasks.empty()) {
return WorkerStatus::FINISHED;
}
return WorkerStatus::PROCESSING;
};
// Process task in parallel if enabled
std::vector<std::future<void>> workers_job;
if (m_thread_pool) {
for (size_t i = 0; i < m_thread_pool->WorkersCount(); ++i) {
workers_job.emplace_back(m_thread_pool->Submit([this, ctx, func_worker]() {
WorkerStatus status{WorkerStatus::PROCESSING};
while (!m_synced && status == WorkerStatus::PROCESSING) {
status = func_worker(ctx);
if (m_interrupt) return;
}
}));
}
}
// Main index thread
// Active-wait: we process blocks in this thread too.
auto last_log_time{NodeClock::now()};
auto last_locator_write_time{last_log_time};
while (true) {
if (m_interrupt) {
LogInfo("%s: m_interrupt set; exiting ThreadSync", GetName());
SetBestBlockIndex(pindex);
while (!m_synced) {
const WorkerStatus status{func_worker(ctx)};
if (m_interrupt || status == WorkerStatus::ABORT) {
m_interrupt();
// Ensure all workers are interrupted before returning.
// This avoids accessing any local variable post-destruction.
for (const auto& job : workers_job) job.wait();
return;
}
if (status == WorkerStatus::FINISHED) {
// No more tasks to process; wait for all workers to finish their current tasks
for (const auto& job : workers_job) job.wait();
// No need to handle errors in Commit. If it fails, the error will already be
// logged. The best way to recover is to continue, as index cannot be corrupted by
// a missed commit to disk for an advanced index state.
Commit();
return;
}
const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain));
// If pindex_next is null, it means pindex is the chain tip, so
// commit data indexed so far.
if (!pindex_next) {
SetBestBlockIndex(pindex);
// No need to handle errors in Commit. See rationale above.
Commit();
// If pindex is still the chain tip after committing, exit the
// sync loop. It is important for cs_main to be locked while
// setting m_synced = true, otherwise a new block could be
// attached while m_synced is still false, and it would not be
// indexed.
LOCK(::cs_main);
pindex_next = NextSyncBlock(pindex, m_chainstate->m_chain);
// Before finishing, check if any new blocks were connected while we were syncing.
// If so, process them synchronously before exiting.
//
// Note: it is important for cs_main to be locked while setting m_synced = true,
// otherwise a new block could be attached while m_synced is still false, and
// it would not be indexed.
LOCK2(ctx->mutex_pending_tasks, ::cs_main);
const CBlockIndex* curr_tip{m_best_block_index.load()};
pindex_next = NextSyncBlock(curr_tip, m_chainstate->m_chain);
// If the next block is null, it means we are done!
if (!pindex_next) {
m_synced = true;
break;
}
}
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
FatalErrorf("Failed to rewind %s to a previous chain tip", GetName());
// New blocks arrived during sync.
// Handle potential reorgs; if the next block's parent doesn't match our tip,
// rewind index state to the correct chain, then resume.
if (pindex_next->pprev != curr_tip && !Rewind(curr_tip, pindex_next->pprev)) {
FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName());
return;
}
pindex = pindex_next;
if (!ProcessBlock(pindex)) return; // error logged internally
// Queue the final range of blocks to process.
ctx->pending_tasks.emplace(ctx->next_id_to_process.load(),
/*start_index=*/pindex_next,
/*end_index=*/m_chainstate->m_chain.Tip());
}
auto current_time{NodeClock::now()};
// Log progress every so often
if (current_time - last_log_time >= SYNC_LOG_INTERVAL) {
LogInfo("Syncing %s with block chain from height %d", GetName(), pindex->nHeight);
LogInfo("Syncing %s with block chain from height %d\n",
GetName(), m_best_block_index.load()->nHeight);
last_log_time = current_time;
}
// Commit changes every so often
if (current_time - last_locator_write_time >= SYNC_LOCATOR_WRITE_INTERVAL) {
SetBestBlockIndex(pindex);
Commit(); // No need to handle errors in Commit. See rationale above.
last_locator_write_time = current_time;
// No need to handle errors in Commit. See rationale above.
Commit();
}
}
}
if (pindex) {
LogInfo("%s is enabled at height %d", GetName(), pindex->nHeight);
} else {
LogInfo("%s is enabled", GetName());
}
LogInfo("%s is enabled at height %d\n", GetName(), (m_best_block_index) ? m_best_block_index.load()->nHeight : 0);
}
bool BaseIndex::Commit()
@ -273,6 +480,7 @@ bool BaseIndex::Commit()
bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip)
{
assert(current_tip == m_best_block_index);
assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip);
CBlock block;
@ -354,7 +562,7 @@ void BaseIndex::BlockConnected(ChainstateRole role, const std::shared_ptr<const
}
// Dispatch block to child class; errors are logged internally and abort the node.
if (ProcessBlock(pindex, block.get())) {
if (CustomPostProcessBlocks(ProcessBlock(pindex, block.get()))) {
// Setting the best block index is intentionally the last step of this
// function, so BlockUntilSyncedToCurrentChain callers waiting for the
// best block index to be updated can rely on the block being fully
@ -402,7 +610,7 @@ void BaseIndex::ChainStateFlushed(ChainstateRole role, const CBlockLocator& loca
return;
}
// No need to handle errors in Commit. If it fails, the error will be already be logged. The
// No need to handle errors in Commit. If it fails, the error will already be logged. The
// best way to recover is to continue, as index cannot be corrupted by a missed commit to disk
// for an advanced index state.
Commit();

View File

@ -12,6 +12,7 @@
#include <util/threadinterrupt.h>
#include <validationinterface.h>
#include <any>
#include <string>
class CBlock;
@ -27,6 +28,8 @@ class Chain;
static constexpr int16_t MAX_INDEX_WORKERS_COUNT = 100;
/** Number of concurrent jobs during the initial sync process */
static constexpr int16_t INDEX_WORKERS_COUNT = 0;
/** Number of blocks in each task (the chunk of work a worker processes at a time) */
static constexpr int16_t INDEX_WORK_PER_CHUNK = 1000;
struct IndexSummary {
std::string name;
@ -87,6 +90,7 @@ private:
CThreadInterrupt m_interrupt;
ThreadPool* m_thread_pool{nullptr};
int m_blocks_per_worker{INDEX_WORK_PER_CHUNK};
/// Write the current index state (eg. chain block locator and subclass-specific items) to disk.
///
@ -101,7 +105,8 @@ private:
/// Loop over disconnected blocks and call CustomRemove.
bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip);
bool ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr);
std::any ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr);
std::vector<std::any> ProcessBlocks(bool process_in_order, const CBlockIndex* start, const CBlockIndex* end);
virtual bool AllowPrune() const = 0;
@ -138,6 +143,26 @@ protected:
/// Update the internal best block index as well as the prune lock.
void SetBestBlockIndex(const CBlockIndex* block);
/// Per-block processing hook, invoked through 'ProcessBlock()'.
///
/// If 'AllowParallelSync()' returns true, 'ProcessBlock()' will run concurrently in batches.
/// The returned 'std::any' is later passed to 'CustomPostProcessBlocks()' so the index can
/// handle the asynchronously produced results in a synchronous fashion (if required).
/// An empty 'std::any' is treated by 'ProcessBlock()' as a failure.
///
/// Default implementation:
///  - parallel sync allowed: returns an empty 'std::any'; a child class that enables
///    parallel sync must override this method.
///  - otherwise: performs the synchronous write via 'CustomAppend()' and returns 'true',
///    or throws 'std::runtime_error' if the write fails.
///
/// NOTE(review): the 'CustomAppend()' failure path throws, while 'ProcessBlock()' reports
/// failures through an empty result — confirm every call site copes with the exception.
[[nodiscard]] virtual std::any CustomProcessBlock(const interfaces::BlockInfo& block_info) {
    if (!AllowParallelSync()) {
        // Default, synchronous write
        if (CustomAppend(block_info)) return true;
        throw std::runtime_error(strprintf("%s: Failed to write block %s to index database",
                                           __func__, block_info.hash.ToString()));
    }
    // Parallel sync is enabled, but the child class did not provide an implementation.
    return std::any();
}
/// 'CustomPostProcessBlocks()' is called in a synchronous manner after a batch of async
/// 'ProcessBlock()' calls has completed, receiving one 'ProcessBlock()' result at a time.
/// Here the index usually links and dumps information that cannot be processed in an
/// asynchronous fashion.
///
/// Returns false on failure. The default implementation does nothing and reports success.
[[nodiscard]] virtual bool CustomPostProcessBlocks(const std::any& obj)
{
    return true;
}
public:
BaseIndex(std::unique_ptr<interfaces::Chain> chain, std::string name);
/// Destructor interrupts sync thread if running and blocks until it exits.
@ -174,6 +199,9 @@ public:
/// Stops the instance from staying in sync with blockchain updates.
void Stop();
/// Number of blocks each worker thread will process at a time (the size of one sync task).
///
/// Non-positive values are clamped to 1: the value is used as a divisor when the range to
/// sync is split into tasks, so storing 0 would lead to a division by zero.
void SetBlocksPerWorker(int count) { m_blocks_per_worker = count > 0 ? count : 1; }
/// True if the child class allows concurrent sync.
///
/// When this returns false (the default), the initial sync processes the blocks of each
/// task in forward order and the default 'CustomProcessBlock()' falls back to the
/// synchronous 'CustomAppend()' write. A child class returning true must provide its own
/// 'CustomProcessBlock()' implementation.
virtual bool AllowParallelSync() { return false; }

View File

@ -306,81 +306,4 @@ BOOST_FIXTURE_TEST_CASE(blockfilter_index_init_destroy, BasicTestingSetup)
BOOST_CHECK(filter_index == nullptr);
}
class IndexReorgCrash : public BaseIndex
{
private:
std::unique_ptr<BaseIndex::DB> m_db;
std::shared_future<void> m_blocker;
int m_blocking_height;
public:
explicit IndexReorgCrash(std::unique_ptr<interfaces::Chain> chain, std::shared_future<void> blocker,
int blocking_height) : BaseIndex(std::move(chain), "test index"), m_blocker(blocker),
m_blocking_height(blocking_height)
{
const fs::path path = gArgs.GetDataDirNet() / "index";
fs::create_directories(path);
m_db = std::make_unique<BaseIndex::DB>(path / "db", /*n_cache_size=*/0, /*f_memory=*/true, /*f_wipe=*/false);
}
bool AllowPrune() const override { return false; }
BaseIndex::DB& GetDB() const override { return *m_db; }
bool CustomAppend(const interfaces::BlockInfo& block) override
{
// Simulate a delay so new blocks can get connected during the initial sync
if (block.height == m_blocking_height) m_blocker.wait();
// Move mock time forward so the best index gets updated only when we are not at the blocking height
if (block.height == m_blocking_height - 1 || block.height > m_blocking_height) {
SetMockTime(GetTime<std::chrono::seconds>() + 31s);
}
return true;
}
};
// Regression test: trigger a reorg while the index background sync is stalled at the
// chain tip, then check that the index is able to reach the tip of the new chain.
BOOST_FIXTURE_TEST_CASE(index_reorg_crash, BuildChainTestingSetup)
{
// Enable mock time
SetMockTime(GetTime<std::chrono::minutes>());
// The promise/shared_future pair is the switch used to release the index thread later on.
std::promise<void> promise;
std::shared_future<void> blocker(promise.get_future());
// The index will stall when it processes the block at the current tip height.
int blocking_height = WITH_LOCK(cs_main, return m_node.chainman->ActiveChain().Tip()->nHeight);
IndexReorgCrash index(interfaces::MakeChain(m_node), blocker, blocking_height);
BOOST_REQUIRE(index.Init());
BOOST_REQUIRE(index.StartBackgroundSync());
// Polls the index summary every 100ms until it reports at least 'height';
// fails the test if 'timeout' elapses first.
auto func_wait_until = [&](int height, std::chrono::milliseconds timeout) {
auto deadline = std::chrono::steady_clock::now() + timeout;
while (index.GetSummary().best_block_height < height) {
if (std::chrono::steady_clock::now() > deadline) {
BOOST_FAIL(strprintf("Timeout waiting for index height %d (current: %d)", height, index.GetSummary().best_block_height));
return;
}
std::this_thread::sleep_for(100ms);
}
};
// Wait until the index is one block before the fork point
func_wait_until(blocking_height - 1, /*timeout=*/5s);
// Create a fork to trigger the reorg
// The fork starts at the parent of the current tip and is 3 blocks long, so the
// new tip ends up at height 'blocking_height + 2'.
std::vector<std::shared_ptr<CBlock>> fork;
const CBlockIndex* prev_tip = WITH_LOCK(cs_main, return m_node.chainman->ActiveChain().Tip()->pprev);
BOOST_REQUIRE(BuildChain(prev_tip, GetScriptForDestination(PKHash(GenerateRandomKey().GetPubKey())), 3, fork));
for (const auto& block : fork) {
BOOST_REQUIRE(m_node.chainman->ProcessNewBlock(block, /*force_processing=*/true, /*min_pow_checked=*/true, nullptr));
}
// Unblock the index thread so it can process the reorg
promise.set_value();
// Wait for the index to reach the new tip
func_wait_until(blocking_height + 2, 5s);
index.Stop();
}
BOOST_AUTO_TEST_SUITE_END()