diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index a818dba7205..d8bbb76a0ba 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -103,6 +103,7 @@ add_executable(test_bitcoin sync_tests.cpp system_tests.cpp testnet4_miner_tests.cpp + threadpool_tests.cpp timeoffsets_tests.cpp torcontrol_tests.cpp transaction_tests.cpp diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp new file mode 100644 index 00000000000..e245fc3bc34 --- /dev/null +++ b/src/test/threadpool_tests.cpp @@ -0,0 +1,268 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include + +#include + +BOOST_AUTO_TEST_SUITE(threadpool_tests) + +constexpr auto TIMEOUT_SECS = std::chrono::seconds(120); + +template +void WaitFor(std::vector>& futures, const std::string& context) +{ + for (size_t i = 0; i < futures.size(); ++i) { + if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) { + throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i)); + } + } +} + +BOOST_AUTO_TEST_CASE(threadpool_basic) +{ + // Test Cases + // 1) Submit tasks and verify completion. + // 2) Maintain all threads busy except one. + // 3) Wait for work to finish. + // 4) Wait for result object. + // 5) The task throws an exception, catch must be done in the consumer side. + // 6) Busy workers, help them by processing tasks from outside. + + const int NUM_WORKERS_DEFAULT = 3; + const std::string POOL_NAME = "test"; + + // Test case 1, submit tasks and verify completion. + { + int num_tasks = 50; + + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + std::atomic counter = 0; + + // Store futures to ensure completion before checking counter. + std::vector> futures; + futures.reserve(num_tasks); + + for (int i = 1; i <= num_tasks; i++) { + futures.emplace_back(threadPool.Submit([&counter, i]() { + counter.fetch_add(i); + })); + } + + // Wait for all tasks to finish + WaitFor(futures, /*context=*/"test1 task"); + int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum. + BOOST_CHECK_EQUAL(counter.load(), expected_value); + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); + } + + // Test case 2, maintain all threads busy except one. + { + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + // Single blocking future for all threads + std::promise blocker; + std::shared_future blocker_future(blocker.get_future()); + + // Use per-thread ready promises to ensure all blocked threads have started + std::vector> ready_promises(NUM_WORKERS_DEFAULT - 1); + std::vector> ready_futures; + ready_futures.reserve(NUM_WORKERS_DEFAULT - 1); + for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future()); + + // Submit blocking task to all threads except one + std::vector> blocking_tasks; + blocking_tasks.reserve(NUM_WORKERS_DEFAULT - 1); + for (int i = 0; i < NUM_WORKERS_DEFAULT - 1; i++) { + std::promise& ready = ready_promises[i]; + blocking_tasks.emplace_back(threadPool.Submit([&ready, blocker_future]() { + ready.set_value(); + blocker_future.wait(); + })); + } + // Wait until all blocked threads are actually blocked + WaitFor(ready_futures, /*context=*/"test2 blocking tasks enabled"); + + // Now execute tasks on the single available worker + // and check that all the tasks are executed. + int num_tasks = 15; + std::atomic counter = 0; + + // Store futures to wait on + std::vector> futures; + futures.reserve(num_tasks); + for (int i = 0; i < num_tasks; i++) { + futures.emplace_back(threadPool.Submit([&counter]() { + counter.fetch_add(1); + })); + } + + WaitFor(futures, /*context=*/"test2 tasks"); + BOOST_CHECK_EQUAL(counter.load(), num_tasks); + + blocker.set_value(); + WaitFor(blocking_tasks, /*context=*/"test2 blocking tasks disabled"); + threadPool.Stop(); + BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); + } + + // Test case 3, wait for work to finish. + { + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + std::atomic flag = false; + std::future future = threadPool.Submit([&flag]() { + std::this_thread::sleep_for(std::chrono::milliseconds{200}); + flag.store(true); + }); + future.wait(); + BOOST_CHECK(flag.load()); + } + + // Test case 4, obtain result object. + { + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + std::future future_bool = threadPool.Submit([]() { + return true; + }); + BOOST_CHECK(future_bool.get()); + + std::future future_str = threadPool.Submit([]() { + return std::string("true"); + }); + std::string result = future_str.get(); + BOOST_CHECK_EQUAL(result, "true"); + } + + // Test case 5, throw exception and catch it on the consumer side. + { + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + int ROUNDS = 5; + std::string err_msg{"something wrong happened"}; + std::vector> futures; + futures.reserve(ROUNDS); + for (int i = 0; i < ROUNDS; i++) { + futures.emplace_back(threadPool.Submit([err_msg, i]() { + throw std::runtime_error(err_msg + util::ToString(i)); + })); + } + + for (int i = 0; i < ROUNDS; i++) { + try { + futures.at(i).get(); + BOOST_FAIL("Expected exception not thrown"); + } catch (const std::runtime_error& e) { + BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i)); + } + } + } + + // Test case 6, all workers are busy, help them by processing tasks from outside. + { + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::promise blocker; + std::shared_future blocker_future(blocker.get_future()); + + // Submit blocking task + for (int i = 0; i < NUM_WORKERS_DEFAULT; i++) { + threadPool.Submit([blocker_future]() { + blocker_future.wait(); + }); + } + + // Now submit tasks and check that none of them are executed. + int num_tasks = 20; + std::atomic counter = 0; + for (int i = 0; i < num_tasks; i++) { + threadPool.Submit([&counter]() { + counter.fetch_add(1); + }); + } + std::this_thread::sleep_for(std::chrono::milliseconds{100}); + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20); + + // Now process manually + for (int i = 0; i < num_tasks; i++) { + threadPool.ProcessTask(); + } + BOOST_CHECK_EQUAL(counter.load(), num_tasks); + blocker.set_value(); + threadPool.Stop(); + } + + // Test case 7, recursive submission of tasks. + { + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::promise signal; + threadPool.Submit([&]() { + threadPool.Submit([&]() { + signal.set_value(); + }); + }); + + signal.get_future().wait(); + threadPool.Stop(); + } + + // Test case 8, submit a task when all threads are busy and then stop the pool. + { + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::promise blocker; + std::shared_future blocker_future(blocker.get_future()); + + // Per-thread ready promises to ensure all workers are actually blocked + std::vector> ready_promises(NUM_WORKERS_DEFAULT); + std::vector> ready_futures; + ready_futures.reserve(NUM_WORKERS_DEFAULT); + for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future()); + + // Fill all workers with blocking tasks + for (int i = 0; i < NUM_WORKERS_DEFAULT; i++) { + std::promise& ready = ready_promises[i]; + threadPool.Submit([blocker_future, &ready]() { + ready.set_value(); + blocker_future.wait(); + }); + } + + // Wait until all threads are actually blocked + WaitFor(ready_futures, /*context=*/"test8 blocking tasks enabled"); + + // Submit an extra task that should execute once a worker is free + std::future future = threadPool.Submit([]() { return true; }); + + // At this point, all workers are blocked, and the extra task is queued + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1); + + // Wait a short moment before unblocking the threads to mimic a concurrent shutdown + std::thread thread_unblocker([&blocker]() { + std::this_thread::sleep_for(std::chrono::milliseconds{300}); + blocker.set_value(); + }); + + // Stop the pool while the workers are still blocked + threadPool.Stop(); + + // Expect the submitted task to complete + BOOST_CHECK(future.get()); + thread_unblocker.join(); + + // Pool should be stopped and no workers remaining + BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); + } + +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/src/util/threadpool.h b/src/util/threadpool.h new file mode 100644 index 00000000000..628efd48ea3 --- /dev/null +++ b/src/util/threadpool.h @@ -0,0 +1,191 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_UTIL_THREADPOOL_H +#define BITCOIN_UTIL_THREADPOOL_H + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief Fixed-size thread pool for running arbitrary tasks concurrently. + * + * The thread pool maintains a set of worker threads that consume and execute + * tasks submitted through Submit(). Once started, tasks can be queued and + * processed asynchronously until Stop() is called. + * + * ### Thread-safety and lifecycle + * - `Start()` and `Stop()` must be called from a controller (non-worker) thread. + * Calling `Stop()` from a worker thread will deadlock, as it waits for all + * workers to join, including the current one. + * + * - `Submit()` can be called from any thread, including workers. It safely + * enqueues new work for execution as long as the pool has active workers. + * + * - `Stop()` prevents further task submission and wakes all worker threads. + * Workers finish processing all remaining queued tasks before exiting, + * guaranteeing that no caller waits forever on a pending future. + */ +class ThreadPool { + +private: + std::string m_name; + Mutex m_mutex; + std::queue> m_work_queue GUARDED_BY(m_mutex); + std::condition_variable m_cv; + // Note: m_interrupt must be modified while holding the same mutex used by threads waiting on the condition variable. + // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals. + // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable + bool m_interrupt GUARDED_BY(m_mutex){false}; + std::vector m_workers GUARDED_BY(m_mutex); + + void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + WAIT_LOCK(m_mutex, wait_lock); + for (;;) { + std::function task; + { + // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one. + if (!m_interrupt && m_work_queue.empty()) { + // Block until the pool is interrupted or a task is available. + m_cv.wait(wait_lock,[&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); }); + } + + // If stopped and no work left, exit worker + if (m_interrupt && m_work_queue.empty()) { + return; + } + + task = std::move(m_work_queue.front()); + m_work_queue.pop(); + } + + { + // Execute the task without the lock + REVERSE_LOCK(wait_lock, m_mutex); + task(); + } + } + } + +public: + explicit ThreadPool(const std::string& name) : m_name(name) {} + + ~ThreadPool() + { + Stop(); // In case it hasn't been stopped. + } + + /** + * @brief Start worker threads. + * + * Creates and launches `num_workers` threads that begin executing tasks + * from the queue. If the pool is already started, throws. + * + * Must be called from a controller (non-worker) thread. + */ + void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + if (!m_workers.empty()) throw std::runtime_error("Thread pool already started"); + m_interrupt = false; // Reset + + // Create workers + for (int i = 0; i < num_workers; i++) { + m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); }); + } + } + + /** + * @brief Stop all worker threads and wait for them to exit. + * + * Sets the interrupt flag, wakes all waiting workers, and joins them. + * Any remaining tasks in the queue will be processed before returning. + * + * Must be called from a controller (non-worker) thread. + */ + void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + // Notify workers and join them. + std::vector threads_to_join; + { + LOCK(m_mutex); + m_interrupt = true; + threads_to_join.swap(m_workers); + } + m_cv.notify_all(); + for (auto& worker : threads_to_join) worker.join(); + // m_interrupt is left true until next Start() + } + + /** + * @brief Submit a new task for asynchronous execution. + * + * Enqueues a callable to be executed by one of the worker threads. + * Returns a `std::future` that can be used to retrieve the task’s result. + */ + template EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + auto Submit(T task) -> std::future + { + using TaskType = std::packaged_task; + auto ptr_task = std::make_shared(std::move(task)); + std::future future = ptr_task->get_future(); + { + LOCK(m_mutex); + if (m_workers.empty() || m_interrupt) { + throw std::runtime_error("No active workers; cannot accept new tasks"); + } + m_work_queue.emplace([ptr_task]() { + (*ptr_task)(); + }); + } + m_cv.notify_one(); + return future; + } + + /** + * @brief Execute a single queued task synchronously. + * Removes one task from the queue and executes it on the calling thread. + */ + void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + std::function task; + { + LOCK(m_mutex); + if (m_work_queue.empty()) return; + + // Pop the task + task = std::move(m_work_queue.front()); + m_work_queue.pop(); + } + task(); + } + + size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + return WITH_LOCK(m_mutex, return m_work_queue.size()); + } + + size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + return WITH_LOCK(m_mutex, return m_workers.size()); + } +}; + +#endif // BITCOIN_UTIL_THREADPOOL_H