mirror of https://github.com/bitcoin/bitcoin.git
index: implement index parallel sync
This introduces parallel sync for the indexes initial sync, distributing the workload across multiple threads. When enabled, the chain is divided into fixed-size block ranges called "tasks". Worker threads consume tasks concurrently, calling to the CustomProcessBlock over their assigned range, storing results in a shared context while opportunistically batching and dumping the collected information to disk sequentially (when needed). Since large reorgs are improbable during initial sync (headers-chain PoW dictates the valid chain), reorgs are detected only before syncing begins and once it completes. Any new blocks connected during the process are caught up sequentially at the end. This, together with the fact that we no longer depend on an intermediate "index best block" that might be out of sync with the index m_best_block_index, allows us to remove the index_reorg_crash test, as it is no longer possible to call Rewind on a block that is not the index tip.
This commit is contained in:
parent
30e59ce958
commit
89ff1b6406
|
@ -13,6 +13,7 @@
|
|||
#include <node/context.h>
|
||||
#include <node/database_args.h>
|
||||
#include <node/interface_ui.h>
|
||||
#include <util/threadpool.h>
|
||||
#include <tinyformat.h>
|
||||
#include <undo.h>
|
||||
#include <util/string.h>
|
||||
|
@ -20,6 +21,7 @@
|
|||
#include <util/translation.h>
|
||||
#include <validation.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
|
@ -149,7 +151,7 @@ static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev, CChain&
|
|||
return chain.Next(chain.FindFork(pindex_prev));
|
||||
}
|
||||
|
||||
bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data)
|
||||
std::any BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data)
|
||||
{
|
||||
interfaces::BlockInfo block_info = kernel::MakeBlockInfo(pindex, block_data);
|
||||
|
||||
|
@ -158,7 +160,7 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data
|
|||
if (!m_chainstate->m_blockman.ReadBlock(block, *pindex)) {
|
||||
FatalErrorf("Failed to read block %s from disk",
|
||||
pindex->GetBlockHash().ToString());
|
||||
return false;
|
||||
return {};
|
||||
}
|
||||
block_info.data = █
|
||||
}
|
||||
|
@ -168,87 +170,292 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data
|
|||
if (pindex->nHeight > 0 && !m_chainstate->m_blockman.ReadBlockUndo(block_undo, *pindex)) {
|
||||
FatalErrorf("Failed to read undo block data %s from disk",
|
||||
pindex->GetBlockHash().ToString());
|
||||
return false;
|
||||
return {};
|
||||
}
|
||||
block_info.undo_data = &block_undo;
|
||||
}
|
||||
|
||||
if (!CustomAppend(block_info)) {
|
||||
FatalErrorf("Failed to write block %s to index database",
|
||||
pindex->GetBlockHash().ToString());
|
||||
return false;
|
||||
const auto& any_obj = CustomProcessBlock(block_info);
|
||||
if (!any_obj.has_value()) {
|
||||
FatalErrorf("Failed to process block %s for index %s",
|
||||
pindex->GetBlockHash().GetHex(), GetName());
|
||||
return {};
|
||||
}
|
||||
|
||||
return true;
|
||||
return any_obj;
|
||||
}
|
||||
|
||||
std::vector<std::any> BaseIndex::ProcessBlocks(bool process_in_order, const CBlockIndex* start, const CBlockIndex* end)
|
||||
{
|
||||
std::vector<std::any> results;
|
||||
if (process_in_order) {
|
||||
// When ordering is required, collect all block indexes from [end..start] in order
|
||||
std::vector<const CBlockIndex*> ordered_blocks;
|
||||
for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) {
|
||||
ordered_blocks.emplace_back(block);
|
||||
}
|
||||
|
||||
// And process blocks in forward order: from start to end
|
||||
for (auto it = ordered_blocks.rbegin(); it != ordered_blocks.rend(); ++it) {
|
||||
auto op_res = ProcessBlock(*it);
|
||||
if (!op_res.has_value()) return {};
|
||||
results.emplace_back(std::move(op_res));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// If ordering is not required, process blocks directly from end to start
|
||||
for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) {
|
||||
auto op_res = ProcessBlock(block);
|
||||
if (!op_res.has_value()) return {};
|
||||
results.emplace_back(std::move(op_res));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
struct Task {
|
||||
int id;
|
||||
const CBlockIndex* start_index;
|
||||
const CBlockIndex* end_index;
|
||||
std::vector<std::any> result;
|
||||
|
||||
Task(int task_id, const CBlockIndex* start, const CBlockIndex* end)
|
||||
: id(task_id), start_index(start), end_index(end) {}
|
||||
|
||||
// Disallow copy
|
||||
Task(const Task&) = delete;
|
||||
Task& operator=(const Task&) = delete;
|
||||
Task(Task&&) noexcept = default;
|
||||
};
|
||||
|
||||
// Context shared across the initial sync workers
|
||||
struct SyncContext {
|
||||
Mutex mutex_pending_tasks;
|
||||
std::queue<Task> pending_tasks GUARDED_BY(mutex_pending_tasks);
|
||||
|
||||
Mutex mutex_processed_tasks;
|
||||
std::map<int, Task> processed_tasks GUARDED_BY(mutex_processed_tasks);
|
||||
|
||||
std::atomic<int> next_id_to_process{0};
|
||||
};
|
||||
|
||||
// Synchronizes the index with the active chain.
|
||||
//
|
||||
// If parallel sync is enabled, this method uses WorkersCount()+1 threads (including the current thread)
|
||||
// to process block ranges concurrently. Each worker handles up to 'm_blocks_per_worker' blocks each time
|
||||
// (this is called a "task"), which are processed via CustomProcessBlock calls. Results are stored in the
|
||||
// SyncContext's 'processed_tasks' map so they can be sequentially post-processed later.
|
||||
//
|
||||
// After completing a task, workers opportunistically post-process completed tasks *in order* using
|
||||
// CustomPostProcessBlocks. This continues until all blocks have been fully processed and committed.
|
||||
//
|
||||
// Reorgs are detected and handled before syncing begins, ensuring the index starts aligned with the active chain.
|
||||
void BaseIndex::Sync()
|
||||
{
|
||||
if (m_synced) return; // we are synced, nothing to do
|
||||
|
||||
// Before anything, verify we are in the active chain
|
||||
const CBlockIndex* pindex = m_best_block_index.load();
|
||||
if (!m_synced) {
|
||||
auto last_log_time{NodeClock::now()};
|
||||
auto last_locator_write_time{last_log_time};
|
||||
while (true) {
|
||||
if (m_interrupt) {
|
||||
LogInfo("%s: m_interrupt set; exiting ThreadSync", GetName());
|
||||
const int tip_height = WITH_LOCK(cs_main, return m_chainstate->m_chain.Height());
|
||||
// Note: be careful, could return null if there is no more work to do or if pindex is not found (erased blocks dir).
|
||||
const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain));
|
||||
if (!pindex_next) {
|
||||
m_synced = true;
|
||||
return;
|
||||
}
|
||||
|
||||
SetBestBlockIndex(pindex);
|
||||
// No need to handle errors in Commit. If it fails, the error will be already be
|
||||
// logged. The best way to recover is to continue, as index cannot be corrupted by
|
||||
// a missed commit to disk for an advanced index state.
|
||||
Commit();
|
||||
return;
|
||||
}
|
||||
// Handle potential reorgs; if the next block's parent doesn't match our current tip,
|
||||
// rewind our index state to match the chain and resume from there.
|
||||
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
|
||||
FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName());
|
||||
return;
|
||||
}
|
||||
|
||||
const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain));
|
||||
// If pindex_next is null, it means pindex is the chain tip, so
|
||||
// commit data indexed so far.
|
||||
if (!pindex_next) {
|
||||
SetBestBlockIndex(pindex);
|
||||
// No need to handle errors in Commit. See rationale above.
|
||||
Commit();
|
||||
// Compute tasks ranges
|
||||
const int blocks_to_sync = tip_height - pindex_next->nHeight;
|
||||
const int num_tasks = blocks_to_sync / m_blocks_per_worker;
|
||||
const int remaining_blocks = blocks_to_sync % m_blocks_per_worker;
|
||||
const bool process_in_order = !AllowParallelSync();
|
||||
|
||||
// If pindex is still the chain tip after committing, exit the
|
||||
// sync loop. It is important for cs_main to be locked while
|
||||
// setting m_synced = true, otherwise a new block could be
|
||||
// attached while m_synced is still false, and it would not be
|
||||
// indexed.
|
||||
LOCK(::cs_main);
|
||||
pindex_next = NextSyncBlock(pindex, m_chainstate->m_chain);
|
||||
if (!pindex_next) {
|
||||
m_synced = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
|
||||
FatalErrorf("Failed to rewind %s to a previous chain tip", GetName());
|
||||
return;
|
||||
}
|
||||
pindex = pindex_next;
|
||||
std::shared_ptr<SyncContext> ctx = std::make_shared<SyncContext>();
|
||||
{
|
||||
LOCK2(ctx->mutex_pending_tasks, ::cs_main);
|
||||
// Create fixed-size tasks
|
||||
const CBlockIndex* it_start = pindex;
|
||||
const CBlockIndex* it_end;
|
||||
for (int id = 0; id < num_tasks; ++id) {
|
||||
it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain));
|
||||
it_end = m_chainstate->m_chain[it_start->nHeight + m_blocks_per_worker - 1];
|
||||
ctx->pending_tasks.emplace(id, it_start, it_end);
|
||||
it_start = it_end;
|
||||
}
|
||||
|
||||
|
||||
if (!ProcessBlock(pindex)) return; // error logged internally
|
||||
|
||||
auto current_time{NodeClock::now()};
|
||||
if (current_time - last_log_time >= SYNC_LOG_INTERVAL) {
|
||||
LogInfo("Syncing %s with block chain from height %d", GetName(), pindex->nHeight);
|
||||
last_log_time = current_time;
|
||||
}
|
||||
|
||||
if (current_time - last_locator_write_time >= SYNC_LOCATOR_WRITE_INTERVAL) {
|
||||
SetBestBlockIndex(pindex);
|
||||
last_locator_write_time = current_time;
|
||||
// No need to handle errors in Commit. See rationale above.
|
||||
Commit();
|
||||
}
|
||||
// Add final task with the remaining blocks, if any
|
||||
if (remaining_blocks > 0) {
|
||||
it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain));
|
||||
it_end = m_chainstate->m_chain[it_start->nHeight + remaining_blocks];
|
||||
ctx->pending_tasks.emplace(/*task_id=*/num_tasks, it_start, it_end);
|
||||
}
|
||||
}
|
||||
|
||||
if (pindex) {
|
||||
LogInfo("%s is enabled at height %d", GetName(), pindex->nHeight);
|
||||
} else {
|
||||
LogInfo("%s is enabled", GetName());
|
||||
// Returns nullopt only when there are no pending tasks
|
||||
const auto& try_get_task = [](auto& ctx) -> std::optional<Task> {
|
||||
LOCK(ctx->mutex_pending_tasks);
|
||||
if (ctx->pending_tasks.empty()) return std::nullopt;
|
||||
Task t = std::move(ctx->pending_tasks.front());
|
||||
ctx->pending_tasks.pop();
|
||||
return t;
|
||||
};
|
||||
|
||||
enum class WorkerStatus { ABORT, PROCESSING, FINISHED };
|
||||
|
||||
const auto& func_worker = [this, try_get_task, process_in_order](auto& ctx) -> WorkerStatus {
|
||||
if (m_interrupt) return WorkerStatus::FINISHED;
|
||||
|
||||
// Try to obtain a task and process it
|
||||
if (std::optional<Task> maybe_task = try_get_task(ctx)) {
|
||||
Task task = std::move(*maybe_task);
|
||||
task.result = ProcessBlocks(process_in_order, task.start_index, task.end_index);
|
||||
if (task.result.empty()) {
|
||||
// Empty result indicates an internal error (logged internally).
|
||||
m_interrupt(); // notify other workers and abort.
|
||||
return WorkerStatus::ABORT;
|
||||
}
|
||||
|
||||
LOCK(ctx->mutex_processed_tasks);
|
||||
ctx->processed_tasks.emplace(task.id, std::move(task));
|
||||
} else {
|
||||
// No pending tasks — might be finished
|
||||
// If we still have processed task to consume, proceed to finalize them.
|
||||
LOCK(ctx->mutex_processed_tasks);
|
||||
if (ctx->processed_tasks.empty()) return WorkerStatus::FINISHED;
|
||||
}
|
||||
|
||||
// Post-process completed tasks opportunistically
|
||||
std::vector<Task> to_process;
|
||||
{
|
||||
TRY_LOCK(ctx->mutex_processed_tasks, locked);
|
||||
if (!locked) return WorkerStatus::PROCESSING;
|
||||
|
||||
// Collect ready-to-process tasks in order
|
||||
int next_id = ctx->next_id_to_process.load();
|
||||
while (true) {
|
||||
auto it = ctx->processed_tasks.find(next_id);
|
||||
if (it == ctx->processed_tasks.end()) break;
|
||||
to_process.push_back(std::move(it->second));
|
||||
ctx->processed_tasks.erase(it);
|
||||
++next_id;
|
||||
}
|
||||
|
||||
// Nothing to process right now, keep processing
|
||||
if (to_process.empty()) return WorkerStatus::PROCESSING;
|
||||
}
|
||||
|
||||
// Post-Process tasks
|
||||
for (const Task& task : to_process) {
|
||||
for (auto it = task.result.rbegin(); it != task.result.rend(); ++it) {
|
||||
if (!CustomPostProcessBlocks(*it)) { // error logged internally
|
||||
m_interrupt();
|
||||
FatalErrorf("Index %s: Failed to post process blocks", GetName());
|
||||
return WorkerStatus::ABORT;
|
||||
}
|
||||
}
|
||||
// Update progress
|
||||
SetBestBlockIndex(task.end_index);
|
||||
ctx->next_id_to_process.store(task.id + 1);
|
||||
}
|
||||
|
||||
// Check if there's anything left to do
|
||||
LOCK2(ctx->mutex_pending_tasks, ctx->mutex_processed_tasks);
|
||||
if (ctx->pending_tasks.empty() && ctx->processed_tasks.empty()) {
|
||||
return WorkerStatus::FINISHED;
|
||||
}
|
||||
|
||||
return WorkerStatus::PROCESSING;
|
||||
};
|
||||
|
||||
// Process task in parallel if enabled
|
||||
std::vector<std::future<void>> workers_job;
|
||||
if (m_thread_pool) {
|
||||
for (size_t i = 0; i < m_thread_pool->WorkersCount(); ++i) {
|
||||
workers_job.emplace_back(m_thread_pool->Submit([this, ctx, func_worker]() {
|
||||
WorkerStatus status{WorkerStatus::PROCESSING};
|
||||
while (!m_synced && status == WorkerStatus::PROCESSING) {
|
||||
status = func_worker(ctx);
|
||||
if (m_interrupt) return;
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
// Main index thread
|
||||
// Active-wait: we process blocks in this thread too.
|
||||
auto last_log_time{NodeClock::now()};
|
||||
auto last_locator_write_time{last_log_time};
|
||||
|
||||
while (!m_synced) {
|
||||
const WorkerStatus status{func_worker(ctx)};
|
||||
if (m_interrupt || status == WorkerStatus::ABORT) {
|
||||
m_interrupt();
|
||||
// Ensure all workers are interrupted before returning.
|
||||
// This avoids accessing any local variable post-destruction.
|
||||
for (const auto& job : workers_job) job.wait();
|
||||
return;
|
||||
}
|
||||
|
||||
if (status == WorkerStatus::FINISHED) {
|
||||
// No more tasks to process; wait for all workers to finish their current tasks
|
||||
for (const auto& job : workers_job) job.wait();
|
||||
// No need to handle errors in Commit. If it fails, the error will be already be
|
||||
// logged. The best way to recover is to continue, as index cannot be corrupted by
|
||||
// a missed commit to disk for an advanced index state.
|
||||
Commit();
|
||||
|
||||
// Before finishing, check if any new blocks were connected while we were syncing.
|
||||
// If so, process them synchronously before exiting.
|
||||
//
|
||||
// Note: it is important for cs_main to be locked while setting m_synced = true,
|
||||
// otherwise a new block could be attached while m_synced is still false, and
|
||||
// it would not be indexed.
|
||||
LOCK2(ctx->mutex_pending_tasks, ::cs_main);
|
||||
const CBlockIndex* curr_tip{m_best_block_index.load()};
|
||||
pindex_next = NextSyncBlock(curr_tip, m_chainstate->m_chain);
|
||||
// If the next block is null, it means we are done!
|
||||
if (!pindex_next) {
|
||||
m_synced = true;
|
||||
break;
|
||||
}
|
||||
|
||||
// New blocks arrived during sync.
|
||||
// Handle potential reorgs; if the next block's parent doesn't match our tip,
|
||||
// rewind index state to the correct chain, then resume.
|
||||
if (pindex_next->pprev != curr_tip && !Rewind(curr_tip, pindex_next->pprev)) {
|
||||
FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName());
|
||||
return;
|
||||
}
|
||||
|
||||
// Queue the final range of blocks to process.
|
||||
ctx->pending_tasks.emplace(ctx->next_id_to_process.load(),
|
||||
/*start_index=*/pindex_next,
|
||||
/*end_index=*/m_chainstate->m_chain.Tip());
|
||||
}
|
||||
|
||||
auto current_time{NodeClock::now()};
|
||||
// Log progress every so often
|
||||
if (current_time - last_log_time >= SYNC_LOG_INTERVAL) {
|
||||
LogInfo("Syncing %s with block chain from height %d\n",
|
||||
GetName(), m_best_block_index.load()->nHeight);
|
||||
last_log_time = current_time;
|
||||
}
|
||||
|
||||
// Commit changes every so often
|
||||
if (current_time - last_locator_write_time >= SYNC_LOCATOR_WRITE_INTERVAL) {
|
||||
Commit(); // No need to handle errors in Commit. See rationale above.
|
||||
last_locator_write_time = current_time;
|
||||
}
|
||||
}
|
||||
|
||||
LogInfo("%s is enabled at height %d\n", GetName(), (m_best_block_index) ? m_best_block_index.load()->nHeight : 0);
|
||||
}
|
||||
|
||||
bool BaseIndex::Commit()
|
||||
|
@ -273,6 +480,7 @@ bool BaseIndex::Commit()
|
|||
|
||||
bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip)
|
||||
{
|
||||
assert(current_tip == m_best_block_index);
|
||||
assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip);
|
||||
|
||||
CBlock block;
|
||||
|
@ -354,7 +562,7 @@ void BaseIndex::BlockConnected(ChainstateRole role, const std::shared_ptr<const
|
|||
}
|
||||
|
||||
// Dispatch block to child class; errors are logged internally and abort the node.
|
||||
if (ProcessBlock(pindex, block.get())) {
|
||||
if (CustomPostProcessBlocks(ProcessBlock(pindex, block.get()))) {
|
||||
// Setting the best block index is intentionally the last step of this
|
||||
// function, so BlockUntilSyncedToCurrentChain callers waiting for the
|
||||
// best block index to be updated can rely on the block being fully
|
||||
|
@ -402,7 +610,7 @@ void BaseIndex::ChainStateFlushed(ChainstateRole role, const CBlockLocator& loca
|
|||
return;
|
||||
}
|
||||
|
||||
// No need to handle errors in Commit. If it fails, the error will be already be logged. The
|
||||
// No need to handle errors in Commit. If it fails, the error will already be logged. The
|
||||
// best way to recover is to continue, as index cannot be corrupted by a missed commit to disk
|
||||
// for an advanced index state.
|
||||
Commit();
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
#include <util/threadinterrupt.h>
|
||||
#include <validationinterface.h>
|
||||
|
||||
#include <any>
|
||||
#include <string>
|
||||
|
||||
class CBlock;
|
||||
|
@ -27,6 +28,8 @@ class Chain;
|
|||
static constexpr int16_t MAX_INDEX_WORKERS_COUNT = 100;
|
||||
/** Number of concurrent jobs during the initial sync process */
|
||||
static constexpr int16_t INDEX_WORKERS_COUNT = 0;
|
||||
/** Number of tasks processed by each worker */
|
||||
static constexpr int16_t INDEX_WORK_PER_CHUNK = 1000;
|
||||
|
||||
struct IndexSummary {
|
||||
std::string name;
|
||||
|
@ -87,6 +90,7 @@ private:
|
|||
CThreadInterrupt m_interrupt;
|
||||
|
||||
ThreadPool* m_thread_pool{nullptr};
|
||||
int m_blocks_per_worker{INDEX_WORK_PER_CHUNK};
|
||||
|
||||
/// Write the current index state (eg. chain block locator and subclass-specific items) to disk.
|
||||
///
|
||||
|
@ -101,7 +105,8 @@ private:
|
|||
/// Loop over disconnected blocks and call CustomRemove.
|
||||
bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip);
|
||||
|
||||
bool ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr);
|
||||
std::any ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr);
|
||||
std::vector<std::any> ProcessBlocks(bool process_in_order, const CBlockIndex* start, const CBlockIndex* end);
|
||||
|
||||
virtual bool AllowPrune() const = 0;
|
||||
|
||||
|
@ -138,6 +143,26 @@ protected:
|
|||
/// Update the internal best block index as well as the prune lock.
|
||||
void SetBestBlockIndex(const CBlockIndex* block);
|
||||
|
||||
/// If 'AllowParallelSync()' returns true, 'ProcessBlock()' will run concurrently in batches.
|
||||
/// The 'std::any' result will be passed to 'CustomPostProcessBlocks()' so the index can process
|
||||
/// async result batches in a synchronous fashion (if required).
|
||||
[[nodiscard]] virtual std::any CustomProcessBlock(const interfaces::BlockInfo& block_info) {
|
||||
// If parallel sync is enabled, the child class must implement this method.
|
||||
if (AllowParallelSync()) return std::any();
|
||||
|
||||
// Default, synchronous write
|
||||
if (!CustomAppend(block_info)) {
|
||||
throw std::runtime_error(strprintf("%s: Failed to write block %s to index database",
|
||||
__func__, block_info.hash.ToString()));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/// 'CustomPostProcessBlocks()' is called in a synchronous manner after a batch of async 'ProcessBlock()'
|
||||
/// calls have completed.
|
||||
/// Here the index usually links and dump information that cannot be processed in an asynchronous fashion.
|
||||
[[nodiscard]] virtual bool CustomPostProcessBlocks(const std::any& obj) { return true; };
|
||||
|
||||
public:
|
||||
BaseIndex(std::unique_ptr<interfaces::Chain> chain, std::string name);
|
||||
/// Destructor interrupts sync thread if running and blocks until it exits.
|
||||
|
@ -174,6 +199,9 @@ public:
|
|||
/// Stops the instance from staying in sync with blockchain updates.
|
||||
void Stop();
|
||||
|
||||
/// Number of blocks each worker thread will process at a time
|
||||
void SetBlocksPerWorker(int count) { m_blocks_per_worker = count; }
|
||||
|
||||
/// True if the child class allows concurrent sync.
|
||||
virtual bool AllowParallelSync() { return false; }
|
||||
|
||||
|
|
|
@ -306,81 +306,4 @@ BOOST_FIXTURE_TEST_CASE(blockfilter_index_init_destroy, BasicTestingSetup)
|
|||
BOOST_CHECK(filter_index == nullptr);
|
||||
}
|
||||
|
||||
class IndexReorgCrash : public BaseIndex
|
||||
{
|
||||
private:
|
||||
std::unique_ptr<BaseIndex::DB> m_db;
|
||||
std::shared_future<void> m_blocker;
|
||||
int m_blocking_height;
|
||||
|
||||
public:
|
||||
explicit IndexReorgCrash(std::unique_ptr<interfaces::Chain> chain, std::shared_future<void> blocker,
|
||||
int blocking_height) : BaseIndex(std::move(chain), "test index"), m_blocker(blocker),
|
||||
m_blocking_height(blocking_height)
|
||||
{
|
||||
const fs::path path = gArgs.GetDataDirNet() / "index";
|
||||
fs::create_directories(path);
|
||||
m_db = std::make_unique<BaseIndex::DB>(path / "db", /*n_cache_size=*/0, /*f_memory=*/true, /*f_wipe=*/false);
|
||||
}
|
||||
|
||||
bool AllowPrune() const override { return false; }
|
||||
BaseIndex::DB& GetDB() const override { return *m_db; }
|
||||
|
||||
bool CustomAppend(const interfaces::BlockInfo& block) override
|
||||
{
|
||||
// Simulate a delay so new blocks can get connected during the initial sync
|
||||
if (block.height == m_blocking_height) m_blocker.wait();
|
||||
|
||||
// Move mock time forward so the best index gets updated only when we are not at the blocking height
|
||||
if (block.height == m_blocking_height - 1 || block.height > m_blocking_height) {
|
||||
SetMockTime(GetTime<std::chrono::seconds>() + 31s);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
BOOST_FIXTURE_TEST_CASE(index_reorg_crash, BuildChainTestingSetup)
|
||||
{
|
||||
// Enable mock time
|
||||
SetMockTime(GetTime<std::chrono::minutes>());
|
||||
|
||||
std::promise<void> promise;
|
||||
std::shared_future<void> blocker(promise.get_future());
|
||||
int blocking_height = WITH_LOCK(cs_main, return m_node.chainman->ActiveChain().Tip()->nHeight);
|
||||
|
||||
IndexReorgCrash index(interfaces::MakeChain(m_node), blocker, blocking_height);
|
||||
BOOST_REQUIRE(index.Init());
|
||||
BOOST_REQUIRE(index.StartBackgroundSync());
|
||||
|
||||
auto func_wait_until = [&](int height, std::chrono::milliseconds timeout) {
|
||||
auto deadline = std::chrono::steady_clock::now() + timeout;
|
||||
while (index.GetSummary().best_block_height < height) {
|
||||
if (std::chrono::steady_clock::now() > deadline) {
|
||||
BOOST_FAIL(strprintf("Timeout waiting for index height %d (current: %d)", height, index.GetSummary().best_block_height));
|
||||
return;
|
||||
}
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}
|
||||
};
|
||||
|
||||
// Wait until the index is one block before the fork point
|
||||
func_wait_until(blocking_height - 1, /*timeout=*/5s);
|
||||
|
||||
// Create a fork to trigger the reorg
|
||||
std::vector<std::shared_ptr<CBlock>> fork;
|
||||
const CBlockIndex* prev_tip = WITH_LOCK(cs_main, return m_node.chainman->ActiveChain().Tip()->pprev);
|
||||
BOOST_REQUIRE(BuildChain(prev_tip, GetScriptForDestination(PKHash(GenerateRandomKey().GetPubKey())), 3, fork));
|
||||
|
||||
for (const auto& block : fork) {
|
||||
BOOST_REQUIRE(m_node.chainman->ProcessNewBlock(block, /*force_processing=*/true, /*min_pow_checked=*/true, nullptr));
|
||||
}
|
||||
|
||||
// Unblock the index thread so it can process the reorg
|
||||
promise.set_value();
|
||||
// Wait for the index to reach the new tip
|
||||
func_wait_until(blocking_height + 2, 5s);
|
||||
index.Stop();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
|
Loading…
Reference in New Issue