This commit is contained in:
Cory Fields 2025-10-08 16:13:37 +00:00 committed by GitHub
commit 3725870e1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 308 additions and 92 deletions

View File

@ -30,10 +30,36 @@
namespace ipc { namespace ipc {
namespace capnp { namespace capnp {
namespace { namespace {
void IpcLogFn(bool raise, std::string message)
BCLog::Level ConvertIPCLogLevel(mp::Log level)
{ {
LogDebug(BCLog::IPC, "%s\n", message); switch (level) {
if (raise) throw Exception(message); case mp::Log::Trace: return BCLog::Level::Trace;
case mp::Log::Debug: return BCLog::Level::Debug;
case mp::Log::Info: return BCLog::Level::Info;
case mp::Log::Warning: return BCLog::Level::Warning;
case mp::Log::Error: return BCLog::Level::Error;
case mp::Log::Raise: return BCLog::Level::Error;
} // no default case, so the compiler can warn about missing cases
// Be conservative and assume that if MP ever adds a new log level, it
// should only be shown at our most verbose level.
return BCLog::Level::Trace;
}
mp::Log GetRequestedIPCLogLevel()
{
if (LogAcceptCategory(BCLog::IPC, BCLog::Level::Trace)) return mp::Log::Trace;
if (LogAcceptCategory(BCLog::IPC, BCLog::Level::Debug)) return mp::Log::Debug;
// Info, Warning, and Error are logged unconditionally
return mp::Log::Info;
}
void IpcLogFn(mp::LogMessage message)
{
LogPrintLevel(BCLog::IPC, ConvertIPCLogLevel(message.level), "%s\n", message.message);
if (message.level == mp::Log::Raise) throw Exception(message.message);
} }
class CapnpProtocol : public Protocol class CapnpProtocol : public Protocol
@ -62,7 +88,11 @@ public:
{ {
assert(!m_loop); assert(!m_loop);
mp::g_thread_context.thread_name = mp::ThreadName(exe_name); mp::g_thread_context.thread_name = mp::ThreadName(exe_name);
m_loop.emplace(exe_name, &IpcLogFn, &m_context); mp::LogOptions opts = {
.log_fn = IpcLogFn,
.log_level = GetRequestedIPCLogLevel()
};
m_loop.emplace(exe_name, std::move(opts), &m_context);
if (ready_fn) ready_fn(); if (ready_fn) ready_fn();
mp::ServeStream<messages::Init>(*m_loop, fd, init); mp::ServeStream<messages::Init>(*m_loop, fd, init);
m_parent_connection = &m_loop->m_incoming_connections.back(); m_parent_connection = &m_loop->m_incoming_connections.back();
@ -90,7 +120,11 @@ public:
std::promise<void> promise; std::promise<void> promise;
m_loop_thread = std::thread([&] { m_loop_thread = std::thread([&] {
util::ThreadRename("capnp-loop"); util::ThreadRename("capnp-loop");
m_loop.emplace(exe_name, &IpcLogFn, &m_context); mp::LogOptions opts = {
.log_fn = IpcLogFn,
.log_level = GetRequestedIPCLogLevel()
};
m_loop.emplace(exe_name, std::move(opts), &m_context);
m_loop_ref.emplace(*m_loop); m_loop_ref.emplace(*m_loop);
promise.set_value(); promise.set_value();
m_loop->loop(); m_loop->loop();

View File

@ -1,5 +1,6 @@
CI_DESC="CI job running ThreadSanitizer" CI_DESC="CI job running ThreadSanitizer"
CI_DIR=build-sanitize CI_DIR=build-sanitize
NIX_ARGS=(--arg enableLibcxx true --argstr libcxxSanitizers "Thread" --argstr capnprotoSanitizers "thread")
export CXX=clang++ export CXX=clang++
export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter -fsanitize=thread" export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter -fsanitize=thread"
CMAKE_ARGS=() CMAKE_ARGS=()

View File

@ -2,8 +2,8 @@
Given an interface description of an object with one or more methods, libmultiprocess generates: Given an interface description of an object with one or more methods, libmultiprocess generates:
* A C++ `ProxyClient` class with an implementation of each interface method that sends a request over a socket, waits for a response, and returns the result. * A C++ `ProxyClient` class template specialization with an implementation of each interface method that sends a request over a socket, waits for a response, and returns the result.
* A C++ `ProxyServer` class that listens for requests over a socket and calls a wrapped C++ object implementing the same interface to actually execute the requests. * A C++ `ProxyServer` class template specialization that listens for requests over a socket and calls a wrapped C++ object implementing the same interface to actually execute the requests.
The function call ⇆ request translation supports input and output arguments, standard types like `unique_ptr`, `vector`, `map`, and `optional`, and bidirectional calls between processes through interface pointer and `std::function` arguments. The function call ⇆ request translation supports input and output arguments, standard types like `unique_ptr`, `vector`, `map`, and `optional`, and bidirectional calls between processes through interface pointer and `std::function` arguments.
@ -15,7 +15,7 @@ Libmultiprocess acts as a pure wrapper or layer over the underlying protocol. Cl
### Internals ### Internals
The `ProxyClient` and `ProxyServer` generated classes are not directly exposed to the user, as described in [usage.md](usage.md). Instead, they wrap c++ interfaces and appear to the user as pointers to an interface. They are first instantiated when calling `ConnectStream` and `ServeStream` respectively for creating the `InitInterface`. These methods establish connections through sockets, internally creating `Connection` objects wrapping a `capnp::RpcSystem` configured for client and server mode respectively. The `ProxyClient` and `ProxyServer` generated classes are not directly exposed to the user, as described in [usage.md](usage.md). Instead, they wrap C++ interfaces and appear to the user as pointers to an interface. They are first instantiated when calling `ConnectStream` and `ServeStream` respectively for creating the `InitInterface`. These methods establish connections through sockets, internally creating `Connection` objects wrapping a `capnp::RpcSystem` configured for client and server mode respectively.
The `InitInterface` interface will typically have methods which return other interfaces, giving the connecting process the ability to call other functions in the serving process. Interfaces can also have methods accepting other interfaces as parameters, giving serving processes the ability to call back and invoke functions in connecting processes. Creating new interfaces does not create new connections, and typically many interface objects will share the same connection. The `InitInterface` interface will typically have methods which return other interfaces, giving the connecting process the ability to call other functions in the serving process. Interfaces can also have methods accepting other interfaces as parameters, giving serving processes the ability to call back and invoke functions in connecting processes. Creating new interfaces does not create new connections, and typically many interface objects will share the same connection.
@ -23,13 +23,13 @@ Both `ConnectStream` and `ServeStream` also require an instantiation of the `Eve
When a generated method on the `ProxyClient` is called, it calls `clientInvoke` with the capnp-translated types. `clientInvoke` creates a self-executing promise (`kj::TaskSet`) that drives the execution of the request and gives ownership of it to the `EventLoop`. `clientInvoke` blocks until a response is received, or until there is a call from the server that needs to run on the same client thread, using a `Waiter` object. When a generated method on the `ProxyClient` is called, it calls `clientInvoke` with the capnp-translated types. `clientInvoke` creates a self-executing promise (`kj::TaskSet`) that drives the execution of the request and gives ownership of it to the `EventLoop`. `clientInvoke` blocks until a response is received, or until there is a call from the server that needs to run on the same client thread, using a `Waiter` object.
On the server side, the `capnp::RpcSystem` receives the capnp request and invokes the corresponding c++ method through the corresponding `ProxyServer` and the heavily templated `serverInvoke` triggering a `ServerCall`. Its return values from the actual c++ methods are copied into capnp responses by `ServerRet` and exceptions are caught and copied by `ServerExcept`. The two are connected through `ServerField`. The main method driving execution of a request is `PassField`, which is invoked through `ServerField`. Instantiated interfaces, or capabilities in capnp speak, are tracked and owned by the server's `capnp::RpcSystem`. On the server side, the `capnp::RpcSystem` receives the capnp request and invokes the corresponding C++ method through the corresponding `ProxyServer` and the heavily templated `serverInvoke` triggering a `ServerCall`. The return values from the actual C++ methods are copied into capnp responses by `ServerRet` and exceptions are caught and copied by `ServerExcept`. The two are connected through `ServerField`. The main method driving execution of a request is `PassField`, which is invoked through `ServerField`. Instantiated interfaces, or capabilities in capnp speak, are tracked and owned by the server's `capnp::RpcSystem`.
## Interface descriptions ## Interface descriptions
As explained in the [usage](usage.md) document, interface descriptions need to be consumed both by the _libmultiprocess_ code generator, and by C++ code that calls and implements the interfaces. The C++ code only needs to know about C++ arguments and return types, while the code generator only needs to know about capnp arguments and return types, but both need to know class and method names, so the corresponding `.h` and `.capnp` source files contain some of the same information, and have to be kept in sync manually when methods or parameters change. Despite the redundancy, reconciling the interface definitions is designed to be _straightforward_ and _safe_. _Straightforward_ because there is no need to write manual serialization code or use awkward intermediate types like [`UniValue`](https://github.com/bitcoin/bitcoin/blob/master/src/univalue/include/univalue.h) instead of native types. _Safe_ because if there are any inconsistencies between API and data definitions (even minor ones like using a narrow int data type for a wider int API input), there are errors at build time instead of errors or bugs at runtime. As explained in the [usage](usage.md) document, interface descriptions need to be consumed both by the _libmultiprocess_ code generator, and by C++ code that calls and implements the interfaces. The C++ code only needs to know about C++ arguments and return types, while the code generator only needs to know about capnp arguments and return types, but both need to know class and method names, so the corresponding `.h` and `.capnp` source files contain some of the same information, and have to be kept in sync manually when methods or parameters change. Despite the redundancy, reconciling the interface definitions is designed to be _straightforward_ and _safe_. _Straightforward_ because there is no need to write manual serialization code or use awkward intermediate types like [`UniValue`](https://github.com/bitcoin/bitcoin/blob/master/src/univalue/include/univalue.h) instead of native types. _Safe_ because if there are any inconsistencies between API and data definitions (even minor ones like using a narrow int data type for a wider int API input), there are errors at build time instead of errors or bugs at runtime.
In the future, it would be possible to combine API and data definitions together using [C++ attributes](https://en.cppreference.com/w/cpp/language/attributes). To do this we would add attributes to the API definition files, and then generate the data definitions from the API definitions and attributes. I didn't take this approach mostly because it would be extra work, but also because until c++ standardizes reflection, this would require either hooking into compiler APIs like https://github.com/RosettaCommons/binder, or parsing c++ code manually like http://www.swig.org/. In the future, it would be possible to combine API and data definitions together using [C++ attributes](https://en.cppreference.com/w/cpp/language/attributes). To do this we would add attributes to the API definition files, and then generate the data definitions from the API definitions and attributes. I didn't take this approach mostly because it would be extra work, but also because until C++ standardizes reflection, this would require either hooking into compiler APIs like https://github.com/RosettaCommons/binder, or parsing C++ code manually like http://www.swig.org/.
## What is `kj`? ## What is `kj`?
@ -39,6 +39,6 @@ basis in this library to construct the event-loop necessary to service IPC reque
## Future directions ## Future directions
_libmultiprocess_ uses the [Cap'n Proto](https://capnproto.org) interface description language and protocol, but it could be extended or changed to use a different IDL/protocol like [gRPC](https://grpc.io). The nice thing about _Cap'n Proto_ compared to _gRPC_ and most other lower level protocols is that it allows interface pointers (_Services_ in gRPC parlance) to be passed as method arguments and return values, so object references and bidirectional requests work out of the box. Supporting a lower-level protocol would require writing adding maps and tracking code to proxy objects. _libmultiprocess_ uses the [Cap'n Proto](https://capnproto.org) interface description language and protocol, but it could be extended or changed to use a different IDL/protocol like [gRPC](https://grpc.io). The nice thing about _Cap'n Proto_ compared to _gRPC_ and most other lower level protocols is that it allows interface pointers (_Services_ in gRPC parlance) to be passed as method arguments and return values, so object references and bidirectional requests work out of the box. Supporting a lower-level protocol would require adding maps and tracking code to proxy objects.
_libmultiprocess_ is currently compatible with sandboxing but could add platform-specific sandboxing support or integration with a sandboxing library like [SAPI](https://github.com/google/sandboxed-api). _libmultiprocess_ is currently compatible with sandboxing but could add platform-specific sandboxing support or integration with a sandboxing library like [SAPI](https://github.com/google/sandboxed-api).

View File

@ -4,9 +4,9 @@
_libmultiprocess_ is a library and code generator that allows calling C++ class interfaces across different processes. For an interface to be available from other processes, it needs two definitions: _libmultiprocess_ is a library and code generator that allows calling C++ class interfaces across different processes. For an interface to be available from other processes, it needs two definitions:
- An **API definition** declaring how the interface is called. Included examples: [calculator.h](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/calculator.h), [printer.h](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/printer.h), [init.h](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/init.h). Bitcoin examples: [node.h](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/interfaces/node.h), [wallet.h](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/interfaces/wallet.h), [echo.h](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/interfaces/echo.h), [init.h](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/interfaces/init.h). - An **API definition** declaring how the interface is called. Included examples: [calculator.h](../example/calculator.h), [printer.h](../example/printer.h), [init.h](../example/init.h). Bitcoin examples: [node.h](https://github.com/bitcoin/bitcoin/blob/master/src/interfaces/node.h), [wallet.h](https://github.com/bitcoin/bitcoin/blob/master/src/interfaces/wallet.h), [echo.h](https://github.com/bitcoin/bitcoin/blob/master/src/interfaces/echo.h), [init.h](https://github.com/bitcoin/bitcoin/blob/master/src/interfaces/init.h).
- A **data definition** declaring how interface calls get sent across the wire. Included examples: [calculator.capnp](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/calculator.capnp), [printer.capnp](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/printer.capnp), [init.capnp](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/init.capnp). Bitcoin examples: [node.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/node.capnp), [wallet.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/wallet.capnp), [echo.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/echo.capnp), [init.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/init.capnp). - A **data definition** declaring how interface calls get sent across the wire. Included examples: [calculator.capnp](../example/calculator.capnp), [printer.capnp](../example/printer.capnp), [init.capnp](../example/init.capnp). Bitcoin examples: [node.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/node.capnp), [wallet.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/wallet.capnp), [echo.capnp](https://github.com/bitcoin/bitcoin/blob/master/src/ipc/capnp/echo.capnp), [init.capnp](https://github.com/bitcoin/bitcoin/blob/master/src/ipc/capnp/init.capnp).
The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient<Interface>` and `mp::ProxyServer<Interface>` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets. The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient<Interface>` and `mp::ProxyServer<Interface>` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets.

View File

@ -9,6 +9,7 @@
#include <charconv> #include <charconv>
#include <cstring> #include <cstring>
#include <fstream> #include <fstream>
#include <functional>
#include <iostream> #include <iostream>
#include <kj/async.h> #include <kj/async.h>
#include <kj/common.h> #include <kj/common.h>
@ -37,6 +38,7 @@ public:
} }
}; };
// Exercises deprecated log callback signature
static void LogPrint(bool raise, const std::string& message) static void LogPrint(bool raise, const std::string& message)
{ {
if (raise) throw std::runtime_error(message); if (raise) throw std::runtime_error(message);

View File

@ -35,10 +35,10 @@ static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const s
return std::make_tuple(mp::ConnectStream<InitInterface>(loop, fd), pid); return std::make_tuple(mp::ConnectStream<InitInterface>(loop, fd), pid);
} }
static void LogPrint(bool raise, const std::string& message) static void LogPrint(mp::LogMessage log_data)
{ {
if (raise) throw std::runtime_error(message); if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message);
std::ofstream("debug.log", std::ios_base::app) << message << std::endl; std::ofstream("debug.log", std::ios_base::app) << log_data.message << std::endl;
} }
int main(int argc, char** argv) int main(int argc, char** argv)

View File

@ -32,10 +32,10 @@ public:
std::unique_ptr<Printer> makePrinter() override { return std::make_unique<PrinterImpl>(); } std::unique_ptr<Printer> makePrinter() override { return std::make_unique<PrinterImpl>(); }
}; };
static void LogPrint(bool raise, const std::string& message) static void LogPrint(mp::LogMessage log_data)
{ {
if (raise) throw std::runtime_error(message); if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message);
std::ofstream("debug.log", std::ios_base::app) << message << std::endl; std::ofstream("debug.log", std::ios_base::app) << log_data.message << std::endl;
} }
int main(int argc, char** argv) int main(int argc, char** argv)

View File

@ -98,36 +98,29 @@ public:
EventLoop& m_loop; EventLoop& m_loop;
}; };
using LogFn = std::function<void(bool raise, std::string message)>; //! Log flags. Update stringify function if changed!
enum class Log {
class Logger Trace = 0,
{ Debug,
public: Info,
Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {} Warning,
Logger(Logger&& logger) : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer)) {} Error,
~Logger() noexcept(false) Raise,
{
if (m_fn) m_fn(m_raise, m_buffer.str());
}
template <typename T>
friend Logger& operator<<(Logger& logger, T&& value)
{
if (logger.m_fn) logger.m_buffer << std::forward<T>(value);
return logger;
}
template <typename T>
friend Logger& operator<<(Logger&& logger, T&& value)
{
return logger << std::forward<T>(value);
}
bool m_raise;
LogFn& m_fn;
std::ostringstream m_buffer;
}; };
kj::StringPtr KJ_STRINGIFY(Log flags);
struct LogMessage {
//! Message to be logged
std::string message;
//! The severity level of this message
Log level;
};
using LogFn = std::function<void(LogMessage)>;
struct LogOptions { struct LogOptions {
//! External logging callback. //! External logging callback.
@ -136,8 +129,60 @@ struct LogOptions {
//! Maximum number of characters to use when representing //! Maximum number of characters to use when representing
//! request and response structs as strings. //! request and response structs as strings.
size_t max_chars{200}; size_t max_chars{200};
//! Messages with a severity level less than log_level will not be
//! reported.
Log log_level{Log::Trace};
}; };
class Logger
{
public:
Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {}
Logger(Logger&&) = delete;
Logger& operator=(Logger&&) = delete;
Logger(const Logger&) = delete;
Logger& operator=(const Logger&) = delete;
~Logger() noexcept(false)
{
if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level});
}
template <typename T>
friend Logger& operator<<(Logger& logger, T&& value)
{
if (logger.enabled()) logger.m_buffer << std::forward<T>(value);
return logger;
}
template <typename T>
friend Logger& operator<<(Logger&& logger, T&& value)
{
return logger << std::forward<T>(value);
}
explicit operator bool() const
{
return enabled();
}
private:
bool enabled() const
{
return m_options.log_fn && m_log_level >= m_options.log_level;
}
const LogOptions& m_options;
Log m_log_level;
std::ostringstream m_buffer;
};
#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
std::string LongThreadName(const char* exe_name); std::string LongThreadName(const char* exe_name);
//! Event loop implementation. //! Event loop implementation.
@ -168,8 +213,19 @@ std::string LongThreadName(const char* exe_name);
class EventLoop class EventLoop
{ {
public: public:
//! Construct event loop object. //! Construct event loop object with default logging options.
EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr); EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr)
: EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){}
//! Construct event loop object with specified logging options.
EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr);
//! Backwards-compatible constructor for previous (deprecated) logging callback signature
EventLoop(const char* exe_name, std::function<void(bool, std::string)> old_callback, void* context = nullptr)
: EventLoop(exe_name,
LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}},
context){}
~EventLoop(); ~EventLoop();
//! Run event loop. Does not return until shutdown. This should only be //! Run event loop. Does not return until shutdown. This should only be
@ -210,15 +266,6 @@ public:
//! Check if loop should exit. //! Check if loop should exit.
bool done() const MP_REQUIRES(m_mutex); bool done() const MP_REQUIRES(m_mutex);
Logger log()
{
Logger logger(false, m_log_opts.log_fn);
logger << "{" << LongThreadName(m_exe_name) << "} ";
return logger;
}
Logger logPlain() { return {false, m_log_opts.log_fn}; }
Logger raise() { return {true, m_log_opts.log_fn}; }
//! Process name included in thread names so combined debug output from //! Process name included in thread names so combined debug output from
//! multiple processes is easier to understand. //! multiple processes is easier to understand.
const char* m_exe_name; const char* m_exe_name;
@ -281,12 +328,13 @@ struct Waiter
Waiter() = default; Waiter() = default;
template <typename Fn> template <typename Fn>
void post(Fn&& fn) bool post(Fn&& fn)
{ {
const Lock lock(m_mutex); const Lock lock(m_mutex);
assert(!m_fn); if (m_fn) return false;
m_fn = std::forward<Fn>(fn); m_fn = std::forward<Fn>(fn);
m_cv.notify_all(); m_cv.notify_all();
return true;
} }
template <class Predicate> template <class Predicate>
@ -642,7 +690,7 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>(); init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
Connection* connection_ptr = connection.get(); Connection* connection_ptr = connection.get();
connection->onDisconnect([&loop, connection_ptr] { connection->onDisconnect([&loop, connection_ptr] {
loop.log() << "IPC client: unexpected network disconnect."; MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
delete connection_ptr; delete connection_ptr;
}); });
}); });
@ -665,7 +713,7 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
}); });
auto it = loop.m_incoming_connections.begin(); auto it = loop.m_incoming_connections.begin();
it->onDisconnect([&loop, it] { it->onDisconnect([&loop, it] {
loop.log() << "IPC server: socket disconnected."; MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it); loop.m_incoming_connections.erase(it);
}); });
} }

View File

@ -568,7 +568,7 @@ template <typename Client>
void clientDestroy(Client& client) void clientDestroy(Client& client)
{ {
if (client.m_context.connection) { if (client.m_context.connection) {
client.m_context.loop->log() << "IPC client destroy " << typeid(client).name(); MP_LOG(*client.m_context.loop, Log::Info) << "IPC client destroy " << typeid(client).name();
} else { } else {
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name()); KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
} }
@ -577,7 +577,7 @@ void clientDestroy(Client& client)
template <typename Server> template <typename Server>
void serverDestroy(Server& server) void serverDestroy(Server& server)
{ {
server.m_context.loop->log() << "IPC server destroy " << typeid(server).name(); MP_LOG(*server.m_context.loop, Log::Info) << "IPC server destroy " << typeid(server).name();
} }
//! Entry point called by generated client code that looks like: //! Entry point called by generated client code that looks like:
@ -605,7 +605,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
// declaration so the server method runs in a dedicated thread. // declaration so the server method runs in a dedicated thread.
assert(!g_thread_context.loop_thread); assert(!g_thread_context.loop_thread);
g_thread_context.waiter = std::make_unique<Waiter>(); g_thread_context.waiter = std::make_unique<Waiter>();
proxy_client.m_context.loop->logPlain() MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Info)
<< "{" << g_thread_context.thread_name << "{" << g_thread_context.thread_name
<< "} IPC client first request from current thread, constructing waiter"; << "} IPC client first request from current thread, constructing waiter";
} }
@ -629,15 +629,19 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields; using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
invoke_context.emplace(*proxy_client.m_context.connection, thread_context); invoke_context.emplace(*proxy_client.m_context.connection, thread_context);
IterateFields().handleChain(*invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...); IterateFields().handleChain(*invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_context.loop->logPlain() MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Debug)
<< "{" << thread_context.thread_name << "} IPC client send " << "{" << thread_context.thread_name << "} IPC client send "
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString(), proxy_client.m_context.loop->m_log_opts.max_chars); << TypeName<typename Request::Params>();
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Trace)
<< "send data: " << LogEscape(request.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);
proxy_client.m_context.loop->m_task_set->add(request.send().then( proxy_client.m_context.loop->m_task_set->add(request.send().then(
[&](::capnp::Response<typename Request::Results>&& response) { [&](::capnp::Response<typename Request::Results>&& response) {
proxy_client.m_context.loop->logPlain() MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Debug)
<< "{" << thread_context.thread_name << "} IPC client recv " << "{" << thread_context.thread_name << "} IPC client recv "
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString(), proxy_client.m_context.loop->m_log_opts.max_chars); << TypeName<typename Request::Results>();
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Trace)
<< "recv data: " << LogEscape(response.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);
try { try {
IterateFields().handleChain( IterateFields().handleChain(
*invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...); *invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
@ -653,7 +657,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
disconnected = "IPC client method call interrupted by disconnect."; disconnected = "IPC client method call interrupted by disconnect.";
} else { } else {
kj_exception = kj::str("kj::Exception: ", e).cStr(); kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_context.loop->logPlain() MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Info)
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception; << "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
} }
const Lock lock(thread_context.waiter->m_mutex); const Lock lock(thread_context.waiter->m_mutex);
@ -665,8 +669,8 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
Lock lock(thread_context.waiter->m_mutex); Lock lock(thread_context.waiter->m_mutex);
thread_context.waiter->wait(lock, [&done]() { return done; }); thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception); if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception; if (!kj_exception.empty()) MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Raise) << kj_exception;
if (disconnected) proxy_client.m_context.loop->raise() << disconnected; if (disconnected) MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Raise) << disconnected;
} }
//! Invoke callable `fn()` that may return void. If it does return void, replace //! Invoke callable `fn()` that may return void. If it does return void, replace
@ -700,8 +704,10 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using Results = typename decltype(call_context.getResults())::Builds; using Results = typename decltype(call_context.getResults())::Builds;
int req = ++server_reqs; int req = ++server_reqs;
server.m_context.loop->log() << "IPC server recv request #" << req << " " MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString(), server.m_context.loop->m_log_opts.max_chars); << TypeName<typename Params::Reads>();
MP_LOG(*server.m_context.loop, Log::Trace) << "request data: "
<< LogEscape(params.toString(), server.m_context.loop->m_log_opts.max_chars);
try { try {
using ServerContext = ServerInvokeContext<Server, CallContext>; using ServerContext = ServerInvokeContext<Server, CallContext>;
@ -717,14 +723,15 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); }, return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); }) [&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) { .then([&server, req](CallContext call_context) {
server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>() MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server send response #" << req << " " << TypeName<Results>();
<< " " << LogEscape(call_context.getResults().toString(), server.m_context.loop->m_log_opts.max_chars); MP_LOG(*server.m_context.loop, Log::Trace) << "response data: "
<< LogEscape(call_context.getResults().toString(), server.m_context.loop->m_log_opts.max_chars);
}); });
} catch (const std::exception& e) { } catch (const std::exception& e) {
server.m_context.loop->log() << "IPC server unhandled exception: " << e.what(); MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception: " << e.what();
throw; throw;
} catch (...) { } catch (...) {
server.m_context.loop->log() << "IPC server unhandled exception"; MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception";
throw; throw;
} }
} }

View File

@ -150,11 +150,16 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// thread. // thread.
KJ_IF_MAYBE (thread_server, perhaps) { KJ_IF_MAYBE (thread_server, perhaps) {
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server); const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_context.loop->log() MP_LOG(*server.m_context.loop, Log::Debug)
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke)); if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req
<< " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
throw std::runtime_error("thread busy");
}
} else { } else {
server.m_context.loop->log() MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req << ", missing thread to execute request"; << "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle"); throw std::runtime_error("invalid thread handle");
} }

View File

@ -3,12 +3,19 @@
, enableLibcxx ? false # Whether to use libc++ toolchain and libraries instead of libstdc++ , enableLibcxx ? false # Whether to use libc++ toolchain and libraries instead of libstdc++
, minimal ? false # Whether to create minimal shell without extra tools (faster when cross compiling) , minimal ? false # Whether to create minimal shell without extra tools (faster when cross compiling)
, capnprotoVersion ? null , capnprotoVersion ? null
, capnprotoSanitizers ? null # Optional sanitizers to build cap'n proto with
, cmakeVersion ? null , cmakeVersion ? null
, libcxxSanitizers ? null # Optional LLVM_USE_SANITIZER value to use for libc++, see https://llvm.org/docs/CMake.html
}: }:
let let
lib = pkgs.lib; lib = pkgs.lib;
llvm = crossPkgs.llvmPackages_20; llvmBase = crossPkgs.llvmPackages_21;
llvm = llvmBase // lib.optionalAttrs (libcxxSanitizers != null) {
libcxx = llvmBase.libcxx.override {
devExtraCmakeFlags = [ "-DLLVM_USE_SANITIZER=${libcxxSanitizers}" ];
};
};
capnprotoHashes = { capnprotoHashes = {
"0.7.0" = "sha256-Y/7dUOQPDHjniuKNRw3j8dG1NI9f/aRWpf8V0WzV9k8="; "0.7.0" = "sha256-Y/7dUOQPDHjniuKNRw3j8dG1NI9f/aRWpf8V0WzV9k8=";
"0.7.1" = "sha256-3cBpVmpvCXyqPUXDp12vCFCk32ZXWpkdOliNH37UwWE="; "0.7.1" = "sha256-3cBpVmpvCXyqPUXDp12vCFCk32ZXWpkdOliNH37UwWE=";
@ -35,7 +42,17 @@ let
} // (lib.optionalAttrs (lib.versionOlder capnprotoVersion "0.10") { } // (lib.optionalAttrs (lib.versionOlder capnprotoVersion "0.10") {
env = { }; # Drop -std=c++20 flag forced by nixpkgs env = { }; # Drop -std=c++20 flag forced by nixpkgs
})); }));
capnproto = capnprotoBase.override (lib.optionalAttrs enableLibcxx { clangStdenv = llvm.libcxxStdenv; }); capnproto = (capnprotoBase.overrideAttrs (old: lib.optionalAttrs (capnprotoSanitizers != null) {
env = (old.env or { }) // {
CXXFLAGS =
lib.concatStringsSep " " [
(old.env.CXXFLAGS or "")
"-fsanitize=${capnprotoSanitizers}"
"-fno-omit-frame-pointer"
"-g"
];
};
})).override (lib.optionalAttrs enableLibcxx { clangStdenv = llvm.libcxxStdenv; });
clang = if enableLibcxx then llvm.libcxxClang else llvm.clang; clang = if enableLibcxx then llvm.libcxxClang else llvm.clang;
clang-tools = llvm.clang-tools.override { inherit enableLibcxx; }; clang-tools = llvm.clang-tools.override { inherit enableLibcxx; };
cmakeHashes = { cmakeHashes = {

View File

@ -24,6 +24,7 @@
#include <kj/debug.h> #include <kj/debug.h>
#include <kj/function.h> #include <kj/function.h>
#include <kj/memory.h> #include <kj/memory.h>
#include <kj/string.h>
#include <map> #include <map>
#include <memory> #include <memory>
#include <optional> #include <optional>
@ -42,7 +43,7 @@ thread_local ThreadContext g_thread_context;
void LoggingErrorHandler::taskFailed(kj::Exception&& exception) void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
{ {
KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception); KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
m_loop.log() << "Uncaught exception in daemonized task."; MP_LOG(m_loop, Log::Error) << "Uncaught exception in daemonized task.";
} }
EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock) EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
@ -191,13 +192,13 @@ void EventLoop::addAsyncCleanup(std::function<void()> fn)
startAsyncThread(); startAsyncThread();
} }
EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context) EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context)
: m_exe_name(exe_name), : m_exe_name(exe_name),
m_io_context(kj::setupAsyncIo()), m_io_context(kj::setupAsyncIo()),
m_task_set(new kj::TaskSet(m_error_handler)), m_task_set(new kj::TaskSet(m_error_handler)),
m_log_opts(std::move(log_opts)),
m_context(context) m_context(context)
{ {
m_log_opts.log_fn = log_fn;
int fds[2]; int fds[2];
KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
m_wait_fd = fds[0]; m_wait_fd = fds[0];
@ -251,9 +252,9 @@ void EventLoop::loop()
break; break;
} }
} }
log() << "EventLoop::loop done, cancelling event listeners."; MP_LOG(*this, Log::Info) << "EventLoop::loop done, cancelling event listeners.";
m_task_set.reset(); m_task_set.reset();
log() << "EventLoop::loop bye."; MP_LOG(*this, Log::Info) << "EventLoop::loop bye.";
wait_stream = nullptr; wait_stream = nullptr;
KJ_SYSCALL(::close(post_fd)); KJ_SYSCALL(::close(post_fd));
const Lock lock(m_mutex); const Lock lock(m_mutex);
@ -430,4 +431,16 @@ std::string LongThreadName(const char* exe_name)
return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name; return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
} }
kj::StringPtr KJ_STRINGIFY(Log v)
{
switch (v) {
case Log::Trace: return "Trace";
case Log::Debug: return "Debug";
case Log::Info: return "Info";
case Log::Warning: return "Warning";
case Log::Error: return "Error";
case Log::Raise: return "Raise";
}
return "<Log?>";
}
} // namespace mp } // namespace mp

View File

@ -7,6 +7,8 @@
#include <cerrno> #include <cerrno>
#include <cstdio> #include <cstdio>
#include <filesystem>
#include <iostream>
#include <kj/common.h> #include <kj/common.h>
#include <kj/string-tree.h> #include <kj/string-tree.h>
#include <pthread.h> #include <pthread.h>
@ -29,6 +31,8 @@
#include <pthread_np.h> #include <pthread_np.h>
#endif // HAVE_PTHREAD_GETTHREADID_NP #endif // HAVE_PTHREAD_GETTHREADID_NP
namespace fs = std::filesystem;
namespace mp { namespace mp {
namespace { namespace {
@ -138,6 +142,9 @@ void ExecProcess(const std::vector<std::string>& args)
argv.push_back(nullptr); argv.push_back(nullptr);
if (execvp(argv[0], argv.data()) != 0) { if (execvp(argv[0], argv.data()) != 0) {
perror("execvp failed"); perror("execvp failed");
if (errno == ENOENT && !args.empty()) {
std::cerr << "Missing executable: " << fs::weakly_canonical(args.front()) << '\n';
}
_exit(1); _exit(1);
} }
} }

View File

@ -13,6 +13,8 @@ $Proxy.includeTypes("mp/test/foo-types.h");
interface FooInterface $Proxy.wrap("mp::test::FooImplementation") { interface FooInterface $Proxy.wrap("mp::test::FooImplementation") {
add @0 (a :Int32, b :Int32) -> (result :Int32); add @0 (a :Int32, b :Int32) -> (result :Int32);
addOut @19 (a :Int32, b :Int32) -> (ret :Int32);
addInOut @20 (x :Int32, sum :Int32) -> (sum :Int32);
mapSize @1 (map :List(Pair(Text, Text))) -> (result :Int32); mapSize @1 (map :List(Pair(Text, Text))) -> (result :Int32);
pass @2 (arg :FooStruct) -> (result :FooStruct); pass @2 (arg :FooStruct) -> (result :FooStruct);
raise @3 (arg :FooStruct) -> (error :FooStruct $Proxy.exception("mp::test::FooStruct")); raise @3 (arg :FooStruct) -> (error :FooStruct $Proxy.exception("mp::test::FooStruct"));

View File

@ -62,6 +62,8 @@ class FooImplementation
{ {
public: public:
int add(int a, int b) { return a + b; } int add(int a, int b) { return a + b; }
void addOut(int a, int b, int& out) { out = a + b; }
void addInOut(int x, int& sum) { sum += x; }
int mapSize(const std::map<std::string, std::string>& map) { return map.size(); } int mapSize(const std::map<std::string, std::string>& map) { return map.size(); }
FooStruct pass(FooStruct foo) { return foo; } FooStruct pass(FooStruct foo) { return foo; }
void raise(FooStruct foo) { throw foo; } void raise(FooStruct foo) { throw foo; }

View File

@ -5,26 +5,32 @@
#include <mp/test/foo.capnp.h> #include <mp/test/foo.capnp.h>
#include <mp/test/foo.capnp.proxy.h> #include <mp/test/foo.capnp.proxy.h>
#include <atomic>
#include <capnp/capability.h> #include <capnp/capability.h>
#include <capnp/rpc.h> #include <capnp/rpc.h>
#include <condition_variable>
#include <cstring> #include <cstring>
#include <functional> #include <functional>
#include <future> #include <future>
#include <iostream>
#include <kj/async.h> #include <kj/async.h>
#include <kj/async-io.h> #include <kj/async-io.h>
#include <kj/common.h> #include <kj/common.h>
#include <kj/debug.h> #include <kj/debug.h>
#include <kj/exception.h>
#include <kj/memory.h> #include <kj/memory.h>
#include <kj/string.h>
#include <kj/test.h> #include <kj/test.h>
#include <memory> #include <memory>
#include <mp/proxy.h> #include <mp/proxy.h>
#include <mp/proxy.capnp.h>
#include <mp/proxy-io.h> #include <mp/proxy-io.h>
#include <mp/util.h>
#include <optional> #include <optional>
#include <set> #include <set>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <string_view> #include <string_view>
#include <system_error>
#include <thread> #include <thread>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -60,9 +66,10 @@ public:
TestSetup(bool client_owns_connection = true) TestSetup(bool client_owns_connection = true)
: thread{[&] { : thread{[&] {
EventLoop loop("mptest", [](bool raise, const std::string& log) { EventLoop loop("mptest", [](mp::LogMessage log) {
std::cout << "LOG" << raise << ": " << log << "\n"; // Info logs are not printed by default, but will be shown with `mptest --verbose`
if (raise) throw std::runtime_error(log); KJ_LOG(INFO, log.level, log.message);
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
}); });
auto pipe = loop.m_io_context.provider->newTwoWayPipe(); auto pipe = loop.m_io_context.provider->newTwoWayPipe();
@ -113,6 +120,11 @@ KJ_TEST("Call FooInterface methods")
ProxyClient<messages::FooInterface>* foo = setup.client.get(); ProxyClient<messages::FooInterface>* foo = setup.client.get();
KJ_EXPECT(foo->add(1, 2) == 3); KJ_EXPECT(foo->add(1, 2) == 3);
int ret;
foo->addOut(3, 4, ret);
KJ_EXPECT(ret == 7);
foo->addInOut(3, ret);
KJ_EXPECT(ret == 10);
FooStruct in; FooStruct in;
in.name = "name"; in.name = "name";
@ -297,5 +309,71 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
signal.set_value(); signal.set_value();
} }
KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
{
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
std::promise<void> signal;
foo->initThreadMap();
// Use callFnAsync() to get the client to set up the request_thread
// that will be used for the test.
setup.server->m_impl->m_fn = [&] {};
foo->callFnAsync();
ThreadContext& tc{g_thread_context};
Thread::Client *callback_thread, *request_thread;
foo->m_context.loop->sync([&] {
Lock lock(tc.waiter->m_mutex);
callback_thread = &tc.callback_threads.at(foo->m_context.connection)->m_client;
request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
});
setup.server->m_impl->m_fn = [&] {
try
{
signal.get_future().get();
}
catch (const std::future_error& e)
{
KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved));
}
};
auto client{foo->m_client};
bool caught_thread_busy = false;
// NOTE: '3' was chosen because it was the lowest number
// of simultaneous calls required to reliably catch a "thread busy" error
std::atomic<size_t> running{3};
foo->m_context.loop->sync([&]
{
for (size_t i = 0; i < running; i++)
{
auto request{client.callFnAsyncRequest()};
auto context{request.initContext()};
context.setCallbackThread(*callback_thread);
context.setThread(*request_thread);
foo->m_context.loop->m_task_set->add(request.send().then(
[&](auto&& results) {
running -= 1;
tc.waiter->m_cv.notify_all();
},
[&](kj::Exception&& e) {
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
"remote exception: std::exception: thread busy");
caught_thread_busy = true;
running -= 1;
signal.set_value();
tc.waiter->m_cv.notify_all();
}
));
}
});
{
Lock lock(tc.waiter->m_mutex);
tc.waiter->wait(lock, [&running] { return running == 0; });
}
KJ_EXPECT(caught_thread_busy);
}
} // namespace test } // namespace test
} // namespace mp } // namespace mp