Add enhanced subscription functionality with EOSE result modes

This commit is contained in:
2025-10-02 15:00:50 -04:00
parent 0f897ab1b3
commit 54a6044083
12 changed files with 19452 additions and 1517 deletions

View File

@@ -100,12 +100,27 @@ struct nostr_pool_subscription {
// Callbacks
void (*on_event)(cJSON* event, const char* relay_url, void* user_data);
void (*on_eose)(void* user_data);
void (*on_eose)(cJSON** events, int event_count, void* user_data);
void* user_data;
int closed;
int close_on_eose; // Auto-close subscription when all relays send EOSE
nostr_relay_pool_t* pool; // Back reference to pool
// New subscription control parameters
int enable_deduplication; // Per-subscription deduplication control
nostr_pool_eose_result_mode_t result_mode; // EOSE result selection mode
int relay_timeout_seconds; // Timeout for individual relay operations
int eose_timeout_seconds; // Timeout for waiting for EOSE completion
time_t subscription_start_time; // When subscription was created
// Event collection for EOSE result modes
cJSON** collected_events;
int collected_event_count;
int collected_events_capacity;
// Per-relay timeout tracking
time_t* relay_last_activity; // Last activity time per relay
};
struct nostr_relay_pool {
@@ -544,9 +559,13 @@ nostr_pool_subscription_t* nostr_relay_pool_subscribe(
int relay_count,
cJSON* filter,
void (*on_event)(cJSON* event, const char* relay_url, void* user_data),
void (*on_eose)(void* user_data),
void (*on_eose)(cJSON** events, int event_count, void* user_data),
void* user_data,
int close_on_eose) {
int close_on_eose,
int enable_deduplication,
nostr_pool_eose_result_mode_t result_mode,
int relay_timeout_seconds,
int eose_timeout_seconds) {
if (!pool || !relay_urls || relay_count <= 0 || !filter ||
pool->subscription_count >= NOSTR_POOL_MAX_SUBSCRIPTIONS) {
@@ -603,6 +622,55 @@ nostr_pool_subscription_t* nostr_relay_pool_subscribe(
sub->closed = 0;
sub->close_on_eose = close_on_eose;
sub->pool = pool;
// Set new subscription control parameters
sub->enable_deduplication = enable_deduplication;
sub->result_mode = result_mode;
sub->relay_timeout_seconds = relay_timeout_seconds;
sub->eose_timeout_seconds = eose_timeout_seconds;
sub->subscription_start_time = time(NULL);
// Initialize event collection arrays (only for EOSE result modes)
if (result_mode != NOSTR_POOL_EOSE_FIRST) {
sub->collected_events_capacity = 10; // Initial capacity
sub->collected_events = calloc(sub->collected_events_capacity, sizeof(cJSON*));
if (!sub->collected_events) {
// Cleanup on failure
cJSON_Delete(sub->filter);
for (int j = 0; j < relay_count; j++) {
free(sub->relay_urls[j]);
}
free(sub->relay_urls);
free(sub->eose_received);
free(sub);
return NULL;
}
sub->collected_event_count = 0;
} else {
sub->collected_events = NULL;
sub->collected_event_count = 0;
sub->collected_events_capacity = 0;
}
// Initialize per-relay activity tracking
sub->relay_last_activity = calloc(relay_count, sizeof(time_t));
if (!sub->relay_last_activity) {
// Cleanup on failure
if (sub->collected_events) free(sub->collected_events);
cJSON_Delete(sub->filter);
for (int j = 0; j < relay_count; j++) {
free(sub->relay_urls[j]);
}
free(sub->relay_urls);
free(sub->eose_received);
free(sub);
return NULL;
}
// Initialize all relay activity times to current time
time_t now = time(NULL);
for (int i = 0; i < relay_count; i++) {
sub->relay_last_activity[i] = now;
}
// Add to pool
pool->subscriptions[pool->subscription_count++] = sub;
@@ -700,43 +768,56 @@ static void process_relay_message(nostr_relay_pool_t* pool, relay_connection_t*
if (event_id_json && cJSON_IsString(event_id_json)) {
const char* event_id = cJSON_GetStringValue(event_id_json);
// Check for duplicate
if (!is_event_seen(pool, event_id)) {
mark_event_seen(pool, event_id);
relay->stats.events_received++;
// Measure query latency (first event response)
double latency_ms = remove_subscription_timing(relay, subscription_id);
if (latency_ms > 0.0) {
// Update query latency statistics
if (relay->stats.query_samples == 0) {
relay->stats.query_latency_avg = latency_ms;
relay->stats.query_latency_min = latency_ms;
relay->stats.query_latency_max = latency_ms;
} else {
relay->stats.query_latency_avg =
(relay->stats.query_latency_avg * relay->stats.query_samples + latency_ms) /
(relay->stats.query_samples + 1);
if (latency_ms < relay->stats.query_latency_min) {
relay->stats.query_latency_min = latency_ms;
}
if (latency_ms > relay->stats.query_latency_max) {
relay->stats.query_latency_max = latency_ms;
}
}
relay->stats.query_samples++;
// Find subscription first
nostr_pool_subscription_t* sub = NULL;
for (int i = 0; i < pool->subscription_count; i++) {
if (pool->subscriptions[i] && !pool->subscriptions[i]->closed &&
strcmp(pool->subscriptions[i]->subscription_id, subscription_id) == 0) {
sub = pool->subscriptions[i];
break;
}
// Find subscription and call callback
for (int i = 0; i < pool->subscription_count; i++) {
nostr_pool_subscription_t* sub = pool->subscriptions[i];
if (sub && !sub->closed &&
strcmp(sub->subscription_id, subscription_id) == 0) {
if (sub->on_event) {
sub->on_event(event, relay->url, sub->user_data);
}
if (sub) {
// Check for duplicate (per-subscription deduplication)
int is_duplicate = 0;
if (sub->enable_deduplication) {
if (is_event_seen(pool, event_id)) {
is_duplicate = 1;
} else {
mark_event_seen(pool, event_id);
}
}
if (!is_duplicate) {
relay->stats.events_received++;
// Measure query latency (first event response)
double latency_ms = remove_subscription_timing(relay, subscription_id);
if (latency_ms > 0.0) {
// Update query latency statistics
if (relay->stats.query_samples == 0) {
relay->stats.query_latency_avg = latency_ms;
relay->stats.query_latency_min = latency_ms;
relay->stats.query_latency_max = latency_ms;
} else {
relay->stats.query_latency_avg =
(relay->stats.query_latency_avg * relay->stats.query_samples + latency_ms) /
(relay->stats.query_samples + 1);
if (latency_ms < relay->stats.query_latency_min) {
relay->stats.query_latency_min = latency_ms;
}
if (latency_ms > relay->stats.query_latency_max) {
relay->stats.query_latency_max = latency_ms;
}
}
break;
relay->stats.query_samples++;
}
// Call event callback
if (sub->on_event) {
sub->on_event(event, relay->url, sub->user_data);
}
}
}
@@ -776,7 +857,14 @@ static void process_relay_message(nostr_relay_pool_t* pool, relay_connection_t*
if (all_eose) {
if (sub->on_eose) {
sub->on_eose(sub->user_data);
// Pass collected events based on result mode
if (sub->result_mode == NOSTR_POOL_EOSE_FIRST) {
// FIRST mode: no events collected, pass NULL/0
sub->on_eose(NULL, 0, sub->user_data);
} else {
// FULL_SET or MOST_RECENT: pass collected events
sub->on_eose(sub->collected_events, sub->collected_event_count, sub->user_data);
}
}
// Auto-close subscription if close_on_eose is enabled

View File

@@ -1,6 +1,12 @@
#ifndef NOSTR_CORE_H
#define NOSTR_CORE_H
// Version information (auto-updated by increment_and_push.sh)
#define VERSION "v0.4.3"
#define VERSION_MAJOR 0
#define VERSION_MINOR 4
#define VERSION_PATCH 3
/*
* NOSTR Core Library - Complete API Reference
*
@@ -159,6 +165,13 @@ typedef enum {
NOSTR_POOL_RELAY_ERROR = -1
} nostr_pool_relay_status_t;
// EOSE result mode for subscriptions
typedef enum {
NOSTR_POOL_EOSE_FULL_SET, // Wait for all relays, return all events
NOSTR_POOL_EOSE_MOST_RECENT, // Wait for all relays, return most recent event
NOSTR_POOL_EOSE_FIRST // Return results on first EOSE (fastest response)
} nostr_pool_eose_result_mode_t;
typedef struct {
int connection_attempts;
int connection_failures;
@@ -204,6 +217,22 @@ void nostr_relay_pool_destroy(nostr_relay_pool_t* pool);
// Subscription management
nostr_pool_subscription_t* nostr_relay_pool_subscribe(
nostr_relay_pool_t* pool,
const char** relay_urls,
int relay_count,
cJSON* filter,
void (*on_event)(cJSON* event, const char* relay_url, void* user_data),
void (*on_eose)(cJSON** events, int event_count, void* user_data),
void* user_data,
int close_on_eose,
int enable_deduplication,
nostr_pool_eose_result_mode_t result_mode,
int relay_timeout_seconds,
int eose_timeout_seconds);
int nostr_pool_subscription_close(nostr_pool_subscription_t* subscription);
// Backward compatibility wrapper
nostr_pool_subscription_t* nostr_relay_pool_subscribe_compat(
nostr_relay_pool_t* pool,
const char** relay_urls,
int relay_count,
@@ -212,7 +241,6 @@ nostr_pool_subscription_t* nostr_relay_pool_subscribe(
void (*on_eose)(void* user_data),
void* user_data,
int close_on_eose);
int nostr_pool_subscription_close(nostr_pool_subscription_t* subscription);
// Event loop functions
int nostr_relay_pool_run(nostr_relay_pool_t* pool, int timeout_ms);