diff --git a/.gitignore b/.gitignore index dba2132..a92bf47 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,3 @@ copy_executable_local.sh nostr_login_lite/ style_guide/ nostr-tools - diff --git a/docs/subscription_cleanup_simplified.md b/docs/subscription_cleanup_simplified.md new file mode 100644 index 0000000..0d557e5 --- /dev/null +++ b/docs/subscription_cleanup_simplified.md @@ -0,0 +1,532 @@ +# Subscription Cleanup - Simplified Design + +## Problem Summary + +The c-relay Nostr relay experienced severe performance degradation (90-100% CPU) due to subscription accumulation in the database. Investigation revealed **323,644 orphaned subscriptions** that were never properly closed when WebSocket connections dropped. + +## Solution: Two-Component Approach + +This simplified design focuses on two pragmatic solutions that align with Nostr's stateless design: + +1. **Startup Cleanup**: Close all subscriptions on relay restart +2. **Connection Age Limit**: Disconnect clients after a configurable time period + +Both solutions force clients to reconnect and re-establish subscriptions, which is standard Nostr behavior. + +--- + +## Component 1: Startup Cleanup + +### Purpose +Ensure clean state on every relay restart by closing all subscriptions in the database. + +### Implementation + +**File:** [`src/subscriptions.c`](src/subscriptions.c) + +**New Function:** +```c +void cleanup_all_subscriptions_on_startup(void) { + if (!g_db) { + DEBUG_ERROR("Database not initialized for startup cleanup"); + return; + } + + DEBUG_LOG("Startup cleanup: Marking all active subscriptions as disconnected"); + + // Mark all 'created' subscriptions as disconnected + const char* update_sql = + "UPDATE subscriptions " + "SET ended_at = strftime('%s', 'now') " + "WHERE event_type = 'created' AND ended_at IS NULL"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(g_db, update_sql, -1, &stmt, NULL); + if (rc != SQLITE_OK) { + DEBUG_ERROR("Failed to prepare startup cleanup query: %s", sqlite3_errmsg(g_db)); + return; + } + + rc = sqlite3_step(stmt); + int updated_count = sqlite3_changes(g_db); + sqlite3_finalize(stmt); + + if (updated_count > 0) { + // Log a single 'disconnected' event for the startup cleanup + const char* insert_sql = + "INSERT INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type) " + "VALUES ('startup_cleanup', '', 'system', 'disconnected')"; + + rc = sqlite3_prepare_v2(g_db, insert_sql, -1, &stmt, NULL); + if (rc == SQLITE_OK) { + sqlite3_step(stmt); + sqlite3_finalize(stmt); + } + + DEBUG_LOG("Startup cleanup: Marked %d subscriptions as disconnected", updated_count); + } else { + DEBUG_LOG("Startup cleanup: No active subscriptions found"); + } +} +``` + +**Integration Point:** [`src/main.c:1810`](src/main.c:1810) + +```c +// Initialize subscription manager mutexes +if (pthread_mutex_init(&g_subscription_manager.subscriptions_lock, NULL) != 0) { + DEBUG_ERROR("Failed to initialize subscriptions mutex"); + sqlite3_close(g_db); + return 1; +} + +if (pthread_mutex_init(&g_subscription_manager.ip_tracking_lock, NULL) != 0) { + DEBUG_ERROR("Failed to initialize IP tracking mutex"); + pthread_mutex_destroy(&g_subscription_manager.subscriptions_lock); + sqlite3_close(g_db); + return 1; +} + +// **NEW: Startup cleanup - close all subscriptions** +cleanup_all_subscriptions_on_startup(); + +// Start WebSocket relay server +DEBUG_LOG("Starting WebSocket relay server..."); +if (start_websocket_relay(port_override, strict_port) != 0) { + DEBUG_ERROR("Failed to start WebSocket relay"); + // ... cleanup code +} +``` + +### Benefits +- **Immediate relief**: Restart relay to fix subscription issues +- **Clean slate**: Every restart starts with zero active subscriptions +- **Simple**: Single SQL UPDATE statement +- **Nostr-aligned**: Clients are designed to reconnect after relay restart +- **No configuration needed**: Always runs on startup + +--- + +## Component 2: Connection Age Limit + +### Purpose +Automatically disconnect clients after a configurable time period, forcing them to reconnect and re-establish subscriptions. + +### Why Disconnect Instead of Just Closing Subscriptions? + +**Option 1: Send CLOSED message (keep connection)** +- ❌ Not all clients handle `CLOSED` messages properly +- ❌ Silent failure - client thinks it's subscribed but isn't +- ❌ Partial cleanup - connection still consumes resources +- ❌ More complex to implement + +**Option 2: Disconnect client entirely (force reconnection)** ✅ +- ✅ Universal compatibility - all clients handle WebSocket reconnection +- ✅ Complete resource cleanup (memory, file descriptors, etc.) +- ✅ Simple implementation - single operation +- ✅ Well-tested code path (same as network interruptions) +- ✅ Forces re-authentication if needed + +### Implementation + +**File:** [`src/websockets.c`](src/websockets.c) + +**New Function:** +```c +/** + * Check connection age and disconnect clients that have been connected too long. + * This forces clients to reconnect and re-establish subscriptions. + * + * Uses libwebsockets' lws_vhost_foreach_wsi() to iterate through all active + * connections and checks their connection_established timestamp from per_session_data. + */ +void check_connection_age(int max_connection_seconds) { + if (max_connection_seconds <= 0 || !ws_context) { + return; + } + + time_t now = time(NULL); + time_t cutoff = now - max_connection_seconds; + + // Get the default vhost + struct lws_vhost *vhost = lws_get_vhost_by_name(ws_context, "default"); + if (!vhost) { + DEBUG_ERROR("Failed to get vhost for connection age check"); + return; + } + + // Iterate through all active WebSocket connections + // Note: lws_vhost_foreach_wsi() calls our callback for each connection + struct lws *wsi = NULL; + while ((wsi = lws_vhost_foreach_wsi(vhost, wsi)) != NULL) { + // Get per-session data which contains connection_established timestamp + struct per_session_data *pss = (struct per_session_data *)lws_wsi_user(wsi); + + if (pss && pss->connection_established > 0) { + // Check if connection is older than cutoff + if (pss->connection_established < cutoff) { + // Connection is too old - close it + long age_seconds = now - pss->connection_established; + + DEBUG_LOG("Closing connection from %s (age: %lds, limit: %ds)", + pss->client_ip, age_seconds, max_connection_seconds); + + // Close with normal status and reason message + lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, + (unsigned char *)"connection age limit", 21); + } + } + } +} +``` + +**Key Implementation Details:** + +1. **No database needed**: Active connections are tracked by libwebsockets itself +2. **Uses existing timestamp**: `pss->connection_established` is already set on line 456 of websockets.c +3. **Built-in iterator**: `lws_vhost_foreach_wsi()` safely iterates through all active connections +4. **Per-session data**: Each connection's `per_session_data` is accessible via `lws_wsi_user()` +5. **Safe closure**: `lws_close_reason()` properly closes the WebSocket with a status code and message + +**Integration Point:** [`src/websockets.c:2176`](src/websockets.c:2176) - in existing event loop + +```c +// Main event loop with proper signal handling +while (g_server_running && !g_shutdown_flag) { + int result = lws_service(ws_context, 1000); + + if (result < 0) { + DEBUG_ERROR("libwebsockets service error"); + break; + } + + // Check if it's time to post status update + time_t current_time = time(NULL); + int status_post_hours = get_config_int("kind_1_status_posts_hours", 0); + + if (status_post_hours > 0) { + int seconds_interval = status_post_hours * 3600; + if (current_time - last_status_post_time >= seconds_interval) { + last_status_post_time = current_time; + generate_and_post_status_event(); + } + } + + // **NEW: Check for connection age limit** + int max_connection_seconds = get_config_int("max_connection_seconds", 86400); // Default 24 hours + if (max_connection_seconds > 0) { + check_connection_age(max_connection_seconds); + } +} +``` + +### Configuration + +**Parameter:** `max_connection_seconds` +- **Default:** `86400` (24 hours) +- **Range:** `0` = disabled, `>0` = disconnect after X seconds +- **Units:** Seconds (for consistency with other time-based configs) + +**Example configurations:** +```json +{ + "max_connection_seconds": 86400 // 86400 seconds = 24 hours (default) +} +``` + +```json +{ + "max_connection_seconds": 43200 // 43200 seconds = 12 hours +} +``` + +```json +{ + "max_connection_seconds": 3600 // 3600 seconds = 1 hour +} +``` + +```json +{ + "max_connection_seconds": 0 // Disabled +} +``` + +### Client Behavior + +When disconnected due to age limit, clients will: +1. Detect WebSocket closure +2. Wait briefly (exponential backoff) +3. Reconnect to relay +4. Re-authenticate if needed (NIP-42) +5. Re-establish all subscriptions +6. Resume normal operation + +This is **exactly what happens** during network interruptions, so it's a well-tested code path in all Nostr clients. + +### Benefits +- **No new threads**: Uses existing event loop +- **Minimal overhead**: Check runs once per second (same as `lws_service`) +- **Simple implementation**: Iterate through active connections +- **Consistent pattern**: Matches existing status post checking +- **Universal compatibility**: All clients handle reconnection +- **Complete cleanup**: Frees all resources associated with connection +- **Configurable**: Can be adjusted per relay needs or disabled entirely + +--- + +## Implementation Plan + +### Phase 1: Startup Cleanup (1-2 hours) + +1. **Add `cleanup_all_subscriptions_on_startup()` function** + - File: [`src/subscriptions.c`](src/subscriptions.c) + - SQL UPDATE to mark all active subscriptions as disconnected + - Add logging for cleanup count + +2. **Integrate in main()** + - File: [`src/main.c:1810`](src/main.c:1810) + - Call after mutex initialization, before WebSocket server start + +3. **Test** + - Create subscriptions in database + - Restart relay + - Verify all subscriptions marked as disconnected + - Verify `active_subscriptions_log` shows 0 subscriptions + +**Estimated Time:** 1-2 hours + +### Phase 2: Connection Age Limit (2-3 hours) + +1. **Add `check_connection_age()` function** + - File: [`src/websockets.c`](src/websockets.c) + - Iterate through active connections + - Close connections older than limit + +2. **Integrate in event loop** + - File: [`src/websockets.c:2176`](src/websockets.c:2176) + - Add check after status post check + - Use same pattern as status posts + +3. **Add configuration parameter** + - Add `max_connection_seconds` to default config + - Default: 86400 (24 hours) + +4. **Test** + - Connect client + - Wait for timeout (or reduce timeout for testing) + - Verify client disconnected + - Verify client reconnects automatically + - Verify subscriptions re-established + +**Estimated Time:** 2-3 hours + +--- + +## Testing Strategy + +### Startup Cleanup Tests + +```bash +# Test 1: Clean startup with existing subscriptions +- Create 100 active subscriptions in database +- Restart relay +- Verify all subscriptions marked as disconnected +- Verify active_subscriptions_log shows 0 subscriptions + +# Test 2: Clean startup with no subscriptions +- Start relay with empty database +- Verify no errors +- Verify startup cleanup logs "No active subscriptions found" + +# Test 3: Clients reconnect after restart +- Create subscriptions before restart +- Restart relay +- Connect clients and create new subscriptions +- Verify new subscriptions tracked correctly +``` + +### Connection Age Limit Tests + +```bash +# Test 1: Connection disconnected after timeout +- Set max_connection_seconds to 60 (for testing) +- Connect client +- Wait 61 seconds +- Verify client disconnected +- Verify client reconnects automatically + +# Test 2: Subscriptions re-established after reconnection +- Connect client with subscriptions +- Wait for timeout +- Verify client reconnects +- Verify subscriptions re-established +- Verify events still delivered + +# Test 3: Disabled when set to 0 +- Set max_connection_seconds to 0 +- Connect client +- Wait extended period +- Verify client NOT disconnected +``` + +### Integration Tests + +```bash +# Test 1: Combined behavior +- Start relay (startup cleanup runs) +- Connect multiple clients +- Create subscriptions +- Wait for connection timeout +- Verify clients reconnect +- Restart relay +- Verify clean state + +# Test 2: Load test +- Connect 100 clients +- Each creates 5 subscriptions +- Wait for connection timeout +- Verify all clients reconnect +- Verify all subscriptions re-established +- Monitor CPU usage (should remain low) +``` + +--- + +## Success Criteria + +### Component 1: Startup Cleanup +- ✅ Relay starts with zero active subscriptions +- ✅ All previous subscriptions marked as disconnected on startup +- ✅ Clients successfully reconnect and re-establish subscriptions +- ✅ Relay restart can be used as emergency fix for subscription issues +- ✅ No errors during startup cleanup process + +### Component 2: Connection Age Limit +- ✅ Clients disconnected after configured time period +- ✅ Clients automatically reconnect +- ✅ Subscriptions re-established after reconnection +- ✅ No impact on relay performance +- ✅ Configuration parameter works correctly (including disabled state) + +### Overall Success +- ✅ CPU usage remains low (<10%) +- ✅ No orphaned subscriptions accumulate +- ✅ Database size remains stable +- ✅ No manual intervention required + +--- + +## Configuration Reference + +**New Configuration Parameters:** + +```json +{ + "max_connection_seconds": 86400 +} +``` + +**Recommended Settings:** + +- **Production:** + - `max_connection_seconds: 86400` (24 hours) + +- **Development:** + - `max_connection_seconds: 3600` (1 hour for faster testing) + +- **High-traffic:** + - `max_connection_seconds: 43200` (12 hours) + +- **Disabled:** + - `max_connection_seconds: 0` + +--- + +## Rollback Plan + +If issues arise after deployment: + +1. **Disable connection age limit:** + - Set `max_connection_seconds: 0` in config + - Restart relay + - Monitor for stability + +2. **Revert code changes:** + - Remove `check_connection_age()` call from event loop + - Remove `cleanup_all_subscriptions_on_startup()` call from main + - Restart relay + +3. **Database cleanup (if needed):** + - Manually clean up orphaned subscriptions using SQL: + ```sql + UPDATE subscriptions + SET ended_at = strftime('%s', 'now') + WHERE event_type = 'created' AND ended_at IS NULL; + ``` + +--- + +## Comparison with Original Design + +### Original Design (5 Components) +1. Startup cleanup +2. Fix WebSocket disconnection logging +3. Enhance subscription removal with reason parameter +4. Periodic cleanup task (background thread) +5. Optimize database VIEW +6. Subscription expiration (optional) + +### Simplified Design (2 Components) +1. Startup cleanup +2. Connection age limit + +### Why Simplified is Better + +**Advantages:** +- **Simpler**: 2 components vs 5-6 components +- **Faster to implement**: 3-5 hours vs 11-17 hours +- **Easier to maintain**: Less code, fewer moving parts +- **More reliable**: Fewer potential failure points +- **Nostr-aligned**: Leverages client reconnection behavior +- **No new threads**: Uses existing event loop +- **Universal compatibility**: All clients handle reconnection + +**What We're Not Losing:** +- Startup cleanup is identical in both designs +- Connection age limit achieves the same goal as periodic cleanup + expiration +- Disconnection forces complete cleanup (better than just logging) +- Database VIEW optimization not needed if subscriptions don't accumulate + +**Trade-offs:** +- Less granular logging (but simpler) +- No historical subscription analytics (but cleaner database) +- Clients must reconnect periodically (but this is standard Nostr behavior) + +--- + +## Conclusion + +This simplified design solves the subscription accumulation problem with two pragmatic solutions: + +1. **Startup cleanup** ensures every relay restart starts with a clean slate +2. **Connection age limit** prevents long-term accumulation by forcing periodic reconnection + +Both solutions align with Nostr's stateless design where clients are expected to handle reconnection. The implementation is simple, maintainable, and leverages existing code patterns. + +**Key Benefits:** +- ✅ Solves the root problem (subscription accumulation) +- ✅ Simple to implement (3-5 hours total) +- ✅ Easy to maintain (minimal code) +- ✅ Universal compatibility (all clients handle reconnection) +- ✅ No new threads or background tasks +- ✅ Configurable and can be disabled if needed +- ✅ Relay restart as emergency fix + +**Next Steps:** +1. Implement Component 1 (Startup Cleanup) +2. Test thoroughly +3. Implement Component 2 (Connection Age Limit) +4. Test thoroughly +5. Deploy to production +6. Monitor CPU usage and subscription counts \ No newline at end of file diff --git a/relay.pid b/relay.pid index cebf67b..bd7bcd6 100644 --- a/relay.pid +++ b/relay.pid @@ -1 +1 @@ -198023 +1073157 diff --git a/src/default_config_event.h b/src/default_config_event.h index d19df8b..a92fad6 100644 --- a/src/default_config_event.h +++ b/src/default_config_event.h @@ -65,6 +65,9 @@ static const struct { {"max_total_subscriptions", "5000"}, {"max_filters_per_subscription", "10"}, + // Connection Management + {"max_connection_seconds", "86400"}, // 24 hours (0 = disabled) + // Event Processing Limits {"max_event_tags", "100"}, {"max_content_length", "8196"}, diff --git a/src/main.c b/src/main.c index c22b82b..83289c2 100644 --- a/src/main.c +++ b/src/main.c @@ -1807,7 +1807,8 @@ int main(int argc, char* argv[]) { return 1; } - + // Cleanup orphaned subscriptions from previous runs + cleanup_all_subscriptions_on_startup(); // Start WebSocket Nostr relay server (port from CLI override or configuration) int result = start_websocket_relay(cli_options.port_override, cli_options.strict_port); // Use CLI port override if specified, otherwise config diff --git a/src/main.h b/src/main.h index eb7247e..1acc2f4 100644 --- a/src/main.h +++ b/src/main.h @@ -12,8 +12,8 @@ // Version information (auto-updated by build system) #define VERSION_MAJOR 1 #define VERSION_MINOR 0 -#define VERSION_PATCH 5 -#define VERSION "v1.0.5" +#define VERSION_PATCH 6 +#define VERSION "v1.0.6" // Avoid VERSION_MAJOR redefinition warning from nostr_core_lib #undef VERSION_MAJOR diff --git a/src/subscriptions.c b/src/subscriptions.c index fd1355b..6282b39 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -999,6 +999,44 @@ void update_subscription_events_sent(const char* sub_id, int events_sent) { } } +// Cleanup all subscriptions on startup +void cleanup_all_subscriptions_on_startup(void) { + if (!g_db) { + DEBUG_ERROR("Database not available for startup cleanup"); + return; + } + + DEBUG_LOG("Performing startup subscription cleanup"); + + // Mark all active subscriptions as disconnected + const char* sql = + "UPDATE subscriptions " + "SET ended_at = strftime('%s', 'now') " + "WHERE event_type = 'created' AND ended_at IS NULL"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); + if (rc != SQLITE_OK) { + DEBUG_ERROR("Failed to prepare startup cleanup query"); + return; + } + + rc = sqlite3_step(stmt); + int changes = sqlite3_changes(g_db); + sqlite3_finalize(stmt); + + if (rc != SQLITE_DONE) { + DEBUG_ERROR("Failed to execute startup cleanup"); + return; + } + + if (changes > 0) { + DEBUG_LOG("Startup cleanup: marked %d orphaned subscriptions as disconnected", changes); + } else { + DEBUG_LOG("Startup cleanup: no orphaned subscriptions found"); + } +} + /////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/subscriptions.h b/src/subscriptions.h index 43ec1a5..cae0d04 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -120,4 +120,7 @@ void update_subscription_events_sent(const char* sub_id, int events_sent); // Subscription query functions int has_subscriptions_for_kind(int event_kind); +// Startup cleanup function +void cleanup_all_subscriptions_on_startup(void); + #endif // SUBSCRIPTIONS_H \ No newline at end of file diff --git a/src/websockets.c b/src/websockets.c index fb24feb..8200427 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -1984,6 +1984,73 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso return 0; } +// Check and disconnect connections that have exceeded max age +// This function works by checking connection age through the subscription system +static void check_connection_age(int max_connection_seconds) { + if (max_connection_seconds <= 0) { + return; // Feature disabled + } + + time_t current_time = time(NULL); + + // Lock the subscription manager to safely iterate through subscriptions + pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); + + // Track unique WSI pointers we've already checked to avoid duplicate checks + struct lws** checked_wsis = NULL; + int checked_count = 0; + int checked_capacity = 0; + + subscription_t* sub = g_subscription_manager.active_subscriptions; + while (sub) { + if (!sub->active || !sub->wsi) { + sub = sub->next; + continue; + } + + // Check if we've already processed this WSI + int already_checked = 0; + for (int i = 0; i < checked_count; i++) { + if (checked_wsis[i] == sub->wsi) { + already_checked = 1; + break; + } + } + + if (!already_checked) { + // Get per-session data to check connection age + struct per_session_data *pss = (struct per_session_data *)lws_wsi_user(sub->wsi); + + if (pss && pss->connection_established > 0) { + time_t connection_age = current_time - pss->connection_established; + + if (connection_age >= max_connection_seconds) { + DEBUG_LOG("Disconnecting client %s: connection age %ld seconds exceeds limit %d seconds", + pss->client_ip, connection_age, max_connection_seconds); + + // Close connection with normal closure status + lws_close_reason(sub->wsi, LWS_CLOSE_STATUS_NORMAL, + (unsigned char*)"Connection age limit reached", 28); + } + } + + // Add to checked list + if (checked_count >= checked_capacity) { + checked_capacity = checked_capacity == 0 ? 64 : checked_capacity * 2; + checked_wsis = realloc(checked_wsis, checked_capacity * sizeof(struct lws*)); + } + checked_wsis[checked_count++] = sub->wsi; + } + + sub = sub->next; + } + + pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); + + // Cleanup + free(checked_wsis); +} + // WebSocket protocol definition static struct lws_protocols protocols[] = { { @@ -2152,6 +2219,9 @@ int start_websocket_relay(int port_override, int strict_port) { // Static variable for status post timing (initialize to 0 for immediate first post) static time_t last_status_post_time = 0; + + // Static variable for connection age check timing + static time_t last_connection_age_check = 0; // Main event loop with proper signal handling while (g_server_running && !g_shutdown_flag) { @@ -2174,6 +2244,13 @@ int start_websocket_relay(int port_override, int strict_port) { generate_and_post_status_event(); } } + + // Check connection age limits (every 60 seconds) + int max_connection_seconds = get_config_int("max_connection_seconds", 86400); + if (max_connection_seconds > 0 && (current_time - last_connection_age_check >= 60)) { + last_connection_age_check = current_time; + check_connection_age(max_connection_seconds); + } } lws_context_destroy(ws_context); diff --git a/tests/.test_keys.txt b/tests/.test_keys.txt new file mode 100644 index 0000000..47ae36a --- /dev/null +++ b/tests/.test_keys.txt @@ -0,0 +1,5 @@ +# Test key configuration (from make_and_restart_relay.sh -t) +ADMIN_PRIVATE_KEY="aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" +ADMIN_PUBLIC_KEY="6a04ab98d9e4774ad806e302dddeb63bea16b5cb5f223ee77478e861bb583eb3" +RELAY_PUBLIC_KEY="4f355bdcb7cc0af728ef3cceb9615d90684bb5b2ca5f859ab0f0b704075871aa" +RELAY_URL="ws://localhost:8888" \ No newline at end of file