Getting the relay pool up to speed

This commit is contained in:
2025-10-02 11:51:41 -04:00
parent 0d910ca181
commit 0f897ab1b3
14 changed files with 6515 additions and 308 deletions

View File

@@ -65,22 +65,26 @@ typedef struct relay_connection {
char* url;
nostr_ws_client_t* ws_client;
nostr_pool_relay_status_t status;
// Connection management
time_t last_ping;
time_t connect_time;
nostr_relay_pool_t* pool; // Back reference to pool for config access
// Reconnection management
int reconnect_attempts;
// Ping management for latency measurement
time_t last_reconnect_attempt;
time_t next_reconnect_time;
// Connection health monitoring (ping/pong)
time_t last_ping_sent;
time_t next_ping_time; // last_ping_sent + NOSTR_POOL_PING_INTERVAL
time_t last_pong_received;
int ping_pending;
double pending_ping_start_ms; // High-resolution timestamp for ping measurement
int ping_pending; // Flag to track if ping response is expected
// Multi-subscription latency tracking (REQ->first EVENT/EOSE)
subscription_timing_t pending_subscriptions[NOSTR_POOL_MAX_PENDING_SUBSCRIPTIONS];
int pending_subscription_count;
// Statistics
nostr_relay_stats_t stats;
} relay_connection_t;
@@ -88,34 +92,38 @@ typedef struct relay_connection {
struct nostr_pool_subscription {
char subscription_id[NOSTR_POOL_SUBSCRIPTION_ID_SIZE];
cJSON* filter;
// Relay-specific subscription tracking
char** relay_urls;
int relay_count;
int* eose_received; // Track EOSE from each relay
// Callbacks
void (*on_event)(cJSON* event, const char* relay_url, void* user_data);
void (*on_eose)(void* user_data);
void* user_data;
int closed;
int close_on_eose; // Auto-close subscription when all relays send EOSE
nostr_relay_pool_t* pool; // Back reference to pool
};
struct nostr_relay_pool {
relay_connection_t* relays[NOSTR_POOL_MAX_RELAYS];
int relay_count;
// Reconnection configuration
nostr_pool_reconnect_config_t reconnect_config;
// Event deduplication - simple hash table with linear probing
char seen_event_ids[NOSTR_POOL_MAX_SEEN_EVENTS][65]; // 64 hex chars + null terminator
int seen_count;
int seen_next_index;
// Active subscriptions
nostr_pool_subscription_t* subscriptions[NOSTR_POOL_MAX_SUBSCRIPTIONS];
int subscription_count;
// Pool-wide settings
int default_timeout_ms;
};
@@ -163,33 +171,211 @@ static int add_subscription_timing(relay_connection_t* relay, const char* subscr
// Helper function to find and remove subscription timing
static double remove_subscription_timing(relay_connection_t* relay, const char* subscription_id) {
if (!relay || !subscription_id) return -1.0;
for (int i = 0; i < relay->pending_subscription_count; i++) {
if (relay->pending_subscriptions[i].active &&
if (relay->pending_subscriptions[i].active &&
strcmp(relay->pending_subscriptions[i].subscription_id, subscription_id) == 0) {
// Calculate latency
double current_time_ms = get_current_time_ms();
double latency_ms = current_time_ms - relay->pending_subscriptions[i].start_time_ms;
// Mark as inactive and remove by shifting remaining entries
relay->pending_subscriptions[i].active = 0;
for (int j = i; j < relay->pending_subscription_count - 1; j++) {
relay->pending_subscriptions[j] = relay->pending_subscriptions[j + 1];
}
relay->pending_subscription_count--;
return latency_ms;
}
}
return -1.0; // Not found
}
// Helper function to check if event ID has been seen
// Helper function to ensure relay connection
static int ensure_relay_connection(relay_connection_t* relay) {
if (!relay) {
return -1;
}
if (relay->ws_client && nostr_ws_get_state(relay->ws_client) == NOSTR_WS_CONNECTED) {
relay->status = NOSTR_POOL_RELAY_CONNECTED;
return 0; // Already connected
}
// Close existing connection if any
if (relay->ws_client) {
nostr_ws_close(relay->ws_client);
relay->ws_client = NULL;
}
// Attempt connection
relay->status = NOSTR_POOL_RELAY_CONNECTING;
relay->stats.connection_attempts++;
relay->ws_client = nostr_ws_connect(relay->url);
if (!relay->ws_client) {
relay->status = NOSTR_POOL_RELAY_ERROR;
relay->reconnect_attempts++;
relay->stats.connection_failures++;
return -1;
}
nostr_ws_state_t state = nostr_ws_get_state(relay->ws_client);
if (state == NOSTR_WS_CONNECTED) {
relay->status = NOSTR_POOL_RELAY_CONNECTED;
relay->connect_time = time(NULL);
relay->reconnect_attempts = 0;
// Initialize ping/pong monitoring on new connection
relay->last_ping_sent = time(NULL);
relay->last_pong_received = time(NULL);
relay->ping_pending = 0;
return 0;
} else {
relay->status = NOSTR_POOL_RELAY_ERROR;
relay->reconnect_attempts++;
relay->stats.connection_failures++;
// Close the failed connection
nostr_ws_close(relay->ws_client);
relay->ws_client = NULL;
return -1;
}
}
// Reconnection helper functions
static int should_attempt_reconnect(relay_connection_t* relay) {
if (!relay->pool->reconnect_config.enable_auto_reconnect) return 0;
if (relay->status == NOSTR_POOL_RELAY_CONNECTED) return 0;
if (relay->reconnect_attempts >= relay->pool->reconnect_config.max_reconnect_attempts) return 0;
time_t now = time(NULL);
return (now >= relay->next_reconnect_time);
}
static int calculate_reconnect_delay(relay_connection_t* relay) {
int delay = relay->pool->reconnect_config.initial_reconnect_delay_ms;
// Apply exponential backoff
for (int i = 1; i < relay->reconnect_attempts; i++) {
delay *= relay->pool->reconnect_config.reconnect_backoff_multiplier;
if (delay > relay->pool->reconnect_config.max_reconnect_delay_ms) {
delay = relay->pool->reconnect_config.max_reconnect_delay_ms;
break;
}
}
return delay;
}
static void restore_subscriptions_on_reconnect(relay_connection_t* relay) {
// Find subscriptions that should be active on this relay
for (int i = 0; i < relay->pool->subscription_count; i++) {
nostr_pool_subscription_t* sub = relay->pool->subscriptions[i];
if (!sub->closed) {
// Check if this subscription should be on this relay
for (int j = 0; j < sub->relay_count; j++) {
if (strcmp(sub->relay_urls[j], relay->url) == 0) {
// Re-send the subscription
if (nostr_relay_send_req(relay->ws_client, sub->subscription_id, sub->filter) >= 0) {
// Add timing for latency measurement
add_subscription_timing(relay, sub->subscription_id);
}
break;
}
}
}
}
}
static void attempt_reconnect(relay_connection_t* relay) {
relay->status = NOSTR_POOL_RELAY_CONNECTING;
relay->last_reconnect_attempt = time(NULL);
relay->reconnect_attempts++;
if (ensure_relay_connection(relay) == 0) {
// Success! Reset reconnection state
relay->reconnect_attempts = 0;
relay->next_reconnect_time = 0;
// Restore subscriptions on reconnect
restore_subscriptions_on_reconnect(relay);
} else {
// Failed - schedule next attempt with backoff
int delay_ms = calculate_reconnect_delay(relay);
relay->next_reconnect_time = time(NULL) + (delay_ms / 1000);
}
}
// Connection health monitoring (ping/pong)
static void check_connection_health(relay_connection_t* relay) {
time_t now = time(NULL);
// Send ping if interval elapsed and ping is enabled
if (relay->pool->reconnect_config.ping_interval_seconds > 0 &&
now - relay->last_ping_sent >= relay->pool->reconnect_config.ping_interval_seconds &&
!relay->ping_pending) {
if (nostr_ws_ping(relay->ws_client) == 0) {
relay->last_ping_sent = now;
relay->ping_pending = 1;
// Store high-resolution start time for latency measurement
relay->pending_ping_start_ms = get_current_time_ms();
}
}
// Check for pong timeout
if (relay->ping_pending &&
now - relay->last_ping_sent > relay->pool->reconnect_config.pong_timeout_seconds) {
// No pong received - connection is dead
relay->status = NOSTR_POOL_RELAY_DISCONNECTED;
relay->ping_pending = 0;
}
}
static void handle_pong_response(relay_connection_t* relay) {
relay->last_pong_received = time(NULL);
if (relay->ping_pending) {
// Calculate ping latency
double current_time_ms = get_current_time_ms();
double ping_latency = current_time_ms - relay->pending_ping_start_ms;
// Update ping statistics
if (relay->stats.ping_samples == 0) {
relay->stats.ping_latency_avg = ping_latency;
relay->stats.ping_latency_min = ping_latency;
relay->stats.ping_latency_max = ping_latency;
} else {
relay->stats.ping_latency_avg =
(relay->stats.ping_latency_avg * relay->stats.ping_samples + ping_latency) /
(relay->stats.ping_samples + 1);
if (ping_latency < relay->stats.ping_latency_min) {
relay->stats.ping_latency_min = ping_latency;
}
if (ping_latency > relay->stats.ping_latency_max) {
relay->stats.ping_latency_max = ping_latency;
}
}
relay->stats.ping_latency_current = ping_latency;
relay->stats.ping_samples++;
relay->ping_pending = 0;
}
}
static int is_event_seen(nostr_relay_pool_t* pool, const char* event_id) {
if (!pool || !event_id) return 0;
for (int i = 0; i < pool->seen_count; i++) {
if (strcmp(pool->seen_event_ids[i], event_id) == 0) {
return 1;
@@ -201,28 +387,49 @@ static int is_event_seen(nostr_relay_pool_t* pool, const char* event_id) {
// Helper function to mark event as seen
static void mark_event_seen(nostr_relay_pool_t* pool, const char* event_id) {
if (!pool || !event_id) return;
// Don't add duplicates
if (is_event_seen(pool, event_id)) return;
// Use circular buffer for seen events
strncpy(pool->seen_event_ids[pool->seen_next_index], event_id, 64);
pool->seen_event_ids[pool->seen_next_index][64] = '\0';
pool->seen_next_index = (pool->seen_next_index + 1) % NOSTR_POOL_MAX_SEEN_EVENTS;
if (pool->seen_count < NOSTR_POOL_MAX_SEEN_EVENTS) {
pool->seen_count++;
}
}
// Default configuration helper
nostr_pool_reconnect_config_t* nostr_pool_reconnect_config_default(void) {
static nostr_pool_reconnect_config_t config = {
.enable_auto_reconnect = 1,
.max_reconnect_attempts = 10,
.initial_reconnect_delay_ms = 1000,
.max_reconnect_delay_ms = 30000,
.reconnect_backoff_multiplier = 2,
.ping_interval_seconds = 30,
.pong_timeout_seconds = 10
};
return &config;
}
// Pool management functions
nostr_relay_pool_t* nostr_relay_pool_create(void) {
nostr_relay_pool_t* nostr_relay_pool_create(nostr_pool_reconnect_config_t* config) {
if (!config) {
config = nostr_pool_reconnect_config_default();
}
nostr_relay_pool_t* pool = calloc(1, sizeof(nostr_relay_pool_t));
if (!pool) {
return NULL;
}
// Copy configuration
pool->reconnect_config = *config;
pool->default_timeout_ms = NOSTR_POOL_DEFAULT_TIMEOUT;
return pool;
}
@@ -250,15 +457,19 @@ int nostr_relay_pool_add_relay(nostr_relay_pool_t* pool, const char* relay_url)
relay->status = NOSTR_POOL_RELAY_DISCONNECTED;
relay->ws_client = NULL;
relay->last_ping = 0;
relay->connect_time = 0;
relay->pool = pool; // Set back reference
// Initialize reconnection state
relay->reconnect_attempts = 0;
// Initialize ping management
relay->last_reconnect_attempt = 0;
relay->next_reconnect_time = 0;
// Initialize ping/pong monitoring
relay->last_ping_sent = 0;
relay->next_ping_time = 0;
relay->pending_ping_start_ms = 0.0;
relay->last_pong_received = 0;
relay->ping_pending = 0;
relay->pending_ping_start_ms = 0.0;
// Initialize statistics
memset(&relay->stats, 0, sizeof(relay->stats));
@@ -325,85 +536,17 @@ void nostr_relay_pool_destroy(nostr_relay_pool_t* pool) {
free(pool);
}
// Helper function to ensure relay connection
static int ensure_relay_connection(relay_connection_t* relay) {
if (!relay) {
return -1;
}
if (relay->ws_client && nostr_ws_get_state(relay->ws_client) == NOSTR_WS_CONNECTED) {
relay->status = NOSTR_POOL_RELAY_CONNECTED;
return 0; // Already connected
}
// Close existing connection if any
if (relay->ws_client) {
nostr_ws_close(relay->ws_client);
relay->ws_client = NULL;
}
// Attempt connection
relay->status = NOSTR_POOL_RELAY_CONNECTING;
relay->stats.connection_attempts++;
relay->ws_client = nostr_ws_connect(relay->url);
if (!relay->ws_client) {
relay->status = NOSTR_POOL_RELAY_ERROR;
relay->reconnect_attempts++;
relay->stats.connection_failures++;
return -1;
}
nostr_ws_state_t state = nostr_ws_get_state(relay->ws_client);
if (state == NOSTR_WS_CONNECTED) {
relay->status = NOSTR_POOL_RELAY_CONNECTED;
relay->connect_time = time(NULL);
relay->reconnect_attempts = 0;
// PING FUNCTIONALITY DISABLED - Initial ping on connection establishment
/* COMMENTED OUT - PING FUNCTIONALITY DISABLED
// Trigger immediate ping on new connection
time_t current_time = time(NULL);
relay->pending_ping_start_ms = get_current_time_ms();
relay->ping_pending = 1;
relay->last_ping_sent = current_time;
relay->next_ping_time = current_time + NOSTR_POOL_PING_INTERVAL;
if (nostr_ws_send_ping(relay->ws_client, "ping", 4) < 0) {
relay->ping_pending = 0;
}
*/
return 0;
} else {
relay->status = NOSTR_POOL_RELAY_ERROR;
relay->reconnect_attempts++;
relay->stats.connection_failures++;
// Close the failed connection
nostr_ws_close(relay->ws_client);
relay->ws_client = NULL;
return -1;
}
}
// Subscription management
nostr_pool_subscription_t* nostr_relay_pool_subscribe(
nostr_relay_pool_t* pool,
const char** relay_urls,
const char** relay_urls,
int relay_count,
cJSON* filter,
void (*on_event)(cJSON* event, const char* relay_url, void* user_data),
void (*on_eose)(void* user_data),
void* user_data) {
void* user_data,
int close_on_eose) {
if (!pool || !relay_urls || relay_count <= 0 || !filter ||
pool->subscription_count >= NOSTR_POOL_MAX_SUBSCRIPTIONS) {
@@ -458,6 +601,7 @@ nostr_pool_subscription_t* nostr_relay_pool_subscribe(
sub->on_eose = on_eose;
sub->user_data = user_data;
sub->closed = 0;
sub->close_on_eose = close_on_eose;
sub->pool = pool;
// Add to pool
@@ -630,8 +774,15 @@ static void process_relay_message(nostr_relay_pool_t* pool, relay_connection_t*
}
}
if (all_eose && sub->on_eose) {
sub->on_eose(sub->user_data);
if (all_eose) {
if (sub->on_eose) {
sub->on_eose(sub->user_data);
}
// Auto-close subscription if close_on_eose is enabled
if (sub->close_on_eose && !sub->closed) {
nostr_pool_subscription_close(sub);
}
}
break;
}
@@ -652,39 +803,8 @@ static void process_relay_message(nostr_relay_pool_t* pool, relay_connection_t*
}
}
} else if (strcmp(msg_type, "PONG") == 0) {
// PING FUNCTIONALITY DISABLED - Handle PONG response
/* COMMENTED OUT - PING FUNCTIONALITY DISABLED
if (relay->ping_pending) {
double current_time_ms = get_current_time_ms();
double ping_latency = current_time_ms - relay->pending_ping_start_ms;
// Update ping statistics
if (relay->stats.ping_samples == 0) {
relay->stats.ping_latency_avg = ping_latency;
relay->stats.ping_latency_min = ping_latency;
relay->stats.ping_latency_max = ping_latency;
} else {
relay->stats.ping_latency_avg =
(relay->stats.ping_latency_avg * relay->stats.ping_samples + ping_latency) /
(relay->stats.ping_samples + 1);
if (ping_latency < relay->stats.ping_latency_min) {
relay->stats.ping_latency_min = ping_latency;
}
if (ping_latency > relay->stats.ping_latency_max) {
relay->stats.ping_latency_max = ping_latency;
}
}
relay->stats.ping_latency_current = ping_latency;
relay->stats.ping_samples++;
relay->ping_pending = 0;
#ifdef NOSTR_DEBUG_ENABLED
printf("🏓 DEBUG: PONG from %s - latency: %.2f ms\n", relay->url, ping_latency);
#endif
}
*/
// Handle PONG response for connection health monitoring
handle_pong_response(relay);
}
if (msg_type) free(msg_type);
@@ -757,11 +877,17 @@ cJSON** nostr_relay_pool_query_sync(
int len = nostr_ws_receive(relay->ws_client, buffer, sizeof(buffer) - 1, 100);
if (len > 0) {
buffer[len] = '\0';
char* msg_type = NULL;
cJSON* parsed = NULL;
if (nostr_parse_relay_message(buffer, &msg_type, &parsed) == 0) {
if (msg_type && strcmp(msg_type, "EVENT") == 0) {
// Check if this is a pong message (WebSocket library prefixes pong messages)
if (strncmp(buffer, "__PONG__", 8) == 0) {
// Handle pong response for connection health monitoring
handle_pong_response(relay);
} else {
// Process as regular NOSTR message
char* msg_type = NULL;
cJSON* parsed = NULL;
if (nostr_parse_relay_message(buffer, &msg_type, &parsed) == 0) {
if (msg_type && strcmp(msg_type, "EVENT") == 0) {
// Handle EVENT message
if (cJSON_IsArray(parsed) && cJSON_GetArraySize(parsed) >= 3) {
cJSON* sub_id_json = cJSON_GetArrayItem(parsed, 1);
@@ -806,6 +932,7 @@ cJSON** nostr_relay_pool_query_sync(
}
if (msg_type) free(msg_type);
if (parsed) cJSON_Delete(parsed);
}
}
}
}
@@ -1059,76 +1186,78 @@ double nostr_relay_pool_get_relay_query_latency(
}
int nostr_relay_pool_ping_relay(
nostr_relay_pool_t* pool,
nostr_relay_pool_t* pool,
const char* relay_url) {
// PING FUNCTIONALITY DISABLED
/* COMMENTED OUT - PING FUNCTIONALITY DISABLED
if (!pool || !relay_url) {
return NOSTR_ERROR_INVALID_INPUT;
}
relay_connection_t* relay = find_relay_by_url(pool, relay_url);
if (!relay || !relay->ws_client) {
return NOSTR_ERROR_INVALID_INPUT;
}
if (ensure_relay_connection(relay) != 0) {
return NOSTR_ERROR_NETWORK_FAILED;
}
time_t current_time = time(NULL);
relay->pending_ping_start_ms = get_current_time_ms();
relay->ping_pending = 1;
relay->last_ping_sent = current_time;
relay->next_ping_time = current_time + NOSTR_POOL_PING_INTERVAL;
if (nostr_ws_send_ping(relay->ws_client, "ping", 4) < 0) {
relay->ping_pending = 0;
return NOSTR_ERROR_NETWORK_FAILED;
}
return NOSTR_SUCCESS;
*/
(void)pool; // Suppress unused parameter warning
(void)relay_url; // Suppress unused parameter warning
return NOSTR_ERROR_INVALID_INPUT; // Function disabled
}
int nostr_relay_pool_ping_relay_sync(
nostr_relay_pool_t* pool,
const char* relay_url,
nostr_relay_pool_t* pool,
const char* relay_url,
int timeout_seconds) {
// PING FUNCTIONALITY DISABLED
/* COMMENTED OUT - PING FUNCTIONALITY DISABLED
if (!pool || !relay_url) {
return NOSTR_ERROR_INVALID_INPUT;
}
relay_connection_t* relay = find_relay_by_url(pool, relay_url);
if (!relay || !relay->ws_client) {
return NOSTR_ERROR_INVALID_INPUT;
}
if (ensure_relay_connection(relay) != 0) {
return NOSTR_ERROR_NETWORK_FAILED;
}
if (timeout_seconds <= 0) {
timeout_seconds = 5;
}
time_t current_time = time(NULL);
relay->pending_ping_start_ms = get_current_time_ms();
relay->ping_pending = 1;
relay->last_ping_sent = current_time;
relay->next_ping_time = current_time + NOSTR_POOL_PING_INTERVAL;
if (nostr_ws_send_ping(relay->ws_client, "ping", 4) < 0) {
relay->ping_pending = 0;
return NOSTR_ERROR_NETWORK_FAILED;
}
// Wait for PONG response
time_t wait_start = time(NULL);
while (time(NULL) - wait_start < timeout_seconds && relay->ping_pending) {
@@ -1136,7 +1265,7 @@ int nostr_relay_pool_ping_relay_sync(
int len = nostr_ws_receive(relay->ws_client, buffer, sizeof(buffer) - 1, 1000);
if (len > 0) {
buffer[len] = '\0';
char* msg_type = NULL;
cJSON* parsed = NULL;
if (nostr_parse_relay_message(buffer, &msg_type, &parsed) == 0) {
@@ -1146,17 +1275,17 @@ int nostr_relay_pool_ping_relay_sync(
if (relay->ping_pending) {
double current_time_ms = get_current_time_ms();
double ping_latency = current_time_ms - relay->pending_ping_start_ms;
// Update ping statistics
if (relay->stats.ping_samples == 0) {
relay->stats.ping_latency_avg = ping_latency;
relay->stats.ping_latency_min = ping_latency;
relay->stats.ping_latency_max = ping_latency;
} else {
relay->stats.ping_latency_avg =
(relay->stats.ping_latency_avg * relay->stats.ping_samples + ping_latency) /
relay->stats.ping_latency_avg =
(relay->stats.ping_latency_avg * relay->stats.ping_samples + ping_latency) /
(relay->stats.ping_samples + 1);
if (ping_latency < relay->stats.ping_latency_min) {
relay->stats.ping_latency_min = ping_latency;
}
@@ -1164,11 +1293,11 @@ int nostr_relay_pool_ping_relay_sync(
relay->stats.ping_latency_max = ping_latency;
}
}
relay->stats.ping_latency_current = ping_latency;
relay->stats.ping_samples++;
relay->ping_pending = 0;
if (msg_type) free(msg_type);
if (parsed) cJSON_Delete(parsed);
return NOSTR_SUCCESS;
@@ -1179,12 +1308,15 @@ int nostr_relay_pool_ping_relay_sync(
}
}
}
// Timeout
relay->ping_pending = 0;
return NOSTR_ERROR_NETWORK_FAILED;
*/
(void)pool; // Suppress unused parameter warning
(void)relay_url; // Suppress unused parameter warning
(void)timeout_seconds; // Suppress unused parameter warning
return NOSTR_ERROR_INVALID_INPUT; // Function disabled
}
@@ -1233,52 +1365,58 @@ int nostr_relay_pool_poll(nostr_relay_pool_t* pool, int timeout_ms) {
if (!pool) {
return -1;
}
int events_processed = 0;
for (int i = 0; i < pool->relay_count; i++) {
relay_connection_t* relay = pool->relays[i];
if (!relay || !relay->ws_client) {
if (!relay) {
continue;
}
// Check if reconnection is needed
if (should_attempt_reconnect(relay)) {
attempt_reconnect(relay);
}
// Skip if no WebSocket client
if (!relay->ws_client) {
continue;
}
// Check connection state
nostr_ws_state_t state = nostr_ws_get_state(relay->ws_client);
if (state != NOSTR_WS_CONNECTED) {
relay->status = (state == NOSTR_WS_ERROR) ? NOSTR_POOL_RELAY_ERROR : NOSTR_POOL_RELAY_DISCONNECTED;
continue;
}
relay->status = NOSTR_POOL_RELAY_CONNECTED;
// PING FUNCTIONALITY DISABLED - Automatic ping management
/* COMMENTED OUT - PING FUNCTIONALITY DISABLED
// Check if we need to send a ping to keep the connection alive
if (current_time >= relay->next_ping_time && !relay->ping_pending) {
relay->pending_ping_start_ms = get_current_time_ms();
relay->ping_pending = 1;
relay->last_ping_sent = current_time;
relay->next_ping_time = current_time + NOSTR_POOL_PING_INTERVAL;
if (nostr_ws_send_ping(relay->ws_client, "ping", 4) < 0) {
relay->ping_pending = 0;
}
}
*/
// Connection health monitoring (ping/pong)
check_connection_health(relay);
// Process incoming messages
char buffer[8192];
int timeout_per_relay = timeout_ms / pool->relay_count;
int len = nostr_ws_receive(relay->ws_client, buffer, sizeof(buffer) - 1, timeout_per_relay);
if (len > 0) {
buffer[len] = '\0';
process_relay_message(pool, relay, buffer);
// Check if this is a pong message (WebSocket library prefixes pong messages)
if (strncmp(buffer, "__PONG__", 8) == 0) {
// Handle pong response for connection health monitoring
handle_pong_response(relay);
} else {
// Process as regular NOSTR message
process_relay_message(pool, relay, buffer);
}
events_processed++;
}
}
return events_processed;
}

View File

@@ -184,8 +184,20 @@ typedef struct {
typedef struct nostr_relay_pool nostr_relay_pool_t;
typedef struct nostr_pool_subscription nostr_pool_subscription_t;
// Reconnection configuration
typedef struct {
int enable_auto_reconnect; // 1 = enable, 0 = disable
int max_reconnect_attempts; // Max attempts per relay
int initial_reconnect_delay_ms; // Initial delay between attempts
int max_reconnect_delay_ms; // Max delay (cap exponential backoff)
int reconnect_backoff_multiplier; // Delay multiplier
int ping_interval_seconds; // How often to ping (0 = disable)
int pong_timeout_seconds; // How long to wait for pong before reconnecting
} nostr_pool_reconnect_config_t;
// Relay pool management functions
nostr_relay_pool_t* nostr_relay_pool_create(void);
nostr_relay_pool_t* nostr_relay_pool_create(nostr_pool_reconnect_config_t* config);
nostr_pool_reconnect_config_t* nostr_pool_reconnect_config_default(void);
int nostr_relay_pool_add_relay(nostr_relay_pool_t* pool, const char* relay_url);
int nostr_relay_pool_remove_relay(nostr_relay_pool_t* pool, const char* relay_url);
void nostr_relay_pool_destroy(nostr_relay_pool_t* pool);
@@ -198,7 +210,8 @@ nostr_pool_subscription_t* nostr_relay_pool_subscribe(
cJSON* filter,
void (*on_event)(cJSON* event, const char* relay_url, void* user_data),
void (*on_eose)(void* user_data),
void* user_data);
void* user_data,
int close_on_eose);
int nostr_pool_subscription_close(nostr_pool_subscription_t* subscription);
// Event loop functions
@@ -242,6 +255,9 @@ int nostr_relay_pool_reset_relay_stats(
double nostr_relay_pool_get_relay_query_latency(
nostr_relay_pool_t* pool,
const char* relay_url);
double nostr_relay_pool_get_relay_ping_latency(
nostr_relay_pool_t* pool,
const char* relay_url);
// Synchronous relay operations (one-off queries/publishes)
typedef enum {