Merge bitcoin/bitcoin#32345: ipc: Handle unclean shutdowns better

2581258ec2 ipc: Handle bitcoin-wallet disconnections (Ryan Ofsky)
2160995916 ipc: Add Ctrl-C handler for spawned subprocesses (Ryan Ofsky)
0c28068ceb doc: Improve IPC interface comments (Ryan Ofsky)
7f65aac78b ipc: Avoid waiting for clients to disconnect when shutting down (Ryan Ofsky)
6eb09fd614 test: Add unit test coverage for Init and Shutdown code (Ryan Ofsky)
9a9fb19536 ipc: Use EventLoopRef instead of addClient/removeClient (Ryan Ofsky)
e886c65b6b Squashed 'src/ipc/libmultiprocess/' changes from 27c7e8e5a581..b4120d34bad2 (Ryan Ofsky)

Pull request description:

  This PR fixes various problems when IPC connections are broken or hang which were reported in https://github.com/bitcoin-core/libmultiprocess/issues/123, https://github.com/bitcoin-core/libmultiprocess/issues/176, and https://github.com/bitcoin-core/libmultiprocess/pull/182. The different fixes are described in commit messages.

  ---

  The first two commits of this PR update the libmultiprocess subtree including the following PRs:

  - https://github.com/bitcoin-core/libmultiprocess/pull/181
  - https://github.com/bitcoin-core/libmultiprocess/pull/179
  - https://github.com/bitcoin-core/libmultiprocess/pull/160
  - https://github.com/bitcoin-core/libmultiprocess/pull/184
  - https://github.com/bitcoin-core/libmultiprocess/pull/187
  - https://github.com/bitcoin-core/libmultiprocess/pull/186
  - https://github.com/bitcoin-core/libmultiprocess/pull/192

  The subtree changes can be verified by running `test/lint/git-subtree-check.sh src/ipc/libmultiprocess` as described in [developer notes](https://github.com/bitcoin/bitcoin/blob/master/doc/developer-notes.md#subtrees) and [lint instructions](https://github.com/bitcoin/bitcoin/tree/master/test/lint#git-subtree-checksh).

  The remaining commits are:

  - [`9a9fb19536fa` ipc: Use EventLoopRef instead of addClient/removeClient](9a9fb19536)
  - [`6eb09fd6141f` test: Add unit test coverage for Init and Shutdown code](6eb09fd614)
  - [`7f65aac78b95` ipc: Avoid waiting for clients to disconnect when shutting down](7f65aac78b)
  - [`0c28068ceb7b` doc: Improve IPC interface comments](0c28068ceb)
  - [`216099591632` ipc: Add Ctrl-C handler for spawned subprocesses](2160995916)
  - [`2581258ec200` ipc: Handle bitcoin-wallet disconnections](2581258ec2)

  The new commits depend on the subtree update, and because the subtree update includes an incompatible API change, the "Use EventLoopRef" commit needs to be part of the same PR to avoid breaking the build. The other commits also make sense to merge at the same time because the bitcoin & libmultiprocess changes were written and tested together.

  ---

  This PR is part of the [process separation project](https://github.com/bitcoin/bitcoin/issues/28722).

ACKs for top commit:
  Sjors:
    re-utACK 2581258ec2
  josibake:
    code review ACK 2581258ec2
  pinheadmz:
    re-ACK 2581258ec2

Tree-SHA512: 0095aa22d507803e2a2d46eff51fb6caf965cc0c97ccfa615bd97805d5d51e66a5b4b040640deb92896438b1fb9f6879847124c9d0e120283287bfce37b8d748
This commit is contained in:
merge-script 2025-08-18 20:19:19 +01:00
commit f58de8749e
No known key found for this signature in database
GPG Key ID: 2EEB9F5CC09526C1
78 changed files with 1119 additions and 477 deletions

View File

@ -589,6 +589,14 @@ void ArgsManager::AddHiddenArgs(const std::vector<std::string>& names)
}
}
void ArgsManager::ClearArgs()
{
LOCK(cs_args);
m_settings = {};
m_available_args.clear();
m_network_only_args.clear();
}
void ArgsManager::CheckMultipleCLIArgs() const
{
LOCK(cs_args);

View File

@ -359,11 +359,7 @@ protected:
/**
* Clear available arguments
*/
void ClearArgs() {
LOCK(cs_args);
m_available_args.clear();
m_network_only_args.clear();
}
void ClearArgs();
/**
* Check CLI command args

View File

@ -33,6 +33,7 @@
#include <interfaces/ipc.h>
#include <interfaces/mining.h>
#include <interfaces/node.h>
#include <ipc/exception.h>
#include <kernel/caches.h>
#include <kernel/context.h>
#include <key.h>
@ -298,6 +299,14 @@ void Shutdown(NodeContext& node)
StopREST();
StopRPC();
StopHTTPServer();
for (auto& client : node.chain_clients) {
try {
client->stop();
} catch (const ipc::Exception& e) {
LogDebug(BCLog::IPC, "Chain client did not disconnect cleanly: %s", e.what());
client.reset();
}
}
StopMapPort();
// Because these depend on each-other, we make sure that neither can be
@ -370,8 +379,11 @@ void Shutdown(NodeContext& node)
}
}
}
for (const auto& client : node.chain_clients) {
client->stop();
// If any -ipcbind clients are still connected, disconnect them now so they
// do not block shutdown.
if (interfaces::Ipc* ipc = node.init->ipc()) {
ipc->disconnectIncoming();
}
#ifdef ENABLE_ZMQ

View File

@ -59,17 +59,20 @@ public:
//! true. If this is not a spawned child process, return false.
virtual bool startSpawnedProcess(int argc, char* argv[], int& exit_status) = 0;
//! Connect to a socket address and make a client interface proxy object
//! using provided callback. connectAddress returns an interface pointer if
//! the connection was established, returns null if address is empty ("") or
//! disabled ("0") or if a connection was refused but not required ("auto"),
//! and throws an exception if there was an unexpected error.
//! Connect to a socket address and return a pointer to its Init interface.
//! Returns a non-null pointer if the connection was established, returns
//! null if address is empty ("") or disabled ("0") or if a connection was
//! refused but not required ("auto"), and throws an exception if there was
//! an unexpected error.
virtual std::unique_ptr<Init> connectAddress(std::string& address) = 0;
//! Connect to a socket address and make a client interface proxy object
//! using provided callback. Throws an exception if there was an error.
//! Listen on a socket address exposing this process's init interface to
//! clients. Throws an exception if there was an error.
virtual void listenAddress(std::string& address) = 0;
//! Disconnect any incoming connections that are still connected.
virtual void disconnectIncoming() = 0;
//! Add cleanup callback to remote interface that will run when the
//! interface is deleted.
template<typename Interface>

View File

@ -41,10 +41,7 @@ class CapnpProtocol : public Protocol
public:
~CapnpProtocol() noexcept(true)
{
if (m_loop) {
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->removeClient(lock);
}
m_loop_ref.reset();
if (m_loop_thread.joinable()) m_loop_thread.join();
assert(!m_loop);
};
@ -68,9 +65,20 @@ public:
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
if (ready_fn) ready_fn();
mp::ServeStream<messages::Init>(*m_loop, fd, init);
m_parent_connection = &m_loop->m_incoming_connections.back();
m_loop->loop();
m_loop.reset();
}
void disconnectIncoming() override
{
if (!m_loop) return;
// Delete incoming connections, except the connection to a parent
// process (if there is one), since a parent process should be able to
// monitor and control this process, even during shutdown.
m_loop->sync([&] {
m_loop->m_incoming_connections.remove_if([this](mp::Connection& c) { return &c != m_parent_connection; });
});
}
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
{
mp::ProxyTypeRegister::types().at(type)(iface).cleanup_fns.emplace_back(std::move(cleanup));
@ -83,10 +91,7 @@ public:
m_loop_thread = std::thread([&] {
util::ThreadRename("capnp-loop");
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
{
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->addClient(lock);
}
m_loop_ref.emplace(*m_loop);
promise.set_value();
m_loop->loop();
m_loop.reset();
@ -95,7 +100,14 @@ public:
}
Context m_context;
std::thread m_loop_thread;
//! EventLoop object which manages I/O events for all connections.
std::optional<mp::EventLoop> m_loop;
//! Reference to the same EventLoop. Increments the loops refcount on
//! creation, decrements on destruction. The loop thread exits when the
//! refcount reaches 0. Other IPC objects also hold their own EventLoopRef.
std::optional<mp::EventLoopRef> m_loop_ref;
//! Connection to parent, if this is a child process spawned by a parent process.
mp::Connection* m_parent_connection{nullptr};
};
} // namespace

View File

@ -13,6 +13,7 @@
#include <tinyformat.h>
#include <util/fs.h>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
@ -26,6 +27,28 @@
namespace ipc {
namespace {
#ifndef WIN32
std::string g_ignore_ctrl_c;
void HandleCtrlC(int)
{
// (void)! needed to suppress -Wunused-result warning from GCC
(void)!write(STDOUT_FILENO, g_ignore_ctrl_c.data(), g_ignore_ctrl_c.size());
}
#endif
void IgnoreCtrlC(std::string message)
{
#ifndef WIN32
g_ignore_ctrl_c = std::move(message);
struct sigaction sa{};
sa.sa_handler = HandleCtrlC;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
sigaction(SIGINT, &sa, nullptr);
#endif
}
class IpcImpl : public interfaces::Ipc
{
public:
@ -53,6 +76,7 @@ public:
if (!m_process->checkSpawned(argc, argv, fd)) {
return false;
}
IgnoreCtrlC(strprintf("[%s] SIGINT received — waiting for parent to shut down.\n", m_exe_name));
m_protocol->serve(fd, m_exe_name, m_init);
exit_status = EXIT_SUCCESS;
return true;
@ -86,6 +110,10 @@ public:
int fd = m_process->bind(gArgs.GetDataDirNet(), m_exe_name, address);
m_protocol->listen(fd, m_exe_name, m_init);
}
void disconnectIncoming() override
{
m_protocol->disconnectIncoming();
}
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
{
m_protocol->addCleanup(type, iface, std::move(cleanup));

View File

@ -1,41 +1,40 @@
Checks: '
-*,
bugprone-*,
-bugprone-easily-swappable-parameters,
-bugprone-exception-escape,
-bugprone-move-forwarding-reference,
-bugprone-narrowing-conversions,
-bugprone-reserved-identifier,
misc-*,
-misc-non-private-member-variables-in-classes,
-misc-no-recursion,
-misc-unconventional-assign-operator,
-misc-unused-parameters,
-misc-use-anonymous-namespace,
modernize-*,
-modernize-avoid-c-arrays,
-modernize-concat-nested-namespaces,
-modernize-deprecated-headers,
-modernize-use-nodiscard,
-modernize-use-trailing-return-type,
-modernize-use-using,
bugprone-argument-comment,
bugprone-move-forwarding-reference,
bugprone-string-constructor,
bugprone-use-after-move,
bugprone-lambda-function-name,
bugprone-unhandled-self-assignment,
misc-unused-using-decls,
misc-no-recursion,
modernize-deprecated-headers,
modernize-use-default-member-init,
modernize-use-emplace,
modernize-use-equals-default,
modernize-use-noexcept,
modernize-use-nullptr,
modernize-use-starts-ends-with,
performance-*,
-performance-avoid-endl,
-performance-enum-size,
-performance-inefficient-string-concatenation,
-performance-no-int-to-ptr,
-performance-noexcept-move-constructor,
readability-*,
-readability-braces-around-statements,
-readability-convert-member-functions-to-static,
-readability-else-after-return,
-readability-function-cognitive-complexity,
-readability-identifier-length,
-readability-implicit-bool-conversion,
-readability-inconsistent-declaration-parameter-name,
-readability-magic-numbers,
-readability-named-parameter,
-readability-uppercase-literal-suffix,
-readability-use-anyofallof,
-performance-unnecessary-value-param,
readability-const-return-type,
readability-redundant-declaration,
readability-redundant-string-init,
clang-analyzer-core.*,
-clang-analyzer-core.UndefinedBinaryOperatorResult,
clang-analyzer-optin.core.*,
'
HeaderFilterRegex: '.'
WarningsAsErrors: '*'
CheckOptions:
- key: modernize-use-override.IgnoreDestructors
value: true
HeaderFilterRegex: 'example/calculator.h|example/init.h|example/printer.h|include/mp/proxy-io.h|include/mp/proxy-types.h|include/mp/proxy.h|include/mp/util.h|test/mp/test/foo-types.h|test/mp/test/foo.h'
- key: modernize-deprecated-headers.CheckHeaderFile
value: false
- key: performance-move-const-arg.CheckTriviallyCopyableMove
value: false
- key: bugprone-unhandled-self-assignment.WarnOnlyIfThisHasSuspiciousField
value: false

View File

@ -0,0 +1,29 @@
name: CI
on:
push:
pull_request:
jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
config: [default, llvm, gnu32, sanitize]
name: build • ${{ matrix.config }}
steps:
- uses: actions/checkout@v4
- name: Install Nix
uses: cachix/install-nix-action@v31 # 2025-05-27, from https://github.com/cachix/install-nix-action/tags
with:
nix_path: nixpkgs=channel:nixos-25.05 # latest release
- name: Run CI script
env:
CI_CONFIG: ci/configs/${{ matrix.config }}.bash
run: ci/scripts/run.sh

View File

@ -1,4 +1,4 @@
# Copyright (c) 2019 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -15,16 +15,35 @@ include("cmake/compat_find.cmake")
find_package(CapnProto REQUIRED)
find_package(Threads REQUIRED)
option(Libmultiprocess_ENABLE_CLANG_TIDY "Run clang-tidy with the compiler." OFF)
if(Libmultiprocess_ENABLE_CLANG_TIDY)
set(MPGEN_EXECUTABLE "" CACHE FILEPATH "If specified, should be full path to an external mpgen binary to use rather than the one built internally.")
option(MP_ENABLE_CLANG_TIDY "Run clang-tidy with the compiler." OFF)
if(MP_ENABLE_CLANG_TIDY)
find_program(CLANG_TIDY_EXECUTABLE NAMES clang-tidy)
if(NOT CLANG_TIDY_EXECUTABLE)
message(FATAL_ERROR "Libmultiprocess_ENABLE_CLANG_TIDY is ON but clang-tidy is not found.")
message(FATAL_ERROR "MP_ENABLE_CLANG_TIDY is ON but clang-tidy is not found.")
endif()
set(CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_EXECUTABLE}")
# Workaround for nix from https://gitlab.kitware.com/cmake/cmake/-/issues/20912#note_793338
# Nix injects header paths via $NIX_CFLAGS_COMPILE; CMake tags these as
# CMAKE_CXX_IMPLICIT_INCLUDE_DIRECTORIES and omits them from the compile
# database, so clang-tidy, which ignores $NIX_CFLAGS_COMPILE, can't find capnp
# headers. Setting them as standard passes them to clang-tidy.
set(CMAKE_CXX_STANDARD_INCLUDE_DIRECTORIES ${CMAKE_CXX_IMPLICIT_INCLUDE_DIRECTORIES})
endif()
set(MPGEN_EXECUTABLE "" CACHE FILEPATH "If specified, should be full path to an external mpgen binary to use rather than the one built internally.")
option(MP_ENABLE_IWYU "Run include-what-you-use with the compiler." OFF)
if(MP_ENABLE_IWYU)
find_program(IWYU_EXECUTABLE NAMES include-what-you-use iwyu)
if(NOT IWYU_EXECUTABLE)
message(FATAL_ERROR "MP_ENABLE_IWYU is ON but include-what-you-use was not found.")
endif()
set(CMAKE_CXX_INCLUDE_WHAT_YOU_USE "${IWYU_EXECUTABLE};-Xiwyu;--error")
if(DEFINED ENV{IWYU_MAPPING_FILE})
list(APPEND CMAKE_CXX_INCLUDE_WHAT_YOU_USE "-Xiwyu" "--mapping_file=$ENV{IWYU_MAPPING_FILE}")
endif()
endif()
include("cmake/compat_config.cmake")
include("cmake/pthread_checks.cmake")
@ -51,6 +70,7 @@ configure_file(include/mp/config.h.in "${CMAKE_CURRENT_BINARY_DIR}/include/mp/co
# Generated C++ Capn'Proto schema files
capnp_generate_cpp(MP_PROXY_SRCS MP_PROXY_HDRS include/mp/proxy.capnp)
set_source_files_properties("${MP_PROXY_SRCS}" PROPERTIES SKIP_LINTING TRUE) # Ignored before cmake 3.27
# util library
add_library(mputil OBJECT src/mp/util.cpp)

View File

@ -0,0 +1,25 @@
### CI quick-reference
All CI is just bash and nix.
* **Workflow**:
- `.github/workflows/ci.yml` lists the jobs (`default`, `llvm`, …).
* **Scripts**:
- `ci/scripts/run.sh` spins up the Nix shell then calls…
- `ci/scripts/ci.sh` …to configure, build, and test.
* **Configuration**:
- `ci/configs/*.sh` defines flags for each job.
- `shell.nix` defines build environment (compilers, tools, libraries).
* **Build directories**:
- `build-*/` separate build directories (like `build-default`, `build-llvm`) will be created for each job.
To run jobs locally:
```bash
CI_CONFIG=ci/configs/default.bash ci/scripts/run.sh
CI_CONFIG=ci/configs/llvm.bash ci/scripts/run.sh
CI_CONFIG=ci/configs/gnu32.bash ci/scripts/run.sh
CI_CONFIG=ci/configs/sanitize.bash ci/scripts/run.sh
```
By default CI jobs will reuse their build directories. `CI_CLEAN=1` can be specified to delete them before running instead.

View File

@ -0,0 +1,5 @@
CI_DESC="CI job using default libraries and tools, and running IWYU"
CI_DIR=build-default
export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wno-unused-parameter"
CMAKE_ARGS=(-DMP_ENABLE_IWYU=ON)
BUILD_ARGS=(-k)

View File

@ -0,0 +1,9 @@
CI_DESC="CI job cross-compiling to 32-bit"
CI_DIR=build-gnu32
NIX_ARGS=(
--arg minimal true
--arg crossPkgs 'import <nixpkgs> { crossSystem = { config = "i686-unknown-linux-gnu"; }; }'
)
export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wno-unused-parameter"
CMAKE_ARGS=(-G Ninja)
BUILD_ARGS=(-k 0)

View File

@ -0,0 +1,11 @@
CI_DESC="CI job using LLVM-based libraries and tools (clang, libc++, clang-tidy, iwyu) and testing Ninja"
CI_DIR=build-llvm
NIX_ARGS=(--arg enableLibcxx true)
export CXX=clang++
export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter"
CMAKE_ARGS=(
-G Ninja
-DMP_ENABLE_CLANG_TIDY=ON
-DMP_ENABLE_IWYU=ON
)
BUILD_ARGS=(-k 0)

View File

@ -0,0 +1,7 @@
CI_DESC="CI job running ThreadSanitizer"
CI_DIR=build-sanitize
export CXX=clang++
export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter -fsanitize=thread"
CMAKE_ARGS=()
BUILD_ARGS=(-k -j4)
BUILD_TARGETS=(mptest)

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
#
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
export LC_ALL=C.UTF-8
set -o errexit -o nounset -o pipefail -o xtrace
[ "${CI_CONFIG+x}" ] && source "$CI_CONFIG"
: "${CI_DIR:=build}"
if ! [ -v BUILD_TARGETS ]; then
BUILD_TARGETS=(all tests mpexamples)
fi
[ -n "${CI_CLEAN-}" ] && rm -rf "${CI_DIR}"
cmake -B "$CI_DIR" "${CMAKE_ARGS[@]+"${CMAKE_ARGS[@]}"}"
cmake --build "$CI_DIR" -t "${BUILD_TARGETS[@]}" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
ctest --test-dir "$CI_DIR" --output-on-failure

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
#
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
export LC_ALL=C.UTF-8
set -o errexit -o nounset -o pipefail -o xtrace
[ "${CI_CONFIG+x}" ] && source "$CI_CONFIG"
nix-shell --pure --keep CI_CONFIG --keep CI_CLEAN "${NIX_ARGS[@]+"${NIX_ARGS[@]}"}" --run ci/scripts/ci.sh shell.nix

View File

@ -17,7 +17,7 @@ if ("Bin" IN_LIST ${CMAKE_FIND_PACKAGE_NAME}_FIND_COMPONENTS)
endif()
if ("Lib" IN_LIST ${CMAKE_FIND_PACKAGE_NAME}_FIND_COMPONENTS)
# Setting FOUND_LIBATOMIC is needed on debian & ubuntu systems to work around bug in
# Setting FOUND_LIBATOMIC is needed on Debian & Ubuntu systems to work around bug in
# their capnproto packages. See compat_find.cmake for a more complete explanation.
set(FOUND_LIBATOMIC TRUE)
include(CMakeFindDependencyMacro)

View File

@ -81,6 +81,8 @@ function(target_capnp_sources target include_prefix)
DEPENDS ${capnp_file}
VERBATIM
)
# Skip linting for capnp-generated files but keep it for mpgen-generated ones
set_source_files_properties(${capnp_file}.c++ PROPERTIES SKIP_LINTING TRUE) # Ignored before cmake 3.27
target_sources(${target} PRIVATE
${CMAKE_CURRENT_BINARY_DIR}/${capnp_file}.c++
${CMAKE_CURRENT_BINARY_DIR}/${capnp_file}.proxy-client.c++

View File

@ -1,4 +1,4 @@
# Copyright (c) 2019 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,18 +1,18 @@
# Copyright (c) 2024 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
# compat_find.cmake -- compatibility workarounds meant to be included before
# cmake find_package() calls are made
# Set FOUND_LIBATOMIC to work around bug in debian capnproto package that is
# debian-specific and does not happpen upstream. Debian includes a patch
# Set FOUND_LIBATOMIC to work around bug in Debian capnproto package that is
# Debian-specific and does not happen upstream. Debian includes a patch
# https://sources.debian.org/patches/capnproto/1.0.1-4/07_libatomic.patch/ which
# uses check_library_exists(atomic __atomic_load_8 ...) and it fails because the
# symbol name conflicts with a compiler instrinsic as described
# symbol name conflicts with a compiler intrinsic as described
# https://github.com/bitcoin-core/libmultiprocess/issues/68#issuecomment-1135150171.
# This could be fixed by improving the check_library_exists function as
# described in the github comment, or by changing the debian patch to check for
# described in the github comment, or by changing the Debian patch to check for
# the symbol a different way, but simplest thing to do is work around the
# problem by setting FOUND_LIBATOMIC. This problem has probably not
# been noticed upstream because it only affects CMake packages depending on

View File

@ -1,4 +1,4 @@
# Copyright (c) 2024 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,19 +1,23 @@
// Copyright (c) 2021 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <calculator.h>
#include <charconv>
#include <fstream>
#include <init.capnp.h>
#include <init.capnp.proxy.h> // NOLINT(misc-include-cleaner)
#include <init.h>
#include <init.capnp.proxy.h> // NOLINT(misc-include-cleaner) // IWYU pragma: keep
#include <charconv>
#include <cstring>
#include <fstream>
#include <iostream>
#include <kj/async.h>
#include <kj/common.h>
#include <kj/memory.h>
#include <memory>
#include <mp/proxy-io.h>
#include <printer.h>
#include <stdexcept>
#include <string>
#include <system_error>
#include <utility>
class CalculatorImpl : public Calculator

View File

@ -1,4 +1,4 @@
// Copyright (c) 2021 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,13 +1,18 @@
// Copyright (c) 2021 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <init.capnp.h>
#include <init.capnp.proxy.h>
#include <cstring> // IWYU pragma: keep
#include <filesystem>
#include <fstream>
#include <future>
#include <init.capnp.h>
#include <init.capnp.proxy.h>
#include <iostream>
#include <kj/async.h>
#include <kj/common.h>
#include <memory>
#include <mp/proxy-io.h>
#include <mp/util.h>
#include <stdexcept>

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -12,8 +12,7 @@ using Printer = import "printer.capnp";
$Proxy.include("calculator.h");
$Proxy.include("init.h");
$Proxy.include("printer.h");
$Proxy.includeTypes("calculator.capnp.proxy-types.h");
$Proxy.includeTypes("printer.capnp.proxy-types.h");
$Proxy.includeTypes("types.h");
interface InitInterface $Proxy.wrap("Init") {
construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap);

View File

@ -1,4 +1,4 @@
// Copyright (c) 2021 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,18 +1,24 @@
// Copyright (c) 2021 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <charconv>
#include <fstream>
#include <printer.h>
#include <init.capnp.h>
#include <init.capnp.proxy.h> // NOLINT(misc-include-cleaner)
#include <init.h>
#include <init.capnp.proxy.h> // NOLINT(misc-include-cleaner) // IWYU pragma: keep
#include <charconv>
#include <cstring>
#include <fstream>
#include <iostream>
#include <kj/async.h>
#include <kj/common.h>
#include <kj/memory.h>
#include <memory>
#include <mp/proxy-io.h>
#include <printer.h>
#include <stdexcept>
#include <string>
#include <system_error>
class PrinterImpl : public Printer
{

View File

@ -1,4 +1,4 @@
// Copyright (c) 2021 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,14 +1,23 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef EXAMPLE_TYPES_H
#define EXAMPLE_TYPES_H
#include <calculator.capnp.proxy-types.h>
#include <printer.capnp.proxy-types.h>
// IWYU pragma: begin_exports
#include <mp/type-context.h>
#include <mp/type-decay.h>
#include <mp/type-interface.h>
#include <mp/type-string.h>
#include <mp/type-threadmap.h>
// IWYU pragma: end_exports
struct InitInterface; // IWYU pragma: export
struct CalculatorInterface; // IWYU pragma: export
struct PrinterInterface; // IWYU pragma: export
#endif // EXAMPLE_TYPES_H

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -13,12 +13,15 @@
#include <capnp/rpc-twoparty.h>
#include <assert.h>
#include <condition_variable>
#include <functional>
#include <optional>
#include <kj/function.h>
#include <map>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <thread>
namespace mp {
struct ThreadContext;
@ -63,16 +66,18 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
ProxyClient(const ProxyClient&) = delete;
~ProxyClient();
void setCleanup(const std::function<void()>& fn);
void setDisconnectCallback(const std::function<void()>& fn);
//! Cleanup function to run when the connection is closed. If the Connection
//! gets destroyed before this ProxyClient<Thread> object, this cleanup
//! callback lets it destroy this object and remove its entry in the
//! thread's request_threads or callback_threads map (after resetting
//! m_cleanup_it so the destructor does not try to access it). But if this
//! object gets destroyed before the Connection, there's no need to run the
//! cleanup function and the destructor will unregister it.
std::optional<CleanupIt> m_cleanup_it;
//! Reference to callback function that is run if there is a sudden
//! disconnect and the Connection object is destroyed before this
//! ProxyClient<Thread> object. The callback will destroy this object and
//! remove its entry from the thread's request_threads or callback_threads
//! map. It will also reset m_disconnect_cb so the destructor does not
//! access it. In the normal case where there is no sudden disconnect, the
//! destructor will unregister m_disconnect_cb so the callback is never run.
//! Since this variable is accessed from multiple threads, accesses should
//! be guarded with the associated Waiter::m_mutex.
std::optional<CleanupIt> m_disconnect_cb;
};
template <>
@ -129,6 +134,28 @@ std::string LongThreadName(const char* exe_name);
//! Event loop implementation.
//!
//! Cap'n Proto threading model is very simple: all I/O operations are
//! asynchronous and must be performed on a single thread. This includes:
//!
//! - Code starting an asynchronous operation (calling a function that returns a
//! promise object)
//! - Code notifying that an asynchronous operation is complete (code using a
//! fulfiller object)
//! - Code handling a completed operation (code chaining or waiting for a promise)
//!
//! All of this code needs to access shared state, and there is no mutex that
//! can be acquired to lock this state because Cap'n Proto
//! assumes it will only be accessed from one thread. So all this code needs to
//! actually run on one thread, and the EventLoop::loop() method is the entry point for
//! this thread. ProxyClient and ProxyServer objects that use other threads and
//! need to perform I/O operations post to this thread using EventLoop::post()
//! and EventLoop::sync() methods.
//!
//! Specifically, because ProxyClient methods can be called from arbitrary
//! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
//! methods use the EventLoop thread to send requests, and ProxyServer methods
//! use the thread to return results.
//!
//! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
class EventLoop
{
@ -144,7 +171,7 @@ public:
//! Run function on event loop thread. Does not return until function completes.
//! Must be called while the loop() function is active.
void post(const std::function<void()>& fn);
void post(kj::Function<void()> fn);
//! Wrapper around EventLoop::post that takes advantage of the
//! fact that callable will not go out of scope to avoid requirement that it
@ -152,9 +179,13 @@ public:
template <typename Callable>
void sync(Callable&& callable)
{
post(std::ref(callable));
post(std::forward<Callable>(callable));
}
//! Register cleanup function to run on asynchronous worker thread without
//! blocking the event loop thread.
void addAsyncCleanup(std::function<void()> fn);
//! Start asynchronous worker thread if necessary. This is only done if
//! there are ProxyServerBase::m_impl objects that need to be destroyed
//! asynchronously, without tying up the event loop thread. This can happen
@ -166,13 +197,10 @@ public:
//! is important that ProxyServer::m_impl destructors do not run on the
//! eventloop thread because they may need it to do I/O if they perform
//! other IPC calls.
void startAsyncThread(std::unique_lock<std::mutex>& lock);
void startAsyncThread() MP_REQUIRES(m_mutex);
//! Add/remove remote client reference counts.
void addClient(std::unique_lock<std::mutex>& lock);
bool removeClient(std::unique_lock<std::mutex>& lock);
//! Check if loop should exit.
bool done(std::unique_lock<std::mutex>& lock) const;
bool done() const MP_REQUIRES(m_mutex);
Logger log()
{
@ -195,10 +223,10 @@ public:
std::thread m_async_thread;
//! Callback function to run on event loop thread during post() or sync() call.
const std::function<void()>* m_post_fn = nullptr;
kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
//! Callback functions to run on async thread.
CleanupList m_async_fns;
std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
//! Pipe read handle used to wake up the event loop thread.
int m_wait_fd = -1;
@ -208,11 +236,11 @@ public:
//! Number of clients holding references to ProxyServerBase objects that
//! reference this event loop.
int m_num_clients = 0;
int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
//! Mutex and condition variable used to post tasks to event loop and async
//! thread.
std::mutex m_mutex;
Mutex m_mutex;
std::condition_variable m_cv;
//! Capnp IO context.
@ -263,20 +291,25 @@ struct Waiter
// in the case where a capnp response is sent and a brand new
// request is immediately received.
while (m_fn) {
auto fn = std::move(m_fn);
m_fn = nullptr;
lock.unlock();
fn();
lock.lock();
auto fn = std::move(*m_fn);
m_fn.reset();
Unlock(lock, fn);
}
const bool done = pred();
return done;
});
}
//! Mutex mainly used internally by waiter class, but also used externally
//! to guard access to related state. Specifically, since the thread_local
//! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
//! access to other parts of the struct to avoid needing to deal with more
//! mutexes than necessary. This mutex can be held at the same time as
//! EventLoop::m_mutex as long as Waiter::mutex is locked first and
//! EventLoop::m_mutex is locked second.
std::mutex m_mutex;
std::condition_variable m_cv;
std::function<void()> m_fn;
std::optional<kj::Function<void()>> m_fn;
};
//! Object holding network & rpc state associated with either an incoming server
@ -290,21 +323,13 @@ public:
Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
: m_loop(loop), m_stream(kj::mv(stream_)),
m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
m_rpc_system(::capnp::makeRpcClient(m_network))
{
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.addClient(lock);
}
m_rpc_system(::capnp::makeRpcClient(m_network)) {}
Connection(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream_,
const std::function<::capnp::Capability::Client(Connection&)>& make_client)
: m_loop(loop), m_stream(kj::mv(stream_)),
m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
{
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.addClient(lock);
}
m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
//! Run cleanup functions. Must be called from the event loop thread. First
//! calls synchronous cleanup functions while blocked (to free capnp
@ -319,10 +344,6 @@ public:
CleanupIt addSyncCleanup(std::function<void()> fn);
void removeSyncCleanup(CleanupIt it);
//! Register asynchronous cleanup function to run on worker thread when
//! disconnect() is called.
void addAsyncCleanup(std::function<void()> fn);
//! Add disconnect handler.
template <typename F>
void onDisconnect(F&& f)
@ -333,12 +354,12 @@ public:
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
// error in cases where f deletes this Connection object.
m_on_disconnect.add(m_network.onDisconnect().then(
[f = std::forward<F>(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
[f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
}
EventLoop& m_loop;
EventLoopRef m_loop;
kj::Own<kj::AsyncIoStream> m_stream;
LoggingErrorHandler m_error_handler{m_loop};
LoggingErrorHandler m_error_handler{*m_loop};
kj::TaskSet m_on_disconnect{m_error_handler};
::capnp::TwoPartyVatNetwork m_network;
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@ -351,11 +372,10 @@ public:
//! ThreadMap.makeThread) used to service requests to clients.
::capnp::CapabilityServerSet<Thread> m_threads;
//! Cleanup functions to run if connection is broken unexpectedly.
//! Lists will be empty if all ProxyClient and ProxyServer objects are
//! destroyed cleanly before the connection is destroyed.
//! Cleanup functions to run if connection is broken unexpectedly. List
//! will be empty if all ProxyClient are destroyed cleanly before the
//! connection is destroyed.
CleanupList m_sync_cleanup_fns;
CleanupList m_async_cleanup_fns;
};
//! Vat id for server side of connection. Required argument to RpcSystem::bootStrap()
@ -381,21 +401,13 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
: m_client(std::move(client)), m_context(connection)
{
{
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}
// Handler for the connection getting destroyed before this client object.
auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(m_client));
}
{
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
Lock lock{m_context.loop->m_mutex};
m_context.connection = nullptr;
});
@ -408,14 +420,10 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
// down while external code is still holding client references.
//
// The first case is handled here when m_context.connection is not null. The
// second case is handled by the cleanup function, which sets m_context.connection to
// null so nothing happens here.
m_context.cleanup_fns.emplace_front([this, destroy_connection, cleanup_it]{
if (m_context.connection) {
// Remove cleanup callback so it doesn't run and try to access
// this object after it's already destroyed.
m_context.connection->removeSyncCleanup(cleanup_it);
// second case is handled by the disconnect_cb function, which sets
// m_context.connection to null so nothing happens here.
m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
{
// If the capnp interface defines a destroy method, call it to destroy
// the remote object, waiting for it to be deleted server side. If the
// capnp interface does not define a destroy method, this will just call
@ -423,16 +431,19 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
Sub::destroy(*this);
// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
m_context.connection->m_loop.sync([&]() {
m_context.loop->sync([&]() {
// Remove disconnect callback on cleanup so it doesn't run and try
// to access this object after it's destroyed. This call needs to
// run inside loop->sync() on the event loop thread because
// otherwise, if there were an ill-timed disconnect, the
// onDisconnect handler could fire and delete the Connection object
// before the removeSyncCleanup call.
if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(m_client));
}
{
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
if (destroy_connection) {
delete m_context.connection;
m_context.connection = nullptr;
@ -454,12 +465,20 @@ ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Co
: m_impl(std::move(impl)), m_context(&connection)
{
assert(m_impl);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}
//! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
//! garbage collection code after there are no more references to this object.
//! This will typically happen when the corresponding ProxyClient object on the
//! other side of the connection is destroyed. It can also happen earlier if the
//! connection is broken or destroyed. In the latter case this destructor will
//! typically be called inside m_rpc_system.reset() call in the ~Connection
//! destructor while the Connection object still exists. However, because
//! ProxyServer objects are refcounted, and the Connection object could be
//! destroyed while asynchronous IPC calls are still in-flight, it's possible
//! for this destructor to be called after the Connection object no longer
//! exists, so it is NOT valid to dereference the m_context.connection pointer
//! from this function.
template <typename Interface, typename Impl>
ProxyServerBase<Interface, Impl>::~ProxyServerBase()
{
@ -483,14 +502,12 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
// connection is broken). Probably some refactoring of the destructor
// and invokeDestroy function is possible to make this cleaner and more
// consistent.
m_context.connection->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
impl.reset();
CleanupRun(fns);
});
}
assert(m_context.cleanup_fns.empty());
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
//! If the capnp interface defined a special "destroy" method, as described the

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -92,9 +92,9 @@ struct StructField
template <typename LocalType, typename EmplaceFn>
struct ReadDestEmplace
{
ReadDestEmplace(TypeList<LocalType>, EmplaceFn&& emplace_fn) : m_emplace_fn(emplace_fn) {}
ReadDestEmplace(TypeList<LocalType>, EmplaceFn emplace_fn) : m_emplace_fn(std::move(emplace_fn)) {}
//! Simple case. If ReadField impementation calls this construct() method
//! Simple case. If ReadField implementation calls this construct() method
//! with constructor arguments, just pass them on to the emplace function.
template <typename... Args>
decltype(auto) construct(Args&&... args)
@ -123,7 +123,7 @@ struct ReadDestEmplace
return temp;
}
}
EmplaceFn& m_emplace_fn;
EmplaceFn m_emplace_fn;
};
//! Helper function to create a ReadDestEmplace object that constructs a
@ -131,7 +131,7 @@ struct ReadDestEmplace
template <typename LocalType>
auto ReadDestTemp()
{
return ReadDestEmplace{TypeList<LocalType>(), [&](auto&&... args) -> decltype(auto) {
return ReadDestEmplace{TypeList<LocalType>(), [](auto&&... args) -> decltype(auto) {
return LocalType{std::forward<decltype(args)>(args)...};
}};
}
@ -191,7 +191,7 @@ void ThrowField(TypeList<std::exception>, InvokeContext& invoke_context, Input&&
}
template <typename... Values>
bool CustomHasValue(InvokeContext& invoke_context, Values&&... value)
bool CustomHasValue(InvokeContext& invoke_context, const Values&... value)
{
return true;
}
@ -199,7 +199,7 @@ bool CustomHasValue(InvokeContext& invoke_context, Values&&... value)
template <typename... LocalTypes, typename Context, typename... Values, typename Output>
void BuildField(TypeList<LocalTypes...>, Context& context, Output&& output, Values&&... values)
{
if (CustomHasValue(context, std::forward<Values>(values)...)) {
if (CustomHasValue(context, values...)) {
CustomBuildField(TypeList<LocalTypes...>(), Priority<3>(), context, std::forward<Values>(values)...,
std::forward<Output>(output));
}
@ -274,7 +274,7 @@ void MaybeReadField(std::false_type, Args&&...)
}
template <typename LocalType, typename Value, typename Output>
void MaybeSetWant(TypeList<LocalType*>, Priority<1>, Value&& value, Output&& output)
void MaybeSetWant(TypeList<LocalType*>, Priority<1>, const Value& value, Output&& output)
{
if (value) {
output.setWant();
@ -282,7 +282,7 @@ void MaybeSetWant(TypeList<LocalType*>, Priority<1>, Value&& value, Output&& out
}
template <typename LocalTypes, typename... Args>
void MaybeSetWant(LocalTypes, Priority<0>, Args&&...)
void MaybeSetWant(LocalTypes, Priority<0>, const Args&...)
{
}
@ -326,18 +326,18 @@ template <typename Derived, size_t N = 0>
struct IterateFieldsHelper
{
template <typename Arg1, typename Arg2, typename ParamList, typename NextFn, typename... NextFnArgs>
void handleChain(Arg1&& arg1, Arg2&& arg2, ParamList, NextFn&& next_fn, NextFnArgs&&... next_fn_args)
void handleChain(Arg1& arg1, Arg2& arg2, ParamList, NextFn&& next_fn, NextFnArgs&&... next_fn_args)
{
using S = Split<N, ParamList>;
handleChain(std::forward<Arg1>(arg1), std::forward<Arg2>(arg2), typename S::First());
next_fn.handleChain(std::forward<Arg1>(arg1), std::forward<Arg2>(arg2), typename S::Second(),
handleChain(arg1, arg2, typename S::First());
next_fn.handleChain(arg1, arg2, typename S::Second(),
std::forward<NextFnArgs>(next_fn_args)...);
}
template <typename Arg1, typename Arg2, typename ParamList>
void handleChain(Arg1&& arg1, Arg2&& arg2, ParamList)
void handleChain(Arg1& arg1, Arg2& arg2, ParamList)
{
static_cast<Derived*>(this)->handleField(std::forward<Arg1>(arg1), std::forward<Arg2>(arg2), ParamList());
static_cast<Derived*>(this)->handleField(arg1, arg2, ParamList());
}
private:
IterateFieldsHelper() = default;
@ -393,10 +393,10 @@ struct ClientParam
void handleField(ClientInvokeContext& invoke_context, Params& params, ParamList)
{
auto const fun = [&]<typename... Values>(Values&&... values) {
MaybeSetWant(
ParamList(), Priority<1>(), values..., Make<StructField, Accessor>(params));
MaybeBuildField(std::integral_constant<bool, Accessor::in>(), ParamList(), invoke_context,
Make<StructField, Accessor>(params), std::forward<Values>(values)...);
MaybeSetWant(
ParamList(), Priority<1>(), std::forward<Values>(values)..., Make<StructField, Accessor>(params));
};
// Note: The m_values tuple just consists of lvalue and rvalue
@ -568,7 +568,7 @@ template <typename Client>
void clientDestroy(Client& client)
{
if (client.m_context.connection) {
client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
client.m_context.loop->log() << "IPC client destroy " << typeid(client).name();
} else {
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
}
@ -577,7 +577,7 @@ void clientDestroy(Client& client)
template <typename Server>
void serverDestroy(Server& server)
{
server.m_context.connection->m_loop.log() << "IPC server destroy " << typeid(server).name();
server.m_context.loop->log() << "IPC server destroy " << typeid(server).name();
}
//! Entry point called by generated client code that looks like:
@ -592,12 +592,9 @@ void serverDestroy(Server& server)
template <typename ProxyClient, typename GetRequest, typename... FieldObjs>
void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, FieldObjs&&... fields)
{
if (!proxy_client.m_context.connection) {
throw std::logic_error("clientInvoke call made after disconnect");
}
if (!g_thread_context.waiter) {
assert(g_thread_context.thread_name.empty());
g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name);
g_thread_context.thread_name = ThreadName(proxy_client.m_context.loop->m_exe_name);
// If next assert triggers, it means clientInvoke is being called from
// the capnp event loop thread. This can happen when a ProxyServer
// method implementation that runs synchronously on the event loop
@ -608,52 +605,68 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
// declaration so the server method runs in a dedicated thread.
assert(!g_thread_context.loop_thread);
g_thread_context.waiter = std::make_unique<Waiter>();
proxy_client.m_context.connection->m_loop.logPlain()
proxy_client.m_context.loop->logPlain()
<< "{" << g_thread_context.thread_name
<< "} IPC client first request from current thread, constructing waiter";
}
ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context};
ThreadContext& thread_context{g_thread_context};
std::optional<ClientInvokeContext> invoke_context; // Must outlive waiter->wait() call below
std::exception_ptr exception;
std::string kj_exception;
bool done = false;
proxy_client.m_context.connection->m_loop.sync([&]() {
const char* disconnected = nullptr;
proxy_client.m_context.loop->sync([&]() {
if (!proxy_client.m_context.connection) {
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
done = true;
disconnected = "IPC client method called after disconnect.";
thread_context.waiter->m_cv.notify_all();
return;
}
auto request = (proxy_client.m_client.*get_request)(nullptr);
using Request = CapRequestTraits<decltype(request)>;
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client send "
invoke_context.emplace(*proxy_client.m_context.connection, thread_context);
IterateFields().handleChain(*invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_context.loop->logPlain()
<< "{" << thread_context.thread_name << "} IPC client send "
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString());
proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then(
proxy_client.m_context.loop->m_task_set->add(request.send().then(
[&](::capnp::Response<typename Request::Results>&& response) {
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
proxy_client.m_context.loop->logPlain()
<< "{" << thread_context.thread_name << "} IPC client recv "
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
try {
IterateFields().handleChain(
invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
*invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
} catch (...) {
exception = std::current_exception();
}
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
done = true;
invoke_context.thread_context.waiter->m_cv.notify_all();
thread_context.waiter->m_cv.notify_all();
},
[&](const ::kj::Exception& e) {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
if (e.getType() == ::kj::Exception::Type::DISCONNECTED) {
disconnected = "IPC client method call interrupted by disconnect.";
} else {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_context.loop->logPlain()
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
}
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
done = true;
invoke_context.thread_context.waiter->m_cv.notify_all();
thread_context.waiter->m_cv.notify_all();
}));
});
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception;
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
if (disconnected) proxy_client.m_context.loop->raise() << disconnected;
}
//! Invoke callable `fn()` that may return void. If it does return void, replace
@ -687,7 +700,7 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using Results = typename decltype(call_context.getResults())::Builds;
int req = ++server_reqs;
server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " "
server.m_context.loop->log() << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());
try {
@ -704,14 +717,14 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) {
server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>()
<< " " << LogEscape(call_context.getResults().toString());
});
} catch (const std::exception& e) {
server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what();
server.m_context.loop->log() << "IPC server unhandled exception: " << e.what();
throw;
} catch (...) {
server.m_context.connection->m_loop.log() << "IPC server unhandled exception";
server.m_context.loop->log() << "IPC server unhandled exception";
throw;
}
}

View File

@ -1,4 +1,4 @@
# Copyright (c) 2019 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -7,34 +7,31 @@
#include <mp/util.h>
#include <array>
#include <cassert>
#include <functional>
#include <list>
#include <memory>
#include <stddef.h>
#include <tuple>
#include <type_traits>
#include <utility>
#include <variant> // IWYU pragma: keep
namespace mp {
class Connection;
class EventLoop;
//! Mapping from capnp interface type to proxy client implementation (specializations are generated by
//! proxy-codegen.cpp).
template <typename Interface>
struct ProxyClient;
template <typename Interface> struct ProxyClient; // IWYU pragma: export
//! Mapping from capnp interface type to proxy server implementation (specializations are generated by
//! proxy-codegen.cpp).
template <typename Interface>
struct ProxyServer;
template <typename Interface> struct ProxyServer; // IWYU pragma: export
//! Mapping from capnp method params type to method traits (specializations are generated by proxy-codegen.cpp).
template <typename Params>
struct ProxyMethod;
template <typename Params> struct ProxyMethod; // IWYU pragma: export
//! Mapping from capnp struct type to struct traits (specializations are generated by proxy-codegen.cpp).
template <typename Struct>
struct ProxyStruct;
template <typename Struct> struct ProxyStruct; // IWYU pragma: export
//! Mapping from local c++ type to capnp type and traits (specializations are generated by proxy-codegen.cpp).
template <typename Type>
struct ProxyType;
template <typename Type> struct ProxyType; // IWYU pragma: export
using CleanupList = std::list<std::function<void()>>;
using CleanupIt = typename CleanupList::iterator;
@ -47,13 +44,34 @@ inline void CleanupRun(CleanupList& fns) {
}
}
//! Event loop smart pointer automatically managing m_num_clients.
//! If a lock pointer argument is passed, the specified lock will be used,
//! otherwise EventLoop::m_mutex will be locked when needed.
class EventLoopRef
{
public:
explicit EventLoopRef(EventLoop& loop, Lock* lock = nullptr);
EventLoopRef(EventLoopRef&& other) noexcept : m_loop(other.m_loop) { other.m_loop = nullptr; }
EventLoopRef(const EventLoopRef&) = delete;
EventLoopRef& operator=(const EventLoopRef&) = delete;
EventLoopRef& operator=(EventLoopRef&&) = delete;
~EventLoopRef() { reset(); }
EventLoop& operator*() const { assert(m_loop); return *m_loop; }
EventLoop* operator->() const { assert(m_loop); return m_loop; }
void reset(bool relock=false);
EventLoop* m_loop{nullptr};
Lock* m_lock{nullptr};
};
//! Context data associated with proxy client and server classes.
struct ProxyContext
{
Connection* connection;
EventLoopRef loop;
CleanupList cleanup_fns;
ProxyContext(Connection* connection) : connection(connection) {}
ProxyContext(Connection* connection);
};
//! Base class for generated ProxyClient classes that implement a C++ interface
@ -67,6 +85,15 @@ public:
using Sub = ProxyClient<Interface>;
using Super = ProxyClientBase<Interface, Impl>;
//! Construct libmultiprocess client object wrapping Cap'n Proto client
//! object with a reference to the associated mp::Connection object.
//!
//! The destroy_connection option determines whether destroying this client
//! object closes the connection. It is set to true for the
//! ProxyClient<InitInterface> object returned by ConnectStream, to let IPC
//! clients close the connection by freeing the object. It is false for
//! other client objects so they can be destroyed without affecting the
//! connection.
ProxyClientBase(typename Interface::Client client, Connection* connection, bool destroy_connection);
~ProxyClientBase() noexcept;

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -64,13 +64,11 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
auto& server = server_context.proxy_server;
int req = server_context.req;
auto invoke = MakeAsyncCallable(
[fulfiller = kj::mv(future.fulfiller),
auto invoke = [fulfiller = kj::mv(future.fulfiller),
call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
ServerContext server_context{server, call_context, req};
bool disconnected{false};
{
// Before invoking the function, store a reference to the
// callbackThread provided by the client in the
@ -102,7 +100,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// recursive call (IPC call calling back to the caller which
// makes another IPC call), so avoid modifying the map.
const bool erase_thread{inserted};
KJ_DEFER({
KJ_DEFER(if (erase_thread) {
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
// Call erase here with a Connection* argument instead
// of an iterator argument, because the `request_thread`
@ -113,54 +111,40 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// erases the thread from the map, and also because the
// ProxyServer<Thread> destructor calls
// request_threads.clear().
if (erase_thread) {
disconnected = !request_threads.erase(server.m_context.connection);
} else {
disconnected = !request_threads.count(server.m_context.connection);
}
request_threads.erase(server.m_context.connection);
});
fn.invoke(server_context, args...);
}
if (disconnected) {
// If disconnected is true, the Connection object was
// destroyed during the method call. Deal with this by
// returning without ever fulfilling the promise, which will
// cause the ProxyServer object to leak. This is not ideal,
// but fixing the leak will require nontrivial code changes
// because there is a lot of code assuming ProxyServer
// objects are destroyed before Connection objects.
return;
}
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
server.m_context.connection->m_loop.sync([&] {
server.m_context.loop->sync([&] {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->fulfill(kj::mv(call_context));
});
}))
{
server.m_context.connection->m_loop.sync([&]() {
server.m_context.loop->sync([&]() {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->reject(kj::mv(*exception));
});
}
});
};
// Lookup Thread object specified by the client. The specified thread should
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
auto thread_client = context_arg.getThread();
return server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke, req](const kj::Maybe<Thread::Server&>& perhaps) {
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
// Assuming the thread object is found, pass it a pointer to the
// `invoke` lambda above which will invoke the function on that
// thread.
KJ_IF_MAYBE (thread_server, perhaps) {
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_context.connection->m_loop.log()
server.m_context.loop->log()
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke));
} else {
server.m_context.connection->m_loop.log()
server.m_context.loop->log()
<< "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -24,7 +24,7 @@ template <typename Value, typename FnR, typename... FnParams, typename Output>
void CustomBuildField(TypeList<std::function<FnR(FnParams...)>>,
Priority<1>,
InvokeContext& invoke_context,
Value& value,
Value&& value,
Output&& output)
{
if (value) {

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -50,8 +50,14 @@ decltype(auto) CustomReadField(TypeList<LocalType>,
InvokeContext& invoke_context,
Input&& input,
ReadDest&& read_dest,
typename std::enable_if<std::is_enum<LocalType>::value>::type* enable = 0)
typename std::enable_if<std::is_enum<LocalType>::value>::type* enable = nullptr)
{
// Disable clang-tidy out-of-range enum value check which triggers when
// using an enum type that does not have a 0 value. The check correctly
// triggers when it detects that Cap'n Proto returns 0 when reading an
// integer field that is unset. But the warning is spurious because the
// corresponding BuildField call should never leave the field unset.
// NOLINTNEXTLINE(clang-analyzer-optin.core.EnumCastOutOfRange)
return read_dest.construct(static_cast<LocalType>(input.get()));
}

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -6,18 +6,17 @@
#define MP_UTIL_H
#include <capnp/schema.h>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>
#include <future>
#include <kj/common.h>
#include <kj/exception.h>
#include <kj/string-tree.h>
#include <memory>
#include <string.h>
#include <mutex>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>
namespace mp {
@ -130,6 +129,59 @@ const char* TypeName()
return short_name ? short_name + 1 : display_name;
}
//! Convenient wrapper around std::variant<T*, T>
template <typename T>
struct PtrOrValue {
std::variant<T*, T> data;
template <typename... Args>
PtrOrValue(T* ptr, Args&&... args) : data(ptr ? ptr : std::variant<T*, T>{std::in_place_type<T>, std::forward<Args>(args)...}) {}
T& operator*() { return data.index() ? std::get<T>(data) : *std::get<T*>(data); }
T* operator->() { return &**this; }
T& operator*() const { return data.index() ? std::get<T>(data) : *std::get<T*>(data); }
T* operator->() const { return &**this; }
};
// Annotated mutex and lock class (https://clang.llvm.org/docs/ThreadSafetyAnalysis.html)
#if defined(__clang__) && (!defined(SWIG))
#define MP_TSA(x) __attribute__((x))
#else
#define MP_TSA(x) // no-op
#endif
#define MP_CAPABILITY(x) MP_TSA(capability(x))
#define MP_SCOPED_CAPABILITY MP_TSA(scoped_lockable)
#define MP_REQUIRES(x) MP_TSA(requires_capability(x))
#define MP_ACQUIRE(...) MP_TSA(acquire_capability(__VA_ARGS__))
#define MP_RELEASE(...) MP_TSA(release_capability(__VA_ARGS__))
#define MP_ASSERT_CAPABILITY(x) MP_TSA(assert_capability(x))
#define MP_GUARDED_BY(x) MP_TSA(guarded_by(x))
#define MP_NO_TSA MP_TSA(no_thread_safety_analysis)
class MP_CAPABILITY("mutex") Mutex {
public:
void lock() MP_ACQUIRE() { m_mutex.lock(); }
void unlock() MP_RELEASE() { m_mutex.unlock(); }
std::mutex m_mutex;
};
class MP_SCOPED_CAPABILITY Lock {
public:
explicit Lock(Mutex& m) MP_ACQUIRE(m) : m_lock(m.m_mutex) {}
~Lock() MP_RELEASE() = default;
void unlock() MP_RELEASE() { m_lock.unlock(); }
void lock() MP_ACQUIRE() { m_lock.lock(); }
void assert_locked(Mutex& mutex) MP_ASSERT_CAPABILITY() MP_ASSERT_CAPABILITY(mutex)
{
assert(m_lock.mutex() == &mutex.m_mutex);
assert(m_lock);
}
std::unique_lock<std::mutex> m_lock;
};
//! Analog to std::lock_guard that unlocks instead of locks.
template <typename Lock>
struct UnlockGuard
@ -146,46 +198,6 @@ void Unlock(Lock& lock, Callback&& callback)
callback();
}
//! Needed for libc++/macOS compatibility. Lets code work with shared_ptr nothrow declaration
//! https://github.com/capnproto/capnproto/issues/553#issuecomment-328554603
template <typename T>
struct DestructorCatcher
{
T value;
template <typename... Params>
DestructorCatcher(Params&&... params) : value(kj::fwd<Params>(params)...)
{
}
~DestructorCatcher() noexcept try {
} catch (const kj::Exception& e) { // NOLINT(bugprone-empty-catch)
}
};
//! Wrapper around callback function for compatibility with std::async.
//!
//! std::async requires callbacks to be copyable and requires noexcept
//! destructors, but this doesn't work well with kj types which are generally
//! move-only and not noexcept.
template <typename Callable>
struct AsyncCallable
{
AsyncCallable(Callable&& callable) : m_callable(std::make_shared<DestructorCatcher<Callable>>(std::move(callable)))
{
}
AsyncCallable(const AsyncCallable&) = default;
AsyncCallable(AsyncCallable&&) = default;
~AsyncCallable() noexcept = default;
ResultOf<Callable> operator()() const { return (m_callable->value)(); }
mutable std::shared_ptr<DestructorCatcher<Callable>> m_callable;
};
//! Construct AsyncCallable object.
template <typename Callable>
AsyncCallable<std::remove_reference_t<Callable>> MakeAsyncCallable(Callable&& callable)
{
return std::forward<Callable>(callable);
}
//! Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
std::string ThreadName(const char* exe_name);

View File

@ -0,0 +1,28 @@
{ pkgs ? import <nixpkgs> {}
, crossPkgs ? import <nixpkgs> {}
, enableLibcxx ? false # Whether to use libc++ toolchain and libraries instead of libstdc++
, minimal ? false # Whether to create minimal shell without extra tools (faster when cross compiling)
}:
let
lib = pkgs.lib;
llvm = crossPkgs.llvmPackages_20;
capnproto = crossPkgs.capnproto.override (lib.optionalAttrs enableLibcxx { clangStdenv = llvm.libcxxStdenv; });
clang = if enableLibcxx then llvm.libcxxClang else llvm.clang;
clang-tools = llvm.clang-tools.override { inherit enableLibcxx; };
in crossPkgs.mkShell {
buildInputs = [
capnproto
];
nativeBuildInputs = with pkgs; [
cmake
include-what-you-use
ninja
] ++ lib.optionals (!minimal) [
clang
clang-tools
];
# Tell IWYU where its libc++ mapping lives
IWYU_MAPPING_FILE = if enableLibcxx then "${llvm.libcxx.dev}/include/c++/v1/libcxx.imp" else null;
}

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -6,13 +6,15 @@
#include <mp/util.h>
#include <algorithm>
#include <capnp/schema.h>
#include <capnp/schema-parser.h>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <errno.h>
#include <fstream>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <kj/array.h>
#include <kj/common.h>
@ -26,6 +28,7 @@
#include <string>
#include <system_error>
#include <unistd.h>
#include <utility>
#include <vector>
#define PROXY_BIN "mpgen"
@ -76,7 +79,7 @@ static bool GetAnnotationInt32(const Reader& reader, uint64_t id, int32_t* resul
return false;
}
static void ForEachMethod(const capnp::InterfaceSchema& interface, const std::function<void(const capnp::InterfaceSchema& interface, const capnp::InterfaceSchema::Method)>& callback)
static void ForEachMethod(const capnp::InterfaceSchema& interface, const std::function<void(const capnp::InterfaceSchema& interface, const capnp::InterfaceSchema::Method)>& callback) // NOLINT(misc-no-recursion)
{
for (const auto super : interface.getSuperclasses()) {
ForEachMethod(super, callback);
@ -198,19 +201,45 @@ static void Generate(kj::StringPtr src_prefix,
std::ofstream cpp_server(output_path + ".proxy-server.c++");
cpp_server << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
cpp_server << "// IWYU pragma: no_include <kj/memory.h>\n";
cpp_server << "// IWYU pragma: no_include <memory>\n";
cpp_server << "// IWYU pragma: begin_keep\n";
cpp_server << "#include <" << include_path << ".proxy.h>\n";
cpp_server << "#include <" << include_path << ".proxy-types.h>\n";
cpp_server << "#include <" << PROXY_TYPES << ">\n\n";
cpp_server << "#include <capnp/generated-header-support.h>\n";
cpp_server << "#include <cstring>\n";
cpp_server << "#include <kj/async.h>\n";
cpp_server << "#include <kj/common.h>\n";
cpp_server << "#include <kj/exception.h>\n";
cpp_server << "#include <mp/proxy.h>\n";
cpp_server << "#include <mp/util.h>\n";
cpp_server << "#include <" << PROXY_TYPES << ">\n";
cpp_server << "// IWYU pragma: end_keep\n\n";
cpp_server << "namespace mp {\n";
std::ofstream cpp_client(output_path + ".proxy-client.c++");
cpp_client << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
cpp_client << "// IWYU pragma: no_include <kj/memory.h>\n";
cpp_client << "// IWYU pragma: no_include <memory>\n";
cpp_client << "// IWYU pragma: begin_keep\n";
cpp_client << "#include <" << include_path << ".h>\n";
cpp_client << "#include <" << include_path << ".proxy.h>\n";
cpp_client << "#include <" << include_path << ".proxy-types.h>\n";
cpp_client << "#include <" << PROXY_TYPES << ">\n\n";
cpp_client << "#include <capnp/generated-header-support.h>\n";
cpp_client << "#include <cstring>\n";
cpp_client << "#include <kj/common.h>\n";
cpp_client << "#include <mp/proxy.h>\n";
cpp_client << "#include <mp/util.h>\n";
cpp_client << "#include <" << PROXY_TYPES << ">\n";
cpp_client << "// IWYU pragma: end_keep\n\n";
cpp_client << "namespace mp {\n";
std::ofstream cpp_types(output_path + ".proxy-types.c++");
cpp_types << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
cpp_types << "#include <" << include_path << ".proxy-types.h>\n";
cpp_types << "// IWYU pragma: no_include \"mp/proxy.h\"\n";
cpp_types << "// IWYU pragma: no_include \"mp/proxy-io.h\"\n";
cpp_types << "#include <" << include_path << ".proxy.h>\n";
cpp_types << "#include <" << include_path << ".proxy-types.h> // IWYU pragma: keep\n";
cpp_types << "#include <" << PROXY_TYPES << ">\n\n";
cpp_types << "namespace mp {\n";
@ -226,10 +255,12 @@ static void Generate(kj::StringPtr src_prefix,
inl << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
inl << "#ifndef " << guard << "_PROXY_TYPES_H\n";
inl << "#define " << guard << "_PROXY_TYPES_H\n\n";
inl << "#include <" << include_path << ".proxy.h>\n";
inl << "// IWYU pragma: no_include \"mp/proxy.h\"\n";
inl << "#include <mp/proxy.h> // IWYU pragma: keep\n";
inl << "#include <" << include_path << ".proxy.h> // IWYU pragma: keep\n";
for (const auto annotation : file_schema.getProto().getAnnotations()) {
if (annotation.getId() == INCLUDE_TYPES_ANNOTATION_ID) {
inl << "#include <" << annotation.getValue().getText() << ">\n";
inl << "#include \"" << annotation.getValue().getText() << "\" // IWYU pragma: export\n";
}
}
inl << "namespace mp {\n";
@ -238,10 +269,10 @@ static void Generate(kj::StringPtr src_prefix,
h << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
h << "#ifndef " << guard << "_PROXY_H\n";
h << "#define " << guard << "_PROXY_H\n\n";
h << "#include <" << include_path << ".h>\n";
h << "#include <" << include_path << ".h> // IWYU pragma: keep\n";
for (const auto annotation : file_schema.getProto().getAnnotations()) {
if (annotation.getId() == INCLUDE_ANNOTATION_ID) {
h << "#include <" << annotation.getValue().getText() << ">\n";
h << "#include \"" << annotation.getValue().getText() << "\" // IWYU pragma: export\n";
}
}
h << "#include <" << PROXY_DECL << ">\n\n";

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -10,23 +10,23 @@
#include <mp/type-threadmap.h>
#include <mp/util.h>
#include <assert.h>
#include <atomic>
#include <capnp/blob.h>
#include <capnp/capability.h>
#include <capnp/rpc.h>
#include <condition_variable>
#include <functional>
#include <future>
#include <kj/async-io.h>
#include <kj/async.h>
#include <kj/async-io.h>
#include <kj/async-prelude.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/exception.h>
#include <kj/function.h>
#include <kj/memory.h>
#include <map>
#include <memory>
#include <mutex>
#include <stddef.h>
#include <optional>
#include <stdexcept>
#include <string>
#include <sys/socket.h>
@ -37,9 +37,6 @@
namespace mp {
template <typename Interface>
struct ProxyServer;
thread_local ThreadContext g_thread_context;
void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
@ -48,12 +45,49 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
m_loop.log() << "Uncaught exception in daemonized task.";
}
EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
{
auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
loop_lock->assert_locked(m_loop->m_mutex);
m_loop->m_num_clients += 1;
}
// Due to the conditionals in this function, MP_NO_TSA is required to avoid
// error "error: mutex 'loop_lock' is not held on every path through here
// [-Wthread-safety-analysis]"
void EventLoopRef::reset(bool relock) MP_NO_TSA
{
if (auto* loop{m_loop}) {
m_loop = nullptr;
auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
loop_lock->assert_locked(loop->m_mutex);
assert(loop->m_num_clients > 0);
loop->m_num_clients -= 1;
if (loop->done()) {
loop->m_cv.notify_all();
int post_fd{loop->m_post_fd};
loop_lock->unlock();
char buffer = 0;
KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
// By default, do not try to relock `loop_lock` after writing,
// because the event loop could wake up and destroy itself and the
// mutex might no longer exist.
if (relock) loop_lock->lock();
}
}
}
ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
Connection::~Connection()
{
// Shut down RPC system first, since this will garbage collect Server
// objects that were not freed before the connection was closed, some of
// which may call addAsyncCleanup and add more cleanup callbacks which can
// run below.
// Shut down RPC system first, since this will garbage collect any
// ProxyServer objects that were not freed before the connection was closed.
// Typically all ProxyServer objects associated with this connection will be
// freed before this call returns. However that will not be the case if
// there are asynchronous IPC calls over this connection still currently
// executing. In that case, Cap'n Proto will destroy the ProxyServer objects
// after the calls finish.
m_rpc_system.reset();
// ProxyClient cleanup handlers are in sync list, and ProxyServer cleanup
@ -98,23 +132,17 @@ Connection::~Connection()
// on clean and unclean shutdowns. In unclean shutdown case when the
// connection is broken, sync and async cleanup lists will filled with
// callbacks. In the clean shutdown case both lists will be empty.
Lock lock{m_loop->m_mutex};
while (!m_sync_cleanup_fns.empty()) {
m_sync_cleanup_fns.front()();
m_sync_cleanup_fns.pop_front();
CleanupList fn;
fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
Unlock(lock, fn.front());
}
while (!m_async_cleanup_fns.empty()) {
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
m_async_cleanup_fns.pop_front();
}
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.startAsyncThread(lock);
m_loop.removeClient(lock);
}
CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
{
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
const Lock lock(m_loop->m_mutex);
// Add cleanup callbacks to the front of list, so sync cleanup functions run
// in LIFO order. This is a good approach because sync cleanup functions are
// added as client objects are created, and it is natural to clean up
@ -128,13 +156,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
void Connection::removeSyncCleanup(CleanupIt it)
{
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
const Lock lock(m_loop->m_mutex);
m_sync_cleanup_fns.erase(it);
}
void Connection::addAsyncCleanup(std::function<void()> fn)
void EventLoop::addAsyncCleanup(std::function<void()> fn)
{
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
const Lock lock(m_mutex);
// Add async cleanup callbacks to the back of the list. Unlike the sync
// cleanup list, this list order is more significant because it determines
// the order server objects are destroyed when there is a sudden disconnect,
@ -151,7 +179,8 @@ void Connection::addAsyncCleanup(std::function<void()> fn)
// process, otherwise shared pointer counts of the CWallet objects (which
// inherit from Chain::Notification) will not be 1 when WalletLoader
// destructor runs and it will wait forever for them to be released.
m_async_cleanup_fns.emplace(m_async_cleanup_fns.end(), std::move(fn));
m_async_fns->emplace_back(std::move(fn));
startAsyncThread();
}
EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
@ -170,9 +199,9 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
EventLoop::~EventLoop()
{
if (m_async_thread.joinable()) m_async_thread.join();
const std::lock_guard<std::mutex> lock(m_mutex);
const Lock lock(m_mutex);
KJ_ASSERT(m_post_fn == nullptr);
KJ_ASSERT(m_async_fns.empty());
KJ_ASSERT(!m_async_fns);
KJ_ASSERT(m_wait_fd == -1);
KJ_ASSERT(m_post_fd == -1);
KJ_ASSERT(m_num_clients == 0);
@ -188,6 +217,12 @@ void EventLoop::loop()
g_thread_context.loop_thread = true;
KJ_DEFER(g_thread_context.loop_thread = false);
{
const Lock lock(m_mutex);
assert(!m_async_fns);
m_async_fns.emplace();
}
kj::Own<kj::AsyncIoStream> wait_stream{
m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
int post_fd{m_post_fd};
@ -195,14 +230,14 @@ void EventLoop::loop()
for (;;) {
const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
std::unique_lock<std::mutex> lock(m_mutex);
Lock lock(m_mutex);
if (m_post_fn) {
Unlock(lock, *m_post_fn);
m_post_fn = nullptr;
m_cv.notify_all();
} else if (done(lock)) {
} else if (done()) {
// Intentionally do not break if m_post_fn was set, even if done()
// would return true, to ensure that the removeClient write(post_fd)
// would return true, to ensure that the EventLoopRef write(post_fd)
// call always succeeds and the loop does not exit between the time
// that the done condition is set and the write call is made.
break;
@ -213,76 +248,61 @@ void EventLoop::loop()
log() << "EventLoop::loop bye.";
wait_stream = nullptr;
KJ_SYSCALL(::close(post_fd));
const std::unique_lock<std::mutex> lock(m_mutex);
const Lock lock(m_mutex);
m_wait_fd = -1;
m_post_fd = -1;
m_async_fns.reset();
m_cv.notify_all();
}
void EventLoop::post(const std::function<void()>& fn)
void EventLoop::post(kj::Function<void()> fn)
{
if (std::this_thread::get_id() == m_thread_id) {
fn();
return;
}
std::unique_lock<std::mutex> lock(m_mutex);
addClient(lock);
m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
Lock lock(m_mutex);
EventLoopRef ref(*this, &lock);
m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
m_post_fn = &fn;
int post_fd{m_post_fd};
Unlock(lock, [&] {
char buffer = 0;
KJ_SYSCALL(write(post_fd, &buffer, 1));
});
m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
removeClient(lock);
m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
}
void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients += 1; }
bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
{
m_num_clients -= 1;
if (done(lock)) {
m_cv.notify_all();
int post_fd{m_post_fd};
lock.unlock();
char buffer = 0;
KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
return true;
}
return false;
}
void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
void EventLoop::startAsyncThread()
{
assert (std::this_thread::get_id() == m_thread_id);
if (m_async_thread.joinable()) {
// Notify to wake up the async thread if it is already running.
m_cv.notify_all();
} else if (!m_async_fns.empty()) {
} else if (!m_async_fns->empty()) {
m_async_thread = std::thread([this] {
std::unique_lock<std::mutex> lock(m_mutex);
while (true) {
if (!m_async_fns.empty()) {
addClient(lock);
const std::function<void()> fn = std::move(m_async_fns.front());
m_async_fns.pop_front();
Lock lock(m_mutex);
while (m_async_fns) {
if (!m_async_fns->empty()) {
EventLoopRef ref{*this, &lock};
const std::function<void()> fn = std::move(m_async_fns->front());
m_async_fns->pop_front();
Unlock(lock, fn);
if (removeClient(lock)) break;
// Important to relock because of the wait() call below.
ref.reset(/*relock=*/true);
// Continue without waiting in case there are more async_fns
continue;
} else if (m_num_clients == 0) {
break;
}
m_cv.wait(lock);
m_cv.wait(lock.m_lock);
}
});
}
}
bool EventLoop::done(std::unique_lock<std::mutex>& lock) const
bool EventLoop::done() const
{
assert(m_num_clients >= 0);
assert(lock.owns_lock());
assert(lock.mutex() == &m_mutex);
return m_num_clients == 0 && m_async_fns.empty();
return m_num_clients == 0 && m_async_fns->empty();
}
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
@ -293,18 +313,18 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
thread = threads.emplace(
std::piecewise_construct, std::forward_as_tuple(connection),
std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
thread->second.setCleanup([&threads, &mutex, thread] {
thread->second.setDisconnectCallback([&threads, &mutex, thread] {
// Note: it is safe to use the `thread` iterator in this cleanup
// function, because the iterator would only be invalid if the map entry
// was removed, and if the map entry is removed the ProxyClient<Thread>
// destructor unregisters the cleanup.
// Connection is being destroyed before thread client is, so reset
// thread client m_cleanup_it member so thread client destructor does not
// try unregister this callback after connection is destroyed.
thread->second.m_cleanup_it.reset();
// thread client m_disconnect_cb member so thread client destructor does not
// try to unregister this callback after connection is destroyed.
// Remove connection pointer about to be destroyed from the map
const std::unique_lock<std::mutex> lock(mutex);
thread->second.m_disconnect_cb.reset();
threads.erase(thread);
});
return {thread, true};
@ -315,16 +335,16 @@ ProxyClient<Thread>::~ProxyClient()
// If thread is being destroyed before connection is destroyed, remove the
// cleanup callback that was registered to handle the connection being
// destroyed before the thread being destroyed.
if (m_cleanup_it) {
m_context.connection->removeSyncCleanup(*m_cleanup_it);
if (m_disconnect_cb) {
m_context.connection->removeSyncCleanup(*m_disconnect_cb);
}
}
void ProxyClient<Thread>::setCleanup(const std::function<void()>& fn)
void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
{
assert(fn);
assert(!m_cleanup_it);
m_cleanup_it = m_context.connection->addSyncCleanup(fn);
assert(!m_disconnect_cb);
m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
}
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
@ -375,7 +395,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
const std::string from = context.getParams().getName();
std::promise<ThreadContext*> thread_context;
std::thread thread([&thread_context, from, this]() {
g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")";
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
g_thread_context.waiter = std::make_unique<Waiter>();
thread_context.set_value(&g_thread_context);
std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);

View File

@ -1,16 +1,16 @@
// Copyright (c) 2018-2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <mp/config.h>
#include <mp/util.h>
#include <errno.h>
#include <cerrno>
#include <cstdio>
#include <kj/common.h>
#include <kj/string-tree.h>
#include <pthread.h>
#include <sstream>
#include <stdio.h>
#include <string>
#include <sys/resource.h>
#include <sys/socket.h>

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -13,7 +13,7 @@ add_custom_target(mptests)
add_custom_target(mpcheck COMMAND ${CMAKE_CTEST_COMMAND} DEPENDS mptests)
# Only add more convenient tests and check targets if project is being built
# standlone, to prevent clashes with external projects.
# standalone, to prevent clashes with external projects.
if (MP_STANDALONE)
add_custom_target(tests DEPENDS mptests)
add_custom_target(check DEPENDS mpcheck)

View File

@ -1,13 +1,20 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef MP_TEST_FOO_TYPES_H
#define MP_TEST_FOO_TYPES_H
#include <mp/proxy.h>
#include <mp/proxy-types.h>
// IWYU pragma: begin_exports
#include <capnp/common.h>
#include <cstddef>
#include <mp/test/foo.capnp.h>
#include <mp/type-context.h>
#include <mp/type-decay.h>
#include <mp/type-function.h>
#include <mp/type-interface.h>
#include <mp/type-map.h>
#include <mp/type-message.h>
@ -17,9 +24,18 @@
#include <mp/type-struct.h>
#include <mp/type-threadmap.h>
#include <mp/type-vector.h>
#include <string>
#include <type_traits>
// IWYU pragma: end_exports
namespace mp {
namespace test {
namespace messages {
struct ExtendedCallback; // IWYU pragma: export
struct FooCallback; // IWYU pragma: export
struct FooFn; // IWYU pragma: export
struct FooInterface; // IWYU pragma: export
} // namespace messages
template <typename Output>
void CustomBuildField(TypeList<FooCustom>, Priority<1>, InvokeContext& invoke_context, const FooCustom& value, Output&& output)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2019 The Bitcoin Core developers
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -28,6 +28,9 @@ interface FooInterface $Proxy.wrap("mp::test::FooImplementation") {
passMessage @13 (arg :FooMessage) -> (result :FooMessage);
passMutable @14 (arg :FooMutable) -> (arg :FooMutable);
passEnum @15 (arg :Int32) -> (result :Int32);
passFn @16 (context :Proxy.Context, fn :FooFn) -> (result :Int32);
callFn @17 () -> ();
callFnAsync @18 (context :Proxy.Context) -> ();
}
interface FooCallback $Proxy.wrap("mp::test::FooCallback") {
@ -39,6 +42,11 @@ interface ExtendedCallback extends(FooCallback) $Proxy.wrap("mp::test::ExtendedC
callExtended @0 (context :Proxy.Context, arg :Int32) -> (result :Int32);
}
interface FooFn $Proxy.wrap("ProxyCallback<std::function<int()>>") {
destroy @0 (context :Proxy.Context) -> ();
call @1 (context :Proxy.Context) -> (result :Int32);
}
struct FooStruct $Proxy.wrap("mp::test::FooStruct") {
name @0 :Text;
setint @1 :List(Int32);

View File

@ -1,10 +1,12 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef MP_TEST_FOO_H
#define MP_TEST_FOO_H
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
@ -75,7 +77,11 @@ public:
FooMessage passMessage(FooMessage foo) { foo.message += " call"; return foo; }
void passMutable(FooMutable& foo) { foo.message += " call"; }
FooEnum passEnum(FooEnum foo) { return foo; }
int passFn(std::function<int()> fn) { return fn(); }
std::shared_ptr<FooCallback> m_callback;
void callFn() { assert(m_fn); m_fn(); }
void callFnAsync() { assert(m_fn); m_fn(); }
std::function<void()> m_fn;
};
} // namespace test

View File

@ -1,54 +1,117 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <mp/proxy-io.h>
#include <mp/test/foo.capnp.h>
#include <mp/test/foo.capnp.proxy.h>
#include <mp/test/foo.h>
#include <capnp/capability.h>
#include <cstdio>
#include <future>
#include <capnp/rpc.h>
#include <cstring>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <kj/async.h>
#include <kj/async-io.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/memory.h>
#include <kj/test.h>
#include <memory>
#include <mp/proxy.h>
#include <mp/proxy-io.h>
#include <optional>
#include <set>
#include <stdexcept>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
namespace mp {
namespace test {
/**
* Test setup class creating a two way connection between a
* ProxyServer<FooInterface> object and a ProxyClient<FooInterface>.
*
* Provides client_disconnect and server_disconnect lambdas that can be used to
* trigger disconnects and test handling of broken and closed connections.
*
* Accepts a client_owns_connection option to test different ProxyClient
* destroy_connection values and control whether destroying the ProxyClient
* object destroys the client Connection object. Normally it makes sense for
* this to be true to simplify shutdown and avoid needing to call
* client_disconnect manually, but false allows testing more ProxyClient
* behavior and the "IPC client method called after disconnect" code path.
*/
class TestSetup
{
public:
std::function<void()> server_disconnect;
std::function<void()> client_disconnect;
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
ProxyServer<messages::FooInterface>* server{nullptr};
//! Thread variable should be after other struct members so the thread does
//! not start until the other members are initialized.
std::thread thread;
TestSetup(bool client_owns_connection = true)
: thread{[&] {
EventLoop loop("mptest", [](bool raise, const std::string& log) {
std::cout << "LOG" << raise << ": " << log << "\n";
if (raise) throw std::runtime_error(log);
});
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
auto server_connection =
std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), [&](Connection& connection) {
auto server_proxy = kj::heap<ProxyServer<messages::FooInterface>>(
std::make_shared<FooImplementation>(), connection);
server = server_proxy;
return capnp::Capability::Client(kj::mv(server_proxy));
});
server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); };
// Set handler to destroy the server when the client disconnects. This
// is ignored if server_disconnect() is called instead.
server_connection->onDisconnect([&] { server_connection.reset(); });
auto client_connection = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]));
auto client_proxy = std::make_unique<ProxyClient<messages::FooInterface>>(
client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
client_connection.get(), /* destroy_connection= */ client_owns_connection);
if (client_owns_connection) {
client_connection.release();
} else {
client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); };
}
client_promise.set_value(std::move(client_proxy));
loop.loop();
}}
{
client = client_promise.get_future().get();
}
~TestSetup()
{
// Test that client cleanup_fns are executed.
bool destroyed = false;
client->m_context.cleanup_fns.emplace_front([&destroyed] { destroyed = true; });
client.reset();
KJ_EXPECT(destroyed);
thread.join();
}
};
KJ_TEST("Call FooInterface methods")
{
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> foo_promise;
std::function<void()> disconnect_client;
std::thread thread([&]() {
EventLoop loop("mptest", [](bool raise, const std::string& log) {
std::cout << "LOG" << raise << ": " << log << "\n";
});
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
auto connection_client = std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]));
auto foo_client = std::make_unique<ProxyClient<messages::FooInterface>>(
connection_client->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
connection_client.get(), /* destroy_connection= */ false);
foo_promise.set_value(std::move(foo_client));
disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
auto connection_server = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]), [&](Connection& connection) {
auto foo_server = kj::heap<ProxyServer<messages::FooInterface>>(std::make_shared<FooImplementation>(), connection);
return capnp::Capability::Client(kj::mv(foo_server));
});
connection_server->onDisconnect([&] { connection_server.reset(); });
loop.loop();
});
auto foo = foo_promise.get_future().get();
KJ_EXPECT(foo->add(1, 2) == 3);
FooStruct in;
@ -128,13 +191,110 @@ KJ_TEST("Call FooInterface methods")
foo->passMutable(mut);
KJ_EXPECT(mut.message == "init build pass call return read");
disconnect_client();
thread.join();
KJ_EXPECT(foo->passFn([]{ return 10; }) == 10);
}
bool destroyed = false;
foo->m_context.cleanup_fns.emplace_front([&destroyed]{ destroyed = true; });
foo.reset();
KJ_EXPECT(destroyed);
KJ_TEST("Call IPC method after client connection is closed")
{
TestSetup setup{/*client_owns_connection=*/false};
ProxyClient<messages::FooInterface>* foo = setup.client.get();
KJ_EXPECT(foo->add(1, 2) == 3);
setup.client_disconnect();
bool disconnected{false};
try {
foo->add(1, 2);
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method called after disconnect.");
disconnected = true;
}
KJ_EXPECT(disconnected);
}
KJ_TEST("Calling IPC method after server connection is closed")
{
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
KJ_EXPECT(foo->add(1, 2) == 3);
setup.server_disconnect();
bool disconnected{false};
try {
foo->add(1, 2);
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
disconnected = true;
}
KJ_EXPECT(disconnected);
}
KJ_TEST("Calling IPC method and disconnecting during the call")
{
TestSetup setup{/*client_owns_connection=*/false};
ProxyClient<messages::FooInterface>* foo = setup.client.get();
KJ_EXPECT(foo->add(1, 2) == 3);
// Set m_fn to initiate client disconnect when server is in the middle of
// handling the callFn call to make sure this case is handled cleanly.
setup.server->m_impl->m_fn = setup.client_disconnect;
bool disconnected{false};
try {
foo->callFn();
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
disconnected = true;
}
KJ_EXPECT(disconnected);
}
KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
{
// This test is similar to last test, except that instead of letting the IPC
// call return immediately after triggering a disconnect, make it disconnect
// & wait so server is forced to deal with having a disconnection and call
// in flight at the same time.
//
// Test uses callFnAsync() instead of callFn() to implement this. Both of
// these methods have the same implementation, but the callFnAsync() capnp
// method declaration takes an mp.Context argument so the method executes on
// an asynchronous thread instead of executing in the event loop thread, so
// it is able to block without deadlocking the event lock thread.
//
// This test adds important coverage because it causes the server Connection
// object to be destroyed before ProxyServer object, which is not a
// condition that usually happens because the m_rpc_system.reset() call in
// the ~Connection destructor usually would immediately free all remaining
// ProxyServer objects associated with the connection. Having an in-progress
// RPC call requires keeping the ProxyServer longer.
std::promise<void> signal;
TestSetup setup{/*client_owns_connection=*/false};
ProxyClient<messages::FooInterface>* foo = setup.client.get();
KJ_EXPECT(foo->add(1, 2) == 3);
foo->initThreadMap();
setup.server->m_impl->m_fn = [&] {
EventLoopRef loop{*setup.server->m_context.loop};
setup.client_disconnect();
signal.get_future().get();
};
bool disconnected{false};
try {
foo->callFnAsync();
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
disconnected = true;
}
KJ_EXPECT(disconnected);
// Now that the disconnect has been detected, set signal allowing the
// callFnAsync() IPC call to return. Since signalling may not wake up the
// thread right away, it is important for the signal variable to be declared
// *before* the TestSetup variable so is not destroyed while
// signal.get_future().get() is called.
signal.set_value();
}
} // namespace test

View File

@ -58,6 +58,9 @@ public:
//! clients and servers independently.
virtual void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function<void()>& ready_fn = {}) = 0;
//! Disconnect any incoming connections that are still connected.
virtual void disconnectIncoming() = 0;
//! Add cleanup callback to interface that will run when the interface is
//! deleted.
virtual void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) = 0;

View File

@ -266,6 +266,12 @@ std::optional<CService> GetLocalAddrForPeer(CNode& node)
return std::nullopt;
}
void ClearLocal()
{
LOCK(g_maplocalhost_mutex);
return mapLocalHost.clear();
}
// learn a new local address
bool AddLocal(const CService& addr_, int nScore)
{

View File

@ -158,6 +158,7 @@ enum
/** Returns a local address that we should advertise to this peer. */
std::optional<CService> GetLocalAddrForPeer(CNode& node);
void ClearLocal();
bool AddLocal(const CService& addr, int nScore = LOCAL_NONE);
bool AddLocal(const CNetAddr& addr, int nScore = LOCAL_NONE);
void RemoveLocal(const CService& addr);

View File

@ -121,6 +121,13 @@ public:
m_reachable.clear();
}
void Reset() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
AssertLockNotHeld(m_mutex);
LOCK(m_mutex);
m_reachable = DefaultNets();
}
[[nodiscard]] bool Contains(Network net) const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
AssertLockNotHeld(m_mutex);
@ -142,17 +149,21 @@ public:
}
private:
mutable Mutex m_mutex;
std::unordered_set<Network> m_reachable GUARDED_BY(m_mutex){
NET_UNROUTABLE,
NET_IPV4,
NET_IPV6,
NET_ONION,
NET_I2P,
NET_CJDNS,
NET_INTERNAL
static std::unordered_set<Network> DefaultNets()
{
return {
NET_UNROUTABLE,
NET_IPV4,
NET_IPV6,
NET_ONION,
NET_I2P,
NET_CJDNS,
NET_INTERNAL
};
};
mutable Mutex m_mutex;
std::unordered_set<Network> m_reachable GUARDED_BY(m_mutex){DefaultNets()};
};
extern ReachableNets g_reachable_nets;

View File

@ -317,6 +317,12 @@ void SetRPCWarmupStatus(const std::string& newStatus)
rpcWarmupStatus = newStatus;
}
void SetRPCWarmupStarting()
{
LOCK(g_rpc_warmup_mutex);
fRPCInWarmup = true;
}
void SetRPCWarmupFinished()
{
LOCK(g_rpc_warmup_mutex);

View File

@ -29,6 +29,7 @@ void RpcInterruptionPoint();
* immediately with RPC_IN_WARMUP.
*/
void SetRPCWarmupStatus(const std::string& newStatus);
void SetRPCWarmupStarting();
/* Mark warmup as done. RPC calls will be processed from now on. */
void SetRPCWarmupFinished();

View File

@ -64,6 +64,7 @@ add_executable(test_bitcoin
net_peer_eviction_tests.cpp
net_tests.cpp
netbase_tests.cpp
node_init_tests.cpp
node_warnings_tests.cpp
orphanage_tests.cpp
pcp_tests.cpp

View File

@ -55,7 +55,6 @@ void IpcPipeTest()
{
// Setup: create FooImplementation object and listen for FooInterface requests
std::promise<std::unique_ptr<mp::ProxyClient<gen::FooInterface>>> foo_promise;
std::function<void()> disconnect_client;
std::thread thread([&]() {
mp::EventLoop loop("IpcPipeTest", [](bool raise, const std::string& log) { LogInfo("LOG%i: %s", raise, log); });
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
@ -63,9 +62,9 @@ void IpcPipeTest()
auto connection_client = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[0]));
auto foo_client = std::make_unique<mp::ProxyClient<gen::FooInterface>>(
connection_client->m_rpc_system->bootstrap(mp::ServerVatId().vat_id).castAs<gen::FooInterface>(),
connection_client.get(), /* destroy_connection= */ false);
connection_client.get(), /* destroy_connection= */ true);
connection_client.release();
foo_promise.set_value(std::move(foo_client));
disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
auto connection_server = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[1]), [&](mp::Connection& connection) {
auto foo_server = kj::heap<mp::ProxyServer<gen::FooInterface>>(std::make_shared<FooImplementation>(), connection);
@ -106,8 +105,8 @@ void IpcPipeTest()
auto script2{foo->passScript(script1)};
BOOST_CHECK_EQUAL(HexStr(script1), HexStr(script2));
// Test cleanup: disconnect pipe and join thread
disconnect_client();
// Test cleanup: disconnect and join thread
foo.reset();
thread.join();
}

View File

@ -0,0 +1,51 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <init.h>
#include <interfaces/init.h>
#include <rpc/server.h>
#include <boost/test/unit_test.hpp>
#include <test/util/setup_common.h>
using node::NodeContext;
BOOST_FIXTURE_TEST_SUITE(node_init_tests, BasicTestingSetup)
//! Custom implementation of interfaces::Init for testing.
class TestInit : public interfaces::Init
{
public:
TestInit(NodeContext& node) : m_node(node)
{
InitContext(m_node);
m_node.init = this;
}
std::unique_ptr<interfaces::Chain> makeChain() override { return interfaces::MakeChain(m_node); }
std::unique_ptr<interfaces::WalletLoader> makeWalletLoader(interfaces::Chain& chain) override
{
return MakeWalletLoader(chain, *Assert(m_node.args));
}
NodeContext& m_node;
};
BOOST_AUTO_TEST_CASE(init_test)
{
// Clear state set by BasicTestingSetup that AppInitMain assumes is unset.
LogInstance().DisconnectTestLogger();
m_node.args->SetConfigFilePath({});
// Prevent the test from trying to listen on ports 8332 and 8333.
m_node.args->ForceSetArg("-server", "0");
m_node.args->ForceSetArg("-listen", "0");
// Run through initialization and shutdown code.
TestInit init{m_node};
BOOST_CHECK(AppInitInterfaces(m_node));
BOOST_CHECK(AppInitMain(m_node));
Interrupt(m_node);
Shutdown(m_node);
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -116,6 +116,14 @@ BasicTestingSetup::BasicTestingSetup(const ChainType chainType, TestOpts opts)
if (!EnableFuzzDeterminism()) {
SeedRandomForTest(SeedRand::FIXED_SEED);
}
// Reset globals
fDiscover = true;
fListen = true;
SetRPCWarmupStarting();
g_reachable_nets.Reset();
ClearLocal();
m_node.shutdown_signal = &m_interrupt;
m_node.shutdown_request = [this]{ return m_interrupt(); };
m_node.args = &gArgs;
@ -215,7 +223,10 @@ BasicTestingSetup::~BasicTestingSetup()
} else {
fs::remove_all(m_path_root);
}
// Clear all arguments except for -datadir, which GUI tests currently rely
// on to be set even after the testing setup is destroyed.
gArgs.ClearArgs();
gArgs.ForceSetArg("-datadir", fs::PathToString(m_path_root));
}
ChainTestingSetup::ChainTestingSetup(const ChainType chainType, TestOpts opts)