diff --git a/README.md b/README.md index cf19cbd..d0ff624 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,27 @@ A nostr relay in C with sqlite on the back end. + +### [NIPs](https://github.com/nostr-protocol/nips) + +- [x] NIP-01: Basic protocol flow description +- [ ] NIP-02: Contact list and petnames +- [ ] NIP-04: Encrypted Direct Message +- [ ] NIP-09: Event deletion +- [ ] NIP-11: Relay information document +- [ ] NIP-12: Generic tag queries +- [ ] NIP-13: Proof of Work +- [x] NIP-15: End of Stored Events Notice +- [ ] NIP-16: Event Treatment +- [x] NIP-20: Command Results +- [ ] NIP-22: Event `created_at` Limits +- [ ] NIP-25: Reactions +- [ ] NIP-26: Delegated Event Signing +- [ ] NIP-28: Public Chat +- [ ] NIP-33: Parameterized Replaceable Events +- [ ] NIP-40: Expiration Timestamp +- [ ] NIP-42: Authentication of clients to relays +- [ ] NIP-45: Counting results. [experimental](#count) +- [ ] NIP-50: Keywords filter. [experimental](#search) +- [ ] NIP-70: Protected Events + diff --git a/db/c_nostr_relay.db b/db/c_nostr_relay.db index 39f1bb4..cb5cdb2 100644 Binary files a/db/c_nostr_relay.db and b/db/c_nostr_relay.db differ diff --git a/db/c_nostr_relay.db-shm b/db/c_nostr_relay.db-shm index 38493fc..0544074 100644 Binary files a/db/c_nostr_relay.db-shm and b/db/c_nostr_relay.db-shm differ diff --git a/db/c_nostr_relay.db-wal b/db/c_nostr_relay.db-wal index 8e0cb89..fcf3f6a 100644 Binary files a/db/c_nostr_relay.db-wal and b/db/c_nostr_relay.db-wal differ diff --git a/db/schema.sql b/db/schema.sql index b7aebbc..4333821 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -87,4 +87,95 @@ BEGIN AND kind = NEW.kind AND event_type = 'replaceable' AND id != NEW.id; -END; \ No newline at end of file +END; + +-- Persistent Subscriptions Logging Tables (Phase 2) +-- Optional database logging for subscription analytics and debugging + +-- Subscription events log +CREATE TABLE subscription_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + subscription_id TEXT NOT 
NULL, -- Subscription ID from client + client_ip TEXT NOT NULL, -- Client IP address + event_type TEXT NOT NULL CHECK (event_type IN ('created', 'closed', 'expired', 'disconnected')), + filter_json TEXT, -- JSON representation of filters (for created events) + events_sent INTEGER DEFAULT 0, -- Number of events sent to this subscription + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + ended_at INTEGER, -- When subscription ended (for closed/expired/disconnected) + duration INTEGER -- Computed: ended_at - created_at +); + +-- Subscription metrics summary +CREATE TABLE subscription_metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + date TEXT NOT NULL, -- Date (YYYY-MM-DD) + total_created INTEGER DEFAULT 0, -- Total subscriptions created + total_closed INTEGER DEFAULT 0, -- Total subscriptions closed + total_events_broadcast INTEGER DEFAULT 0, -- Total events broadcast + avg_duration REAL DEFAULT 0, -- Average subscription duration + peak_concurrent INTEGER DEFAULT 0, -- Peak concurrent subscriptions + updated_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + UNIQUE(date) +); + +-- Event broadcasting log (optional, for detailed analytics) +CREATE TABLE event_broadcasts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_id TEXT NOT NULL, -- Event ID that was broadcast + subscription_id TEXT NOT NULL, -- Subscription that received it + client_ip TEXT NOT NULL, -- Client IP + broadcast_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + FOREIGN KEY (event_id) REFERENCES events(id) +); + +-- Indexes for subscription logging performance +CREATE INDEX idx_subscription_events_id ON subscription_events(subscription_id); +CREATE INDEX idx_subscription_events_type ON subscription_events(event_type); +CREATE INDEX idx_subscription_events_created ON subscription_events(created_at DESC); +CREATE INDEX idx_subscription_events_client ON subscription_events(client_ip); + +CREATE INDEX idx_subscription_metrics_date ON subscription_metrics(date DESC); + +CREATE 
INDEX idx_event_broadcasts_event ON event_broadcasts(event_id); +CREATE INDEX idx_event_broadcasts_sub ON event_broadcasts(subscription_id); +CREATE INDEX idx_event_broadcasts_time ON event_broadcasts(broadcast_at DESC); + +-- Trigger to update subscription duration when ended +CREATE TRIGGER update_subscription_duration + AFTER UPDATE OF ended_at ON subscription_events + WHEN NEW.ended_at IS NOT NULL AND OLD.ended_at IS NULL +BEGIN + UPDATE subscription_events + SET duration = NEW.ended_at - NEW.created_at + WHERE id = NEW.id; +END; + +-- View for subscription analytics +CREATE VIEW subscription_analytics AS +SELECT + date(created_at, 'unixepoch') as date, + COUNT(*) as subscriptions_created, + COUNT(CASE WHEN ended_at IS NOT NULL THEN 1 END) as subscriptions_ended, + AVG(CASE WHEN duration IS NOT NULL THEN duration END) as avg_duration_seconds, + MAX(events_sent) as max_events_sent, + AVG(events_sent) as avg_events_sent, + COUNT(DISTINCT client_ip) as unique_clients +FROM subscription_events +GROUP BY date(created_at, 'unixepoch') +ORDER BY date DESC; + +-- View for current active subscriptions (from log perspective) +CREATE VIEW active_subscriptions_log AS +SELECT + subscription_id, + client_ip, + filter_json, + events_sent, + created_at, + (strftime('%s', 'now') - created_at) as duration_seconds +FROM subscription_events +WHERE event_type = 'created' +AND subscription_id NOT IN ( + SELECT subscription_id FROM subscription_events + WHERE event_type IN ('closed', 'expired', 'disconnected') +); \ No newline at end of file diff --git a/docs/advanced_schema_design.md b/docs/advanced_schema_design.md deleted file mode 100644 index f0c8515..0000000 --- a/docs/advanced_schema_design.md +++ /dev/null @@ -1,337 +0,0 @@ -# Advanced Nostr Relay Schema Design - -## Overview - -This document outlines the design for an advanced multi-table schema that enforces Nostr protocol compliance at the database level, with separate tables for different event types based on their 
storage and replacement characteristics. - -## Event Type Classification - -Based on the Nostr specification, events are classified into four categories: - -### 1. Regular Events -- **Kinds**: `1000 <= n < 10000` || `4 <= n < 45` || `n == 1` || `n == 2` -- **Storage Policy**: All events stored permanently -- **Examples**: Text notes (1), Reposts (6), Reactions (7), Direct Messages (4) - -### 2. Replaceable Events -- **Kinds**: `10000 <= n < 20000` || `n == 0` || `n == 3` -- **Storage Policy**: Only latest per `(pubkey, kind)` combination -- **Replacement Logic**: Latest `created_at`, then lowest `id` lexically -- **Examples**: Metadata (0), Contacts (3), Mute List (10000) - -### 3. Ephemeral Events -- **Kinds**: `20000 <= n < 30000` -- **Storage Policy**: Not expected to be stored (optional temporary storage) -- **Examples**: Typing indicators, presence updates, ephemeral messages - -### 4. Addressable Events -- **Kinds**: `30000 <= n < 40000` -- **Storage Policy**: Only latest per `(pubkey, kind, d_tag)` combination -- **Replacement Logic**: Same as replaceable events -- **Examples**: Long-form content (30023), Application-specific data - -## SQLite JSON Capabilities Research - -SQLite provides powerful JSON functions that could be leveraged for tag storage: - -### Core JSON Functions -```sql --- Extract specific values -json_extract(column, '$.path') - --- Iterate through arrays -json_each(json_array_column) - --- Flatten nested structures -json_tree(json_column) - --- Validate JSON structure -json_valid(column) - --- Array operations -json_array_length(column) -json_extract(column, '$[0]') -- First element -``` - -### Tag Query Examples - -#### Find all 'e' tag references: -```sql -SELECT - id, - json_extract(value, '$[1]') as referenced_event_id, - json_extract(value, '$[2]') as relay_hint, - json_extract(value, '$[3]') as marker -FROM events, json_each(tags) -WHERE json_extract(value, '$[0]') = 'e'; -``` - -#### Find events with specific hashtags: -```sql 
-SELECT id, content -FROM events, json_each(tags) -WHERE json_extract(value, '$[0]') = 't' - AND json_extract(value, '$[1]') = 'bitcoin'; -``` - -#### Extract 'd' tag for addressable events: -```sql -SELECT - id, - json_extract(value, '$[1]') as d_tag_value -FROM events, json_each(tags) -WHERE json_extract(value, '$[0]') = 'd' -LIMIT 1; -``` - -### JSON Functional Indexes -```sql --- Index on hashtags -CREATE INDEX idx_hashtags ON events( - json_extract(tags, '$[*][1]') -) WHERE json_extract(tags, '$[*][0]') = 't'; - --- Index on 'd' tags for addressable events -CREATE INDEX idx_d_tags ON events_addressable( - json_extract(tags, '$[*][1]') -) WHERE json_extract(tags, '$[*][0]') = 'd'; -``` - -## Proposed Schema Design - -### Option 1: Separate Tables with JSON Tags - -```sql --- Regular Events (permanent storage) -CREATE TABLE events_regular ( - id TEXT PRIMARY KEY, - pubkey TEXT NOT NULL, - created_at INTEGER NOT NULL, - kind INTEGER NOT NULL, - content TEXT NOT NULL, - sig TEXT NOT NULL, - tags JSON, - first_seen INTEGER DEFAULT (strftime('%s', 'now')), - CONSTRAINT kind_regular CHECK ( - (kind >= 1000 AND kind < 10000) OR - (kind >= 4 AND kind < 45) OR - kind = 1 OR kind = 2 - ) -); - --- Replaceable Events (latest per pubkey+kind) -CREATE TABLE events_replaceable ( - pubkey TEXT NOT NULL, - kind INTEGER NOT NULL, - id TEXT NOT NULL, - created_at INTEGER NOT NULL, - content TEXT NOT NULL, - sig TEXT NOT NULL, - tags JSON, - replaced_at INTEGER DEFAULT (strftime('%s', 'now')), - PRIMARY KEY (pubkey, kind), - CONSTRAINT kind_replaceable CHECK ( - (kind >= 10000 AND kind < 20000) OR - kind = 0 OR kind = 3 - ) -); - --- Ephemeral Events (temporary/optional storage) -CREATE TABLE events_ephemeral ( - id TEXT PRIMARY KEY, - pubkey TEXT NOT NULL, - created_at INTEGER NOT NULL, - kind INTEGER NOT NULL, - content TEXT NOT NULL, - sig TEXT NOT NULL, - tags JSON, - expires_at INTEGER DEFAULT (strftime('%s', 'now', '+1 hour')), - CONSTRAINT kind_ephemeral CHECK ( - kind >= 
20000 AND kind < 30000 - ) -); - --- Addressable Events (latest per pubkey+kind+d_tag) -CREATE TABLE events_addressable ( - pubkey TEXT NOT NULL, - kind INTEGER NOT NULL, - d_tag TEXT NOT NULL, - id TEXT NOT NULL, - created_at INTEGER NOT NULL, - content TEXT NOT NULL, - sig TEXT NOT NULL, - tags JSON, - replaced_at INTEGER DEFAULT (strftime('%s', 'now')), - PRIMARY KEY (pubkey, kind, d_tag), - CONSTRAINT kind_addressable CHECK ( - kind >= 30000 AND kind < 40000 - ) -); -``` - -### Indexes for Performance - -```sql --- Regular events indexes -CREATE INDEX idx_regular_pubkey ON events_regular(pubkey); -CREATE INDEX idx_regular_kind ON events_regular(kind); -CREATE INDEX idx_regular_created_at ON events_regular(created_at); -CREATE INDEX idx_regular_kind_created_at ON events_regular(kind, created_at); - --- Replaceable events indexes -CREATE INDEX idx_replaceable_created_at ON events_replaceable(created_at); -CREATE INDEX idx_replaceable_id ON events_replaceable(id); - --- Ephemeral events indexes -CREATE INDEX idx_ephemeral_expires_at ON events_ephemeral(expires_at); -CREATE INDEX idx_ephemeral_pubkey ON events_ephemeral(pubkey); - --- Addressable events indexes -CREATE INDEX idx_addressable_created_at ON events_addressable(created_at); -CREATE INDEX idx_addressable_id ON events_addressable(id); - --- JSON tag indexes (examples) -CREATE INDEX idx_regular_e_tags ON events_regular( - json_extract(tags, '$[*][1]') -) WHERE json_extract(tags, '$[*][0]') = 'e'; - -CREATE INDEX idx_regular_p_tags ON events_regular( - json_extract(tags, '$[*][1]') -) WHERE json_extract(tags, '$[*][0]') = 'p'; -``` - -### Option 2: Unified Tag Table Approach - -```sql --- Unified tag storage (alternative to JSON) -CREATE TABLE tags_unified ( - event_id TEXT NOT NULL, - event_type TEXT NOT NULL, -- 'regular', 'replaceable', 'ephemeral', 'addressable' - tag_index INTEGER NOT NULL, -- Position in tag array - name TEXT NOT NULL, - value TEXT NOT NULL, - param_2 TEXT, -- Third element if present 
- param_3 TEXT, -- Fourth element if present - param_json TEXT, -- JSON for additional parameters - PRIMARY KEY (event_id, tag_index) -); - -CREATE INDEX idx_tags_name_value ON tags_unified(name, value); -CREATE INDEX idx_tags_event_type ON tags_unified(event_type); -``` - -## Implementation Strategy - -### 1. Kind Classification Function (C Code) -```c -typedef enum { - EVENT_TYPE_REGULAR, - EVENT_TYPE_REPLACEABLE, - EVENT_TYPE_EPHEMERAL, - EVENT_TYPE_ADDRESSABLE, - EVENT_TYPE_INVALID -} event_type_t; - -event_type_t classify_event_kind(int kind) { - if ((kind >= 1000 && kind < 10000) || - (kind >= 4 && kind < 45) || - kind == 1 || kind == 2) { - return EVENT_TYPE_REGULAR; - } - - if ((kind >= 10000 && kind < 20000) || - kind == 0 || kind == 3) { - return EVENT_TYPE_REPLACEABLE; - } - - if (kind >= 20000 && kind < 30000) { - return EVENT_TYPE_EPHEMERAL; - } - - if (kind >= 30000 && kind < 40000) { - return EVENT_TYPE_ADDRESSABLE; - } - - return EVENT_TYPE_INVALID; -} -``` - -### 2. Replacement Logic for Replaceable Events -```sql --- Trigger for replaceable events -CREATE TRIGGER replace_event_on_insert -BEFORE INSERT ON events_replaceable -FOR EACH ROW -WHEN EXISTS ( - SELECT 1 FROM events_replaceable - WHERE pubkey = NEW.pubkey AND kind = NEW.kind -) -BEGIN - DELETE FROM events_replaceable - WHERE pubkey = NEW.pubkey - AND kind = NEW.kind - AND ( - created_at < NEW.created_at OR - (created_at = NEW.created_at AND id > NEW.id) - ); -END; -``` - -### 3. 
D-Tag Extraction for Addressable Events -```c -char* extract_d_tag(cJSON* tags) { - if (!tags || !cJSON_IsArray(tags)) { - return NULL; - } - - cJSON* tag; - cJSON_ArrayForEach(tag, tags) { - if (cJSON_IsArray(tag) && cJSON_GetArraySize(tag) >= 2) { - cJSON* tag_name = cJSON_GetArrayItem(tag, 0); - cJSON* tag_value = cJSON_GetArrayItem(tag, 1); - - if (cJSON_IsString(tag_name) && cJSON_IsString(tag_value)) { - if (strcmp(cJSON_GetStringValue(tag_name), "d") == 0) { - return strdup(cJSON_GetStringValue(tag_value)); - } - } - } - } - - return strdup(""); // Default empty d-tag -} -``` - -## Advantages of This Design - -### 1. Protocol Compliance -- **Enforced at DB level**: Schema constraints prevent invalid event storage -- **Automatic replacement**: Triggers handle replaceable/addressable event logic -- **Type safety**: Separate tables ensure correct handling per event type - -### 2. Performance Benefits -- **Targeted indexes**: Each table optimized for its access patterns -- **Reduced storage**: Ephemeral events can be auto-expired -- **Query optimization**: SQLite can optimize queries per table structure - -### 3. JSON Tag Benefits -- **Atomic storage**: Tags stored with their event -- **Rich querying**: SQLite JSON functions enable complex tag queries -- **Schema flexibility**: Can handle arbitrary tag structures -- **Functional indexes**: Index specific tag patterns efficiently - -## Migration Strategy - -1. **Phase 1**: Create new schema alongside existing -2. **Phase 2**: Implement kind classification and routing logic -3. **Phase 3**: Migrate existing data to appropriate tables -4. **Phase 4**: Update application logic to use new tables -5. **Phase 5**: Drop old schema after verification - -## Next Steps for Implementation - -1. **Prototype JSON performance**: Create test database with sample data -2. **Benchmark query patterns**: Compare JSON vs normalized approaches -3. **Implement kind classification**: Add routing logic to C code -4. 
**Create migration scripts**: Handle existing data transformation -5. **Update test suite**: Verify compliance with new schema \ No newline at end of file diff --git a/docs/final_schema_recommendation.md b/docs/final_schema_recommendation.md deleted file mode 100644 index 75d6452..0000000 --- a/docs/final_schema_recommendation.md +++ /dev/null @@ -1,416 +0,0 @@ -# Final Schema Recommendation: Hybrid Single Table Approach - -## Executive Summary - -After analyzing the subscription query complexity, **the multi-table approach creates more problems than it solves**. REQ filters don't align with storage semantics - clients filter by kind, author, and tags regardless of event type classification. - -**Recommendation: Modified Single Table with Event Type Classification** - -## The Multi-Table Problem - -### REQ Filter Reality Check -- Clients send: `{"kinds": [1, 0, 30023], "authors": ["pubkey"], "#p": ["target"]}` -- Multi-table requires: 3 separate queries + UNION + complex ordering -- Single table requires: 1 query with simple WHERE conditions - -### Query Complexity Explosion -```sql --- Multi-table nightmare for simple filter -WITH results AS ( - SELECT * FROM events_regular WHERE kind = 1 AND pubkey = ? - UNION ALL - SELECT * FROM events_replaceable WHERE kind = 0 AND pubkey = ? - UNION ALL - SELECT * FROM events_addressable WHERE kind = 30023 AND pubkey = ? -) -SELECT r.* FROM results r -JOIN multiple_tag_tables t ON complex_conditions -ORDER BY created_at DESC, id ASC LIMIT ?; - --- vs Single table simplicity -SELECT e.* FROM events e, json_each(e.tags) t -WHERE e.kind IN (1, 0, 30023) - AND e.pubkey = ? - AND json_extract(t.value, '$[0]') = 'p' - AND json_extract(t.value, '$[1]') = ? 
-ORDER BY e.created_at DESC, e.id ASC LIMIT ?; -``` - -## Recommended Schema: Hybrid Approach - -### Core Design Philosophy -- **Single table for REQ query simplicity** -- **Event type classification for protocol compliance** -- **JSON tags for atomic storage and rich querying** -- **Partial unique constraints for replacement logic** - -### Schema Definition - -```sql -CREATE TABLE events ( - id TEXT PRIMARY KEY, - pubkey TEXT NOT NULL, - created_at INTEGER NOT NULL, - kind INTEGER NOT NULL, - event_type TEXT NOT NULL CHECK (event_type IN ('regular', 'replaceable', 'ephemeral', 'addressable')), - content TEXT NOT NULL, - sig TEXT NOT NULL, - tags JSON NOT NULL DEFAULT '[]', - first_seen INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), - - -- Additional fields for addressable events - d_tag TEXT GENERATED ALWAYS AS ( - CASE - WHEN event_type = 'addressable' THEN - json_extract(tags, '$[*][1]') - FROM json_each(tags) - WHERE json_extract(value, '$[0]') = 'd' - LIMIT 1 - ELSE NULL - END - ) STORED, - - -- Replacement tracking - replaced_at INTEGER, - - -- Protocol compliance constraints - CONSTRAINT unique_replaceable - UNIQUE (pubkey, kind) - WHERE event_type = 'replaceable', - - CONSTRAINT unique_addressable - UNIQUE (pubkey, kind, d_tag) - WHERE event_type = 'addressable' AND d_tag IS NOT NULL -); -``` - -### Event Type Classification Function - -```sql --- Function to determine event type from kind -CREATE VIEW event_type_lookup AS -SELECT - CASE - WHEN (kind >= 1000 AND kind < 10000) OR - (kind >= 4 AND kind < 45) OR - kind = 1 OR kind = 2 THEN 'regular' - WHEN (kind >= 10000 AND kind < 20000) OR - kind = 0 OR kind = 3 THEN 'replaceable' - WHEN kind >= 20000 AND kind < 30000 THEN 'ephemeral' - WHEN kind >= 30000 AND kind < 40000 THEN 'addressable' - ELSE 'unknown' - END as event_type, - kind -FROM ( - -- Generate all possible kind values for lookup - WITH RECURSIVE kinds(kind) AS ( - SELECT 0 - UNION ALL - SELECT kind + 1 FROM kinds WHERE kind < 65535 - ) - 
SELECT kind FROM kinds -); -``` - -### Performance Indexes - -```sql --- Core query patterns -CREATE INDEX idx_events_pubkey ON events(pubkey); -CREATE INDEX idx_events_kind ON events(kind); -CREATE INDEX idx_events_created_at ON events(created_at DESC); -CREATE INDEX idx_events_event_type ON events(event_type); - --- Composite indexes for common filters -CREATE INDEX idx_events_pubkey_created_at ON events(pubkey, created_at DESC); -CREATE INDEX idx_events_kind_created_at ON events(kind, created_at DESC); -CREATE INDEX idx_events_type_created_at ON events(event_type, created_at DESC); - --- JSON tag indexes for common patterns -CREATE INDEX idx_events_e_tags ON events( - json_extract(tags, '$[*][1]') -) WHERE json_extract(tags, '$[*][0]') = 'e'; - -CREATE INDEX idx_events_p_tags ON events( - json_extract(tags, '$[*][1]') -) WHERE json_extract(tags, '$[*][0]') = 'p'; - -CREATE INDEX idx_events_hashtags ON events( - json_extract(tags, '$[*][1]') -) WHERE json_extract(tags, '$[*][0]') = 't'; - --- Addressable events d_tag index -CREATE INDEX idx_events_d_tag ON events(d_tag) -WHERE event_type = 'addressable' AND d_tag IS NOT NULL; -``` - -### Replacement Logic Implementation - -#### Replaceable Events Trigger -```sql -CREATE TRIGGER handle_replaceable_events -BEFORE INSERT ON events -FOR EACH ROW -WHEN NEW.event_type = 'replaceable' -BEGIN - -- Delete older replaceable events with same pubkey+kind - DELETE FROM events - WHERE event_type = 'replaceable' - AND pubkey = NEW.pubkey - AND kind = NEW.kind - AND ( - created_at < NEW.created_at OR - (created_at = NEW.created_at AND id > NEW.id) - ); -END; -``` - -#### Addressable Events Trigger -```sql -CREATE TRIGGER handle_addressable_events -BEFORE INSERT ON events -FOR EACH ROW -WHEN NEW.event_type = 'addressable' -BEGIN - -- Delete older addressable events with same pubkey+kind+d_tag - DELETE FROM events - WHERE event_type = 'addressable' - AND pubkey = NEW.pubkey - AND kind = NEW.kind - AND d_tag = NEW.d_tag - AND ( - 
created_at < NEW.created_at OR - (created_at = NEW.created_at AND id > NEW.id) - ); -END; -``` - -## Implementation Strategy - -### C Code Integration - -#### Event Type Classification -```c -typedef enum { - EVENT_TYPE_REGULAR, - EVENT_TYPE_REPLACEABLE, - EVENT_TYPE_EPHEMERAL, - EVENT_TYPE_ADDRESSABLE, - EVENT_TYPE_UNKNOWN -} event_type_t; - -event_type_t classify_event_kind(int kind) { - if ((kind >= 1000 && kind < 10000) || - (kind >= 4 && kind < 45) || - kind == 1 || kind == 2) { - return EVENT_TYPE_REGULAR; - } - if ((kind >= 10000 && kind < 20000) || - kind == 0 || kind == 3) { - return EVENT_TYPE_REPLACEABLE; - } - if (kind >= 20000 && kind < 30000) { - return EVENT_TYPE_EPHEMERAL; - } - if (kind >= 30000 && kind < 40000) { - return EVENT_TYPE_ADDRESSABLE; - } - return EVENT_TYPE_UNKNOWN; -} - -const char* event_type_to_string(event_type_t type) { - switch (type) { - case EVENT_TYPE_REGULAR: return "regular"; - case EVENT_TYPE_REPLACEABLE: return "replaceable"; - case EVENT_TYPE_EPHEMERAL: return "ephemeral"; - case EVENT_TYPE_ADDRESSABLE: return "addressable"; - default: return "unknown"; - } -} -``` - -#### Simplified Event Storage -```c -int store_event(cJSON* event) { - // Extract fields - cJSON* id = cJSON_GetObjectItem(event, "id"); - cJSON* pubkey = cJSON_GetObjectItem(event, "pubkey"); - cJSON* created_at = cJSON_GetObjectItem(event, "created_at"); - cJSON* kind = cJSON_GetObjectItem(event, "kind"); - cJSON* content = cJSON_GetObjectItem(event, "content"); - cJSON* sig = cJSON_GetObjectItem(event, "sig"); - - // Classify event type - event_type_t type = classify_event_kind(cJSON_GetNumberValue(kind)); - - // Serialize tags to JSON - cJSON* tags = cJSON_GetObjectItem(event, "tags"); - char* tags_json = cJSON_Print(tags ? 
tags : cJSON_CreateArray()); - - // Single INSERT statement - database handles replacement via triggers - const char* sql = - "INSERT INTO events (id, pubkey, created_at, kind, event_type, content, sig, tags) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; - - sqlite3_stmt* stmt; - int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); - if (rc != SQLITE_OK) { - free(tags_json); - return -1; - } - - sqlite3_bind_text(stmt, 1, cJSON_GetStringValue(id), -1, SQLITE_STATIC); - sqlite3_bind_text(stmt, 2, cJSON_GetStringValue(pubkey), -1, SQLITE_STATIC); - sqlite3_bind_int64(stmt, 3, (sqlite3_int64)cJSON_GetNumberValue(created_at)); - sqlite3_bind_int(stmt, 4, (int)cJSON_GetNumberValue(kind)); - sqlite3_bind_text(stmt, 5, event_type_to_string(type), -1, SQLITE_STATIC); - sqlite3_bind_text(stmt, 6, cJSON_GetStringValue(content), -1, SQLITE_STATIC); - sqlite3_bind_text(stmt, 7, cJSON_GetStringValue(sig), -1, SQLITE_STATIC); - sqlite3_bind_text(stmt, 8, tags_json, -1, SQLITE_TRANSIENT); - - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); - free(tags_json); - - return (rc == SQLITE_DONE) ? 
0 : -1; -} -``` - -#### Simple REQ Query Building -```c -char* build_filter_query(cJSON* filter) { - // Build single query against events table - // Much simpler than multi-table approach - - GString* query = g_string_new("SELECT * FROM events WHERE 1=1"); - - // Handle ids filter - cJSON* ids = cJSON_GetObjectItem(filter, "ids"); - if (ids && cJSON_IsArray(ids)) { - g_string_append(query, " AND id IN ("); - // Add parameter placeholders - g_string_append(query, ")"); - } - - // Handle authors filter - cJSON* authors = cJSON_GetObjectItem(filter, "authors"); - if (authors && cJSON_IsArray(authors)) { - g_string_append(query, " AND pubkey IN ("); - // Add parameter placeholders - g_string_append(query, ")"); - } - - // Handle kinds filter - cJSON* kinds = cJSON_GetObjectItem(filter, "kinds"); - if (kinds && cJSON_IsArray(kinds)) { - g_string_append(query, " AND kind IN ("); - // Add parameter placeholders - g_string_append(query, ")"); - } - - // Handle tag filters (#e, #p, etc.) - cJSON* item; - cJSON_ArrayForEach(item, filter) { - char* key = item->string; - if (key && key[0] == '#' && strlen(key) == 2) { - char tag_name = key[1]; - g_string_append_printf(query, - " AND EXISTS (SELECT 1 FROM json_each(tags) " - "WHERE json_extract(value, '$[0]') = '%c' " - "AND json_extract(value, '$[1]') IN (", tag_name); - // Add parameter placeholders - g_string_append(query, "))"); - } - } - - // Handle time range - cJSON* since = cJSON_GetObjectItem(filter, "since"); - if (since) { - g_string_append(query, " AND created_at >= ?"); - } - - cJSON* until = cJSON_GetObjectItem(filter, "until"); - if (until) { - g_string_append(query, " AND created_at <= ?"); - } - - // Standard ordering and limit - g_string_append(query, " ORDER BY created_at DESC, id ASC"); - - cJSON* limit = cJSON_GetObjectItem(filter, "limit"); - if (limit) { - g_string_append(query, " LIMIT ?"); - } - - return g_string_free(query, FALSE); -} -``` - -## Benefits of This Approach - -### 1. 
Query Simplicity -- ✅ Single table = simple REQ queries -- ✅ No UNION complexity -- ✅ Familiar SQL patterns -- ✅ Easy LIMIT and ORDER BY handling - -### 2. Protocol Compliance -- ✅ Event type classification enforced -- ✅ Replacement logic via triggers -- ✅ Unique constraints prevent duplicates -- ✅ Proper handling of all event types - -### 3. Performance -- ✅ Unified indexes across all events -- ✅ No join overhead for basic queries -- ✅ JSON tag indexes for complex filters -- ✅ Single table scan for cross-kind queries - -### 4. Implementation Simplicity -- ✅ Minimal changes from current code -- ✅ Database handles replacement logic -- ✅ Simple event storage function -- ✅ No complex routing logic needed - -### 5. Future Flexibility -- ✅ Can add columns for new event types -- ✅ Can split tables later if needed -- ✅ Easy to add new indexes -- ✅ Extensible constraint system - -## Migration Path - -### Phase 1: Schema Update -1. Add `event_type` column to existing events table -2. Add JSON `tags` column -3. Create classification triggers -4. Add partial unique constraints - -### Phase 2: Data Migration -1. Classify existing events by kind -2. Convert existing tag table data to JSON -3. Verify constraint compliance -4. Update indexes - -### Phase 3: Code Updates -1. Update event storage to use new schema -2. Simplify REQ query building -3. Remove tag table JOIN logic -4. Test subscription filtering - -### Phase 4: Optimization -1. Monitor query performance -2. Add specialized indexes as needed -3. Tune replacement triggers -4. 
Consider ephemeral event cleanup - -## Conclusion - -This hybrid approach achieves the best of both worlds: - -- **Protocol compliance** through event type classification and constraints -- **Query simplicity** through unified storage -- **Performance** through targeted indexes -- **Implementation ease** through minimal complexity - -The multi-table approach, while theoretically cleaner, creates a subscription query nightmare that would significantly burden the implementation. The hybrid single-table approach provides all the benefits with manageable complexity. \ No newline at end of file diff --git a/docs/implementation_plan.md b/docs/implementation_plan.md deleted file mode 100644 index 2953f04..0000000 --- a/docs/implementation_plan.md +++ /dev/null @@ -1,326 +0,0 @@ -# Implementation Plan: Hybrid Schema Migration - -## Overview - -Migrating from the current two-table design (event + tag tables) to a single event table with JSON tags column and event type classification. - -## Current Schema → Target Schema - -### Current Schema (to be replaced) -```sql -CREATE TABLE event ( - id TEXT PRIMARY KEY, - pubkey TEXT NOT NULL, - created_at INTEGER NOT NULL, - kind INTEGER NOT NULL, - content TEXT NOT NULL, - sig TEXT NOT NULL -); - -CREATE TABLE tag ( - id TEXT NOT NULL, -- references event.id - name TEXT NOT NULL, - value TEXT NOT NULL, - parameters TEXT -); -``` - -### Target Schema (simplified from final recommendation) -```sql -CREATE TABLE events ( - id TEXT PRIMARY KEY, - pubkey TEXT NOT NULL, - created_at INTEGER NOT NULL, - kind INTEGER NOT NULL, - event_type TEXT NOT NULL CHECK (event_type IN ('regular', 'replaceable', 'ephemeral', 'addressable')), - content TEXT NOT NULL, - sig TEXT NOT NULL, - tags JSON NOT NULL DEFAULT '[]', - first_seen INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), - - -- Optional: Protocol compliance constraints (can be added later) - CONSTRAINT unique_replaceable - UNIQUE (pubkey, kind) WHERE event_type = 'replaceable', - 
CONSTRAINT unique_addressable - UNIQUE (pubkey, kind, json_extract(tags, '$[?(@[0]=="d")][1]')) - WHERE event_type = 'addressable' -); -``` - -## Implementation Steps - -### Phase 1: Update Schema File - -**File**: `db/schema.sql` - -1. Replace current event table definition -2. Remove tag table completely -3. Add new indexes for performance -4. Add event type classification logic - -### Phase 2: Update C Code - -**File**: `src/main.c` - -1. Add event type classification function -2. Update `store_event()` function to use JSON tags -3. Update `retrieve_event()` function to return JSON tags -4. Remove all tag table related code -5. Update REQ query handling to use JSON tag queries - -### Phase 3: Update Database Initialization - -**File**: `db/init.sh` - -1. Update table count validation (expect 1 table instead of 2) -2. Update schema verification logic - -### Phase 4: Update Tests - -**File**: `tests/1_nip_test.sh` - -1. Verify events are stored with JSON tags -2. Test query functionality with new schema -3. Validate event type classification - -### Phase 5: Migration Strategy - -Create migration script to handle existing data (if any). - -## Detailed Implementation - -### 1. 
Event Type Classification - -```c -// Add to src/main.c -typedef enum { - EVENT_TYPE_REGULAR, - EVENT_TYPE_REPLACEABLE, - EVENT_TYPE_EPHEMERAL, - EVENT_TYPE_ADDRESSABLE, - EVENT_TYPE_UNKNOWN -} event_type_t; - -event_type_t classify_event_kind(int kind) { - if ((kind >= 1000 && kind < 10000) || - (kind >= 4 && kind < 45) || - kind == 1 || kind == 2) { - return EVENT_TYPE_REGULAR; - } - if ((kind >= 10000 && kind < 20000) || - kind == 0 || kind == 3) { - return EVENT_TYPE_REPLACEABLE; - } - if (kind >= 20000 && kind < 30000) { - return EVENT_TYPE_EPHEMERAL; - } - if (kind >= 30000 && kind < 40000) { - return EVENT_TYPE_ADDRESSABLE; - } - return EVENT_TYPE_UNKNOWN; -} - -const char* event_type_to_string(event_type_t type) { - switch (type) { - case EVENT_TYPE_REGULAR: return "regular"; - case EVENT_TYPE_REPLACEABLE: return "replaceable"; - case EVENT_TYPE_EPHEMERAL: return "ephemeral"; - case EVENT_TYPE_ADDRESSABLE: return "addressable"; - default: return "unknown"; - } -} -``` - -### 2. Updated store_event Function - -```c -// Replace existing store_event function -int store_event(cJSON* event) { - if (!g_db || !event) { - return -1; - } - - // Extract event fields - cJSON* id = cJSON_GetObjectItem(event, "id"); - cJSON* pubkey = cJSON_GetObjectItem(event, "pubkey"); - cJSON* created_at = cJSON_GetObjectItem(event, "created_at"); - cJSON* kind = cJSON_GetObjectItem(event, "kind"); - cJSON* content = cJSON_GetObjectItem(event, "content"); - cJSON* sig = cJSON_GetObjectItem(event, "sig"); - cJSON* tags = cJSON_GetObjectItem(event, "tags"); - - if (!id || !pubkey || !created_at || !kind || !content || !sig) { - log_error("Invalid event - missing required fields"); - return -1; - } - - // Classify event type - event_type_t type = classify_event_kind((int)cJSON_GetNumberValue(kind)); - - // Serialize tags to JSON (use empty array if no tags) - char* tags_json = NULL; - if (tags && cJSON_IsArray(tags)) { - tags_json = cJSON_Print(tags); - } else { - tags_json = 
strdup("[]"); - } - - if (!tags_json) { - log_error("Failed to serialize tags to JSON"); - return -1; - } - - // Single INSERT statement - const char* sql = - "INSERT INTO events (id, pubkey, created_at, kind, event_type, content, sig, tags) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; - - sqlite3_stmt* stmt; - int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); - if (rc != SQLITE_OK) { - log_error("Failed to prepare event insert statement"); - free(tags_json); - return -1; - } - - // Bind parameters - sqlite3_bind_text(stmt, 1, cJSON_GetStringValue(id), -1, SQLITE_STATIC); - sqlite3_bind_text(stmt, 2, cJSON_GetStringValue(pubkey), -1, SQLITE_STATIC); - sqlite3_bind_int64(stmt, 3, (sqlite3_int64)cJSON_GetNumberValue(created_at)); - sqlite3_bind_int(stmt, 4, (int)cJSON_GetNumberValue(kind)); - sqlite3_bind_text(stmt, 5, event_type_to_string(type), -1, SQLITE_STATIC); - sqlite3_bind_text(stmt, 6, cJSON_GetStringValue(content), -1, SQLITE_STATIC); - sqlite3_bind_text(stmt, 7, cJSON_GetStringValue(sig), -1, SQLITE_STATIC); - sqlite3_bind_text(stmt, 8, tags_json, -1, SQLITE_TRANSIENT); - - // Execute statement - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); - - if (rc != SQLITE_DONE) { - if (rc == SQLITE_CONSTRAINT) { - log_warning("Event already exists in database"); - free(tags_json); - return 0; // Not an error, just duplicate - } - char error_msg[256]; - snprintf(error_msg, sizeof(error_msg), "Failed to insert event: %s", sqlite3_errmsg(g_db)); - log_error(error_msg); - free(tags_json); - return -1; - } - - free(tags_json); - log_success("Event stored in database"); - return 0; -} -``` - -### 3. 
Updated retrieve_event Function - -```c -// Replace existing retrieve_event function -cJSON* retrieve_event(const char* event_id) { - if (!g_db || !event_id) { - return NULL; - } - - const char* sql = - "SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE id = ?"; - - sqlite3_stmt* stmt; - int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); - if (rc != SQLITE_OK) { - return NULL; - } - - sqlite3_bind_text(stmt, 1, event_id, -1, SQLITE_STATIC); - - cJSON* event = NULL; - if (sqlite3_step(stmt) == SQLITE_ROW) { - event = cJSON_CreateObject(); - - cJSON_AddStringToObject(event, "id", (char*)sqlite3_column_text(stmt, 0)); - cJSON_AddStringToObject(event, "pubkey", (char*)sqlite3_column_text(stmt, 1)); - cJSON_AddNumberToObject(event, "created_at", sqlite3_column_int64(stmt, 2)); - cJSON_AddNumberToObject(event, "kind", sqlite3_column_int(stmt, 3)); - cJSON_AddStringToObject(event, "content", (char*)sqlite3_column_text(stmt, 4)); - cJSON_AddStringToObject(event, "sig", (char*)sqlite3_column_text(stmt, 5)); - - // Parse tags JSON - const char* tags_json = (char*)sqlite3_column_text(stmt, 6); - if (tags_json) { - cJSON* tags = cJSON_Parse(tags_json); - if (tags) { - cJSON_AddItemToObject(event, "tags", tags); - } else { - cJSON_AddItemToObject(event, "tags", cJSON_CreateArray()); - } - } else { - cJSON_AddItemToObject(event, "tags", cJSON_CreateArray()); - } - } - - sqlite3_finalize(stmt); - return event; -} -``` - -## Migration Considerations - -### Handling Existing Data - -If there's existing data in the current schema: - -1. **Export existing events and tags** -2. **Transform tag data to JSON format** -3. **Classify events by kind** -4. 
**Import into new schema** - -### Backward Compatibility - -- API remains the same - events still have the same JSON structure -- Internal storage changes but external interface is unchanged -- Tests should pass with minimal modifications - -## Performance Optimizations - -### Essential Indexes - -```sql --- Core performance indexes -CREATE INDEX idx_events_pubkey ON events(pubkey); -CREATE INDEX idx_events_kind ON events(kind); -CREATE INDEX idx_events_created_at ON events(created_at DESC); -CREATE INDEX idx_events_event_type ON events(event_type); - --- Composite indexes for common query patterns -CREATE INDEX idx_events_kind_created_at ON events(kind, created_at DESC); -CREATE INDEX idx_events_pubkey_created_at ON events(pubkey, created_at DESC); - --- JSON tag indexes for common tag patterns -CREATE INDEX idx_events_e_tags ON events( - json_extract(tags, '$[*][1]') -) WHERE json_extract(tags, '$[*][0]') = 'e'; - -CREATE INDEX idx_events_p_tags ON events( - json_extract(tags, '$[*][1]') -) WHERE json_extract(tags, '$[*][0]') = 'p'; -``` - -## Next Steps - -1. **Switch to code mode** to implement the schema changes -2. **Update db/schema.sql** with new table definition -3. **Modify src/main.c** with new functions -4. **Update db/init.sh** for single table validation -5. **Test with existing test suite** - -This approach will provide: -- ✅ Simplified schema management -- ✅ Protocol compliance preparation -- ✅ JSON tag query capabilities -- ✅ Performance optimization opportunities -- ✅ Easy REQ subscription handling - -Ready to proceed with implementation? 
\ No newline at end of file diff --git a/docs/subscription_query_analysis.md b/docs/subscription_query_analysis.md deleted file mode 100644 index 3a43db5..0000000 --- a/docs/subscription_query_analysis.md +++ /dev/null @@ -1,331 +0,0 @@ -# Subscription Query Complexity Analysis - -## Overview - -This document analyzes how Nostr REQ subscription filters would be implemented across different schema designs, focusing on query complexity, performance implications, and implementation burden. - -## Nostr REQ Filter Specification Recap - -Clients send REQ messages with filters containing: -- **`ids`**: List of specific event IDs -- **`authors`**: List of pubkeys -- **`kinds`**: List of event kinds -- **`#`**: Tag filters (e.g., `#e` for event refs, `#p` for pubkey mentions) -- **`since`/`until`**: Time range filters -- **`limit`**: Maximum events to return - -### Key Filter Behaviors: -- **Multiple filters = OR logic**: Match any filter -- **Within filter = AND logic**: Match all specified conditions -- **Lists = IN logic**: Match any value in the list -- **Tag filters**: Must have at least one matching tag - -## Schema Comparison for REQ Handling - -### Current Simple Schema (Single Table) -```sql -CREATE TABLE event ( - id TEXT PRIMARY KEY, - pubkey TEXT NOT NULL, - created_at INTEGER NOT NULL, - kind INTEGER NOT NULL, - content TEXT NOT NULL, - sig TEXT NOT NULL -); - -CREATE TABLE tag ( - id TEXT NOT NULL, -- event ID - name TEXT NOT NULL, - value TEXT NOT NULL, - parameters TEXT -); -``` - -#### Sample REQ Query Implementation: -```sql --- Filter: {"authors": ["pubkey1", "pubkey2"], "kinds": [1, 6], "#p": ["target_pubkey"]} -SELECT DISTINCT e.* -FROM event e -WHERE e.pubkey IN ('pubkey1', 'pubkey2') - AND e.kind IN (1, 6) - AND EXISTS ( - SELECT 1 FROM tag t - WHERE t.id = e.id AND t.name = 'p' AND t.value = 'target_pubkey' - ) -ORDER BY e.created_at DESC, e.id ASC -LIMIT ?; -``` - -### Multi-Table Schema Challenge - -With separate tables (`events_regular`, 
`events_replaceable`, `events_ephemeral`, `events_addressable`), a REQ filter could potentially match events across ALL tables. - -#### Problem Example: -Filter: `{"kinds": [1, 0, 20001, 30023]}` -- Kind 1 → `events_regular` -- Kind 0 → `events_replaceable` -- Kind 20001 → `events_ephemeral` -- Kind 30023 → `events_addressable` - -This requires **4 separate queries + UNION**, significantly complicating the implementation. - -## Multi-Table Query Complexity - -### Scenario 1: Cross-Table Kind Filter -```sql --- Filter: {"kinds": [1, 0, 30023]} --- Requires querying 3 different tables - -SELECT id, pubkey, created_at, kind, content, sig FROM events_regular -WHERE kind = 1 -UNION ALL -SELECT id, pubkey, created_at, kind, content, sig FROM events_replaceable -WHERE kind = 0 -UNION ALL -SELECT id, pubkey, created_at, kind, content, sig FROM events_addressable -WHERE kind = 30023 -ORDER BY created_at DESC, id ASC -LIMIT ?; -``` - -### Scenario 2: Cross-Table Author Filter -```sql --- Filter: {"authors": ["pubkey1"]} --- Must check ALL tables for this author - -SELECT id, pubkey, created_at, kind, content, sig FROM events_regular -WHERE pubkey = 'pubkey1' -UNION ALL -SELECT id, pubkey, created_at, kind, content, sig FROM events_replaceable -WHERE pubkey = 'pubkey1' -UNION ALL -SELECT id, pubkey, created_at, kind, content, sig FROM events_ephemeral -WHERE pubkey = 'pubkey1' -UNION ALL -SELECT id, pubkey, created_at, kind, content, sig FROM events_addressable -WHERE pubkey = 'pubkey1' -ORDER BY created_at DESC, id ASC -LIMIT ?; -``` - -### Scenario 3: Complex Multi-Condition Filter -```sql --- Filter: {"authors": ["pubkey1"], "kinds": [1, 0], "#p": ["target"], "since": 1234567890} --- Extremely complex with multiple UNIONs and tag JOINs - -WITH regular_results AS ( - SELECT DISTINCT r.* - FROM events_regular r - JOIN tags_regular tr ON r.id = tr.event_id - WHERE r.pubkey = 'pubkey1' - AND r.kind = 1 - AND r.created_at >= 1234567890 - AND tr.name = 'p' AND tr.value = 
'target' -), -replaceable_results AS ( - SELECT DISTINCT rp.* - FROM events_replaceable rp - JOIN tags_replaceable trp ON (rp.pubkey, rp.kind) = (trp.event_pubkey, trp.event_kind) - WHERE rp.pubkey = 'pubkey1' - AND rp.kind = 0 - AND rp.created_at >= 1234567890 - AND trp.name = 'p' AND trp.value = 'target' -) -SELECT * FROM regular_results -UNION ALL -SELECT * FROM replaceable_results -ORDER BY created_at DESC, id ASC -LIMIT ?; -``` - -## Implementation Burden Analysis - -### Single Table Approach -```c -// Simple - one query builder function -char* build_filter_query(cJSON* filters) { - // Build single SELECT with WHERE conditions - // Single ORDER BY and LIMIT - // One execution path -} -``` - -### Multi-Table Approach -```c -// Complex - requires routing and union logic -char* build_multi_table_query(cJSON* filters) { - // 1. Analyze kinds to determine which tables to query - // 2. Split filters per table type - // 3. Build separate queries for each table - // 4. Union results with complex ORDER BY - // 5. Handle LIMIT across UNION (tricky!) -} - -typedef struct { - bool needs_regular; - bool needs_replaceable; - bool needs_ephemeral; - bool needs_addressable; - cJSON* regular_filter; - cJSON* replaceable_filter; - cJSON* ephemeral_filter; - cJSON* addressable_filter; -} filter_routing_t; -``` - -### Query Routing Complexity - -For each REQ filter, we must: - -1. **Analyze kinds** → Determine which tables to query -2. **Split filters** → Create per-table filter conditions -3. **Handle tag filters** → Different tag table references per event type -4. **Union results** → Merge with proper ordering -5. 
**Apply LIMIT** → Complex with UNION queries - -## Performance Implications - -### Single Table Advantages: -- ✅ **Single query execution** -- ✅ **One index strategy** -- ✅ **Simple LIMIT handling** -- ✅ **Unified ORDER BY** -- ✅ **No UNION overhead** - -### Multi-Table Disadvantages: -- ❌ **Multiple query executions** -- ❌ **UNION sorting overhead** -- ❌ **Complex LIMIT application** -- ❌ **Index fragmentation across tables** -- ❌ **Result set merging complexity** - -## Specific REQ Filter Challenges - -### 1. LIMIT Handling with UNION -```sql --- WRONG: Limit applies to each subquery -(SELECT * FROM events_regular WHERE ... LIMIT 100) -UNION ALL -(SELECT * FROM events_replaceable WHERE ... LIMIT 100) --- Could return 200 events! - --- CORRECT: Limit applies to final result -SELECT * FROM ( - SELECT * FROM events_regular WHERE ... - UNION ALL - SELECT * FROM events_replaceable WHERE ... - ORDER BY created_at DESC, id ASC -) LIMIT 100; --- But this sorts ALL results before limiting! -``` - -### 2. Tag Filter Complexity -Each event type needs different tag table joins: -- `events_regular` → `tags_regular` -- `events_replaceable` → `tags_replaceable` (with composite key) -- `events_addressable` → `tags_addressable` (with composite key) -- `events_ephemeral` → `tags_ephemeral` - -### 3. Subscription State Management -With multiple tables, subscription state becomes complex: -- Which tables does this subscription monitor? -- How to efficiently check new events across tables? 
-- Different trigger/notification patterns per table - -## Alternative: Unified Event View - -### Hybrid Approach: Views Over Multi-Tables -```sql --- Create unified view for queries -CREATE VIEW all_events AS -SELECT - 'regular' as event_type, - id, pubkey, created_at, kind, content, sig -FROM events_regular -UNION ALL -SELECT - 'replaceable' as event_type, - id, pubkey, created_at, kind, content, sig -FROM events_replaceable -UNION ALL -SELECT - 'ephemeral' as event_type, - id, pubkey, created_at, kind, content, sig -FROM events_ephemeral -UNION ALL -SELECT - 'addressable' as event_type, - id, pubkey, created_at, kind, content, sig -FROM events_addressable; - --- Unified tag view -CREATE VIEW all_tags AS -SELECT event_id, name, value, parameters FROM tags_regular -UNION ALL -SELECT CONCAT(event_pubkey, ':', event_kind), name, value, parameters FROM tags_replaceable -UNION ALL -SELECT event_id, name, value, parameters FROM tags_ephemeral -UNION ALL -SELECT CONCAT(event_pubkey, ':', event_kind, ':', d_tag), name, value, parameters FROM tags_addressable; -``` - -### REQ Query Against Views: -```sql --- Much simpler - back to single-table complexity -SELECT DISTINCT e.* -FROM all_events e -JOIN all_tags t ON e.id = t.event_id -WHERE e.pubkey IN (?) - AND e.kind IN (?) - AND t.name = 'p' AND t.value = ? -ORDER BY e.created_at DESC, e.id ASC -LIMIT ?; -``` - -## Recommendation - -**The multi-table approach creates significant subscription query complexity that may outweigh its benefits.** - -### Key Issues: -1. **REQ filters don't map to event types** - clients filter by kind, author, tags, not storage semantics -2. **UNION query complexity** - much harder to optimize and implement -3. **Subscription management burden** - must monitor multiple tables -4. 
**Performance uncertainty** - UNION queries may be slower than single table - -### Alternative Recommendation: - -**Modified Single Table with Event Type Column:** - -```sql -CREATE TABLE events ( - id TEXT PRIMARY KEY, - pubkey TEXT NOT NULL, - created_at INTEGER NOT NULL, - kind INTEGER NOT NULL, - event_type TEXT NOT NULL, -- 'regular', 'replaceable', 'ephemeral', 'addressable' - content TEXT NOT NULL, - sig TEXT NOT NULL, - tags JSON, - - -- Replaceable event fields - replaced_at INTEGER, - - -- Addressable event fields - d_tag TEXT, - - -- Unique constraints per event type - CONSTRAINT unique_replaceable - UNIQUE (pubkey, kind) WHERE event_type = 'replaceable', - CONSTRAINT unique_addressable - UNIQUE (pubkey, kind, d_tag) WHERE event_type = 'addressable' -); -``` - -### Benefits: -- ✅ **Simple REQ queries** - single table, familiar patterns -- ✅ **Type enforcement** - partial unique constraints handle replacement logic -- ✅ **Performance** - unified indexes, no UNIONs -- ✅ **Implementation simplicity** - minimal changes from current code -- ✅ **Future flexibility** - can split tables later if needed - -This approach gets the best of both worlds: protocol compliance through constraints, but query simplicity through unified storage. \ No newline at end of file diff --git a/relay.log b/relay.log index 53a1ad4..1bbce89 100644 --- a/relay.log +++ b/relay.log @@ -3,238 +3,3 @@ [INFO] Starting relay server... [INFO] Starting libwebsockets-based Nostr relay server... 
[SUCCESS] WebSocket relay started on ws://127.0.0.1:8888 -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND kind IN (1) ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 5 rows -[INFO] Total events sent: 5 -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message 
-[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 17 rows -[INFO] Total events sent: 17 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND kind IN (1) ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 7 rows -[INFO] Total events sent: 7 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND kind IN (0) ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 1 rows -[INFO] Total events sent: 1 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND pubkey IN ('aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4') ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 17 rows -[INFO] Total events sent: 17 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND created_at >= 1756983802 ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 6 rows -[INFO] Total events sent: 6 -[INFO] Received WebSocket 
message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 17 rows -[INFO] Total events sent: 17 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND kind IN (0,1) ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 8 rows -[INFO] Total events sent: 8 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND kind IN (1) ORDER BY created_at DESC LIMIT 1 -[INFO] Query returned 1 rows -[INFO] Total events sent: 1 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] 
Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling EVENT message -[SUCCESS] Event stored in database -[SUCCESS] Event stored successfully -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 22 rows -[INFO] Total events sent: 22 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND kind IN (1) ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 9 rows -[INFO] Total events sent: 9 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND kind IN (0) ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 1 rows -[INFO] Total events sent: 1 -[INFO] Received WebSocket message -[INFO] 
Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND pubkey IN ('aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4') ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 22 rows -[INFO] Total events sent: 22 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND created_at >= 1756983945 ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 9 rows -[INFO] Total events sent: 9 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 22 rows -[INFO] Total events sent: 22 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND kind IN (0,1) ORDER BY created_at DESC LIMIT 500 -[INFO] Query returned 10 rows -[INFO] Total events sent: 10 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed -[INFO] WebSocket connection established -[INFO] Received WebSocket message -[INFO] Handling REQ message -[INFO] Executing SQL: SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1 AND 
kind IN (1) ORDER BY created_at DESC LIMIT 1 -[INFO] Query returned 1 rows -[INFO] Total events sent: 1 -[INFO] Received WebSocket message -[INFO] Subscription closed -[INFO] WebSocket connection closed diff --git a/relay.pid b/relay.pid index 2557d0e..2f55dcf 100644 --- a/relay.pid +++ b/relay.pid @@ -1 +1 @@ -417956 +677168 diff --git a/src/main b/src/main index 7d051c4..179a61d 100755 Binary files a/src/main and b/src/main differ diff --git a/src/main.c b/src/main.c index cb24458..dc48275 100644 --- a/src/main.c +++ b/src/main.c @@ -1,3 +1,5 @@ + + #define _GNU_SOURCE #include #include @@ -13,24 +15,804 @@ #include "../nostr_core_lib/cjson/cJSON.h" #include "../nostr_core_lib/nostr_core/nostr_core.h" -// Configuration +// Server Configuration #define DEFAULT_PORT 8888 #define DEFAULT_HOST "127.0.0.1" #define DATABASE_PATH "db/c_nostr_relay.db" #define MAX_CLIENTS 100 -// Global state -static sqlite3* g_db = NULL; -static int g_server_running = 1; +// Persistent subscription system configuration +#define MAX_SUBSCRIPTIONS_PER_CLIENT 20 +#define MAX_TOTAL_SUBSCRIPTIONS 5000 +#define MAX_FILTERS_PER_SUBSCRIPTION 10 +#define SUBSCRIPTION_ID_MAX_LENGTH 64 +#define CLIENT_IP_MAX_LENGTH 64 // Color constants for logging #define RED "\033[31m" -#define GREEN "\033[32m" +#define GREEN "\033[32m" #define YELLOW "\033[33m" #define BLUE "\033[34m" #define BOLD "\033[1m" #define RESET "\033[0m" +// Global state +static sqlite3* g_db = NULL; +static int g_server_running = 1; +static struct lws_context *ws_context = NULL; + + +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// +// DATA STRUCTURES +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// + +// Forward declarations +typedef struct subscription_filter 
subscription_filter_t; +typedef struct subscription subscription_t; +typedef struct subscription_manager subscription_manager_t; + +// Subscription filter structure +struct subscription_filter { + // Filter criteria (all optional) + cJSON* kinds; // Array of event kinds [1,2,3] + cJSON* authors; // Array of author pubkeys + cJSON* ids; // Array of event IDs + long since; // Unix timestamp (0 = not set) + long until; // Unix timestamp (0 = not set) + int limit; // Result limit (0 = no limit) + cJSON* tag_filters; // Object with tag filters: {"#e": ["id1"], "#p": ["pubkey1"]} + + // Linked list for multiple filters per subscription + struct subscription_filter* next; +}; + +// Active subscription structure +struct subscription { + char id[SUBSCRIPTION_ID_MAX_LENGTH]; // Subscription ID + struct lws* wsi; // WebSocket connection handle + subscription_filter_t* filters; // Linked list of filters (OR'd together) + time_t created_at; // When subscription was created + int events_sent; // Counter for sent events + int active; // 1 = active, 0 = closed + + // Client info for logging + char client_ip[CLIENT_IP_MAX_LENGTH]; // Client IP address + + // Linked list pointers + struct subscription* next; // Next subscription globally + struct subscription* session_next; // Next subscription for this session +}; + +// Enhanced per-session data with subscription management +struct per_session_data { + int authenticated; + subscription_t* subscriptions; // Head of this session's subscription list + pthread_mutex_t session_lock; // Per-session thread safety + char client_ip[CLIENT_IP_MAX_LENGTH]; // Client IP for logging + int subscription_count; // Number of subscriptions for this session +}; + +// Global subscription manager +struct subscription_manager { + subscription_t* active_subscriptions; // Head of global subscription list + pthread_mutex_t subscriptions_lock; // Global thread safety + int total_subscriptions; // Current count + + // Configuration + int 
max_subscriptions_per_client; // Default: 20 + int max_total_subscriptions; // Default: 5000 + + // Statistics + uint64_t total_created; // Lifetime subscription count + uint64_t total_events_broadcast; // Lifetime event broadcast count +}; + +// Global subscription manager instance +static subscription_manager_t g_subscription_manager = { + .active_subscriptions = NULL, + .subscriptions_lock = PTHREAD_MUTEX_INITIALIZER, + .total_subscriptions = 0, + .max_subscriptions_per_client = MAX_SUBSCRIPTIONS_PER_CLIENT, + .max_total_subscriptions = MAX_TOTAL_SUBSCRIPTIONS, + .total_created = 0, + .total_events_broadcast = 0 +}; + +// Forward declarations for logging functions +void log_info(const char* message); +void log_success(const char* message); +void log_error(const char* message); +void log_warning(const char* message); + +// Forward declarations for subscription database logging +void log_subscription_created(const subscription_t* sub); +void log_subscription_closed(const char* sub_id, const char* client_ip, const char* reason); +void log_subscription_disconnected(const char* client_ip); +void log_event_broadcast(const char* event_id, const char* sub_id, const char* client_ip); +void update_subscription_events_sent(const char* sub_id, int events_sent); + + +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// +// PERSISTENT SUBSCRIPTIONS SYSTEM +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// + +// Create a subscription filter from cJSON filter object +subscription_filter_t* create_subscription_filter(cJSON* filter_json) { + if (!filter_json || !cJSON_IsObject(filter_json)) { + return NULL; + } + + subscription_filter_t* filter = calloc(1, sizeof(subscription_filter_t)); + if (!filter) { + return NULL; + } + 
+ // Copy filter criteria + cJSON* kinds = cJSON_GetObjectItem(filter_json, "kinds"); + if (kinds && cJSON_IsArray(kinds)) { + filter->kinds = cJSON_Duplicate(kinds, 1); + } + + cJSON* authors = cJSON_GetObjectItem(filter_json, "authors"); + if (authors && cJSON_IsArray(authors)) { + filter->authors = cJSON_Duplicate(authors, 1); + } + + cJSON* ids = cJSON_GetObjectItem(filter_json, "ids"); + if (ids && cJSON_IsArray(ids)) { + filter->ids = cJSON_Duplicate(ids, 1); + } + + cJSON* since = cJSON_GetObjectItem(filter_json, "since"); + if (since && cJSON_IsNumber(since)) { + filter->since = (long)cJSON_GetNumberValue(since); + } + + cJSON* until = cJSON_GetObjectItem(filter_json, "until"); + if (until && cJSON_IsNumber(until)) { + filter->until = (long)cJSON_GetNumberValue(until); + } + + cJSON* limit = cJSON_GetObjectItem(filter_json, "limit"); + if (limit && cJSON_IsNumber(limit)) { + filter->limit = (int)cJSON_GetNumberValue(limit); + } + + // Handle tag filters (e.g., {"#e": ["id1"], "#p": ["pubkey1"]}) + cJSON* item = NULL; + cJSON_ArrayForEach(item, filter_json) { + if (item->string && strlen(item->string) >= 2 && item->string[0] == '#') { + if (!filter->tag_filters) { + filter->tag_filters = cJSON_CreateObject(); + } + if (filter->tag_filters) { + cJSON_AddItemToObject(filter->tag_filters, item->string, cJSON_Duplicate(item, 1)); + } + } + } + + return filter; +} + +// Free a subscription filter +void free_subscription_filter(subscription_filter_t* filter) { + if (!filter) return; + + if (filter->kinds) cJSON_Delete(filter->kinds); + if (filter->authors) cJSON_Delete(filter->authors); + if (filter->ids) cJSON_Delete(filter->ids); + if (filter->tag_filters) cJSON_Delete(filter->tag_filters); + + if (filter->next) { + free_subscription_filter(filter->next); + } + + free(filter); +} + +// Create a new subscription +subscription_t* create_subscription(const char* sub_id, struct lws* wsi, cJSON* filters_array, const char* client_ip) { + if (!sub_id || !wsi || 
!filters_array) { + return NULL; + } + + subscription_t* sub = calloc(1, sizeof(subscription_t)); + if (!sub) { + return NULL; + } + + // Copy subscription ID (truncate if too long) + strncpy(sub->id, sub_id, SUBSCRIPTION_ID_MAX_LENGTH - 1); + sub->id[SUBSCRIPTION_ID_MAX_LENGTH - 1] = '\0'; + + // Set WebSocket connection + sub->wsi = wsi; + + // Set client IP + if (client_ip) { + strncpy(sub->client_ip, client_ip, CLIENT_IP_MAX_LENGTH - 1); + sub->client_ip[CLIENT_IP_MAX_LENGTH - 1] = '\0'; + } + + // Set timestamps and state + sub->created_at = time(NULL); + sub->events_sent = 0; + sub->active = 1; + + // Convert filters array to linked list + subscription_filter_t* filter_tail = NULL; + int filter_count = 0; + + if (cJSON_IsArray(filters_array)) { + cJSON* filter_json = NULL; + cJSON_ArrayForEach(filter_json, filters_array) { + if (filter_count >= MAX_FILTERS_PER_SUBSCRIPTION) { + log_warning("Maximum filters per subscription exceeded, ignoring excess filters"); + break; + } + + subscription_filter_t* filter = create_subscription_filter(filter_json); + if (filter) { + if (!sub->filters) { + sub->filters = filter; + filter_tail = filter; + } else { + filter_tail->next = filter; + filter_tail = filter; + } + filter_count++; + } + } + } + + if (filter_count == 0) { + log_error("No valid filters found for subscription"); + free(sub); + return NULL; + } + + return sub; +} + +// Free a subscription +void free_subscription(subscription_t* sub) { + if (!sub) return; + + if (sub->filters) { + free_subscription_filter(sub->filters); + } + + free(sub); +} + +// Add subscription to global manager (thread-safe) +int add_subscription_to_manager(subscription_t* sub) { + if (!sub) return -1; + + pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); + + // Check global limits + if (g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) { + pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); + log_error("Maximum total 
subscriptions reached"); + return -1; + } + + // Add to global list + sub->next = g_subscription_manager.active_subscriptions; + g_subscription_manager.active_subscriptions = sub; + g_subscription_manager.total_subscriptions++; + g_subscription_manager.total_created++; + + pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); + + // Log subscription creation to database + log_subscription_created(sub); + + char debug_msg[256]; + snprintf(debug_msg, sizeof(debug_msg), "Added subscription '%s' (total: %d)", + sub->id, g_subscription_manager.total_subscriptions); + log_info(debug_msg); + + return 0; +} + +// Remove subscription from global manager (thread-safe) +int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) { + if (!sub_id) return -1; + + pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); + + subscription_t** current = &g_subscription_manager.active_subscriptions; + + while (*current) { + subscription_t* sub = *current; + + // Match by ID and WebSocket connection + if (strcmp(sub->id, sub_id) == 0 && (!wsi || sub->wsi == wsi)) { + // Remove from list + *current = sub->next; + g_subscription_manager.total_subscriptions--; + + pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); + + // Log subscription closure to database + log_subscription_closed(sub_id, sub->client_ip, "closed"); + + // Update events sent counter before freeing + update_subscription_events_sent(sub_id, sub->events_sent); + + char debug_msg[256]; + snprintf(debug_msg, sizeof(debug_msg), "Removed subscription '%s' (total: %d)", + sub_id, g_subscription_manager.total_subscriptions); + log_info(debug_msg); + + free_subscription(sub); + return 0; + } + + current = &(sub->next); + } + + pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); + + char debug_msg[256]; + snprintf(debug_msg, sizeof(debug_msg), "Subscription '%s' not found for removal", sub_id); + log_warning(debug_msg); + + return -1; +} + +// Check if an event matches 
a subscription filter +int event_matches_filter(cJSON* event, subscription_filter_t* filter) { + if (!event || !filter) { + return 0; + } + + // Check kinds filter + if (filter->kinds && cJSON_IsArray(filter->kinds)) { + cJSON* event_kind = cJSON_GetObjectItem(event, "kind"); + if (!event_kind || !cJSON_IsNumber(event_kind)) { + return 0; + } + + int event_kind_val = (int)cJSON_GetNumberValue(event_kind); + int kind_match = 0; + + cJSON* kind_item = NULL; + cJSON_ArrayForEach(kind_item, filter->kinds) { + if (cJSON_IsNumber(kind_item) && (int)cJSON_GetNumberValue(kind_item) == event_kind_val) { + kind_match = 1; + break; + } + } + + if (!kind_match) { + return 0; + } + } + + // Check authors filter + if (filter->authors && cJSON_IsArray(filter->authors)) { + cJSON* event_pubkey = cJSON_GetObjectItem(event, "pubkey"); + if (!event_pubkey || !cJSON_IsString(event_pubkey)) { + return 0; + } + + const char* event_pubkey_str = cJSON_GetStringValue(event_pubkey); + int author_match = 0; + + cJSON* author_item = NULL; + cJSON_ArrayForEach(author_item, filter->authors) { + if (cJSON_IsString(author_item)) { + const char* author_str = cJSON_GetStringValue(author_item); + // Support prefix matching (partial pubkeys) + if (strncmp(event_pubkey_str, author_str, strlen(author_str)) == 0) { + author_match = 1; + break; + } + } + } + + if (!author_match) { + return 0; + } + } + + // Check IDs filter + if (filter->ids && cJSON_IsArray(filter->ids)) { + cJSON* event_id = cJSON_GetObjectItem(event, "id"); + if (!event_id || !cJSON_IsString(event_id)) { + return 0; + } + + const char* event_id_str = cJSON_GetStringValue(event_id); + int id_match = 0; + + cJSON* id_item = NULL; + cJSON_ArrayForEach(id_item, filter->ids) { + if (cJSON_IsString(id_item)) { + const char* id_str = cJSON_GetStringValue(id_item); + // Support prefix matching (partial IDs) + if (strncmp(event_id_str, id_str, strlen(id_str)) == 0) { + id_match = 1; + break; + } + } + } + + if (!id_match) { + return 0; + } + } 
+ + // Check since filter + if (filter->since > 0) { + cJSON* event_created_at = cJSON_GetObjectItem(event, "created_at"); + if (!event_created_at || !cJSON_IsNumber(event_created_at)) { + return 0; + } + + long event_timestamp = (long)cJSON_GetNumberValue(event_created_at); + if (event_timestamp < filter->since) { + return 0; + } + } + + // Check until filter + if (filter->until > 0) { + cJSON* event_created_at = cJSON_GetObjectItem(event, "created_at"); + if (!event_created_at || !cJSON_IsNumber(event_created_at)) { + return 0; + } + + long event_timestamp = (long)cJSON_GetNumberValue(event_created_at); + if (event_timestamp > filter->until) { + return 0; + } + } + + // Check tag filters (e.g., #e, #p tags) + if (filter->tag_filters && cJSON_IsObject(filter->tag_filters)) { + cJSON* event_tags = cJSON_GetObjectItem(event, "tags"); + if (!event_tags || !cJSON_IsArray(event_tags)) { + return 0; // Event has no tags but filter requires tags + } + + // Check each tag filter + cJSON* tag_filter = NULL; + cJSON_ArrayForEach(tag_filter, filter->tag_filters) { + if (!tag_filter->string || strlen(tag_filter->string) < 2 || tag_filter->string[0] != '#') { + continue; // Invalid tag filter + } + + const char* tag_name = tag_filter->string + 1; // Skip the '#' + + if (!cJSON_IsArray(tag_filter)) { + continue; // Tag filter must be an array + } + + int tag_match = 0; + + // Search through event tags for matching tag name and value + cJSON* event_tag = NULL; + cJSON_ArrayForEach(event_tag, event_tags) { + if (!cJSON_IsArray(event_tag) || cJSON_GetArraySize(event_tag) < 2) { + continue; // Invalid tag format + } + + cJSON* event_tag_name = cJSON_GetArrayItem(event_tag, 0); + cJSON* event_tag_value = cJSON_GetArrayItem(event_tag, 1); + + if (!cJSON_IsString(event_tag_name) || !cJSON_IsString(event_tag_value)) { + continue; + } + + // Check if tag name matches + if (strcmp(cJSON_GetStringValue(event_tag_name), tag_name) == 0) { + const char* event_tag_value_str = 
cJSON_GetStringValue(event_tag_value); + + // Check if any of the filter values match this tag value + cJSON* filter_value = NULL; + cJSON_ArrayForEach(filter_value, tag_filter) { + if (cJSON_IsString(filter_value)) { + const char* filter_value_str = cJSON_GetStringValue(filter_value); + // Support prefix matching for tag values + if (strncmp(event_tag_value_str, filter_value_str, strlen(filter_value_str)) == 0) { + tag_match = 1; + break; + } + } + } + + if (tag_match) { + break; + } + } + } + + if (!tag_match) { + return 0; // This tag filter didn't match, so the event doesn't match + } + } + } + + return 1; // All filters passed +} + +// Check if an event matches any filter in a subscription (filters are OR'd together) +int event_matches_subscription(cJSON* event, subscription_t* subscription) { + if (!event || !subscription || !subscription->filters) { + return 0; + } + + subscription_filter_t* filter = subscription->filters; + while (filter) { + if (event_matches_filter(event, filter)) { + return 1; // Match found (OR logic) + } + filter = filter->next; + } + + return 0; // No filters matched +} + +// Broadcast event to all matching subscriptions (thread-safe) +int broadcast_event_to_subscriptions(cJSON* event) { + if (!event) { + return 0; + } + + int broadcasts = 0; + + pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); + + subscription_t* sub = g_subscription_manager.active_subscriptions; + while (sub) { + if (sub->active && event_matches_subscription(event, sub)) { + // Create EVENT message for this subscription + cJSON* event_msg = cJSON_CreateArray(); + cJSON_AddItemToArray(event_msg, cJSON_CreateString("EVENT")); + cJSON_AddItemToArray(event_msg, cJSON_CreateString(sub->id)); + cJSON_AddItemToArray(event_msg, cJSON_Duplicate(event, 1)); + + char* msg_str = cJSON_Print(event_msg); + if (msg_str) { + size_t msg_len = strlen(msg_str); + unsigned char* buf = malloc(LWS_PRE + msg_len); + if (buf) { + memcpy(buf + LWS_PRE, msg_str, msg_len); + + 
// Send to WebSocket connection + int write_result = lws_write(sub->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT); + if (write_result >= 0) { + sub->events_sent++; + broadcasts++; + + // Log event broadcast to database (optional - can be disabled for performance) + cJSON* event_id_obj = cJSON_GetObjectItem(event, "id"); + if (event_id_obj && cJSON_IsString(event_id_obj)) { + log_event_broadcast(cJSON_GetStringValue(event_id_obj), sub->id, sub->client_ip); + } + } + + free(buf); + } + free(msg_str); + } + + cJSON_Delete(event_msg); + } + + sub = sub->next; + } + + // Update global statistics + g_subscription_manager.total_events_broadcast += broadcasts; + + pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); + + if (broadcasts > 0) { + char debug_msg[256]; + snprintf(debug_msg, sizeof(debug_msg), "Broadcasted event to %d subscriptions", broadcasts); + log_info(debug_msg); + } + + return broadcasts; +} + + +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// +// SUBSCRIPTION DATABASE LOGGING +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// + +// Log subscription creation to database +void log_subscription_created(const subscription_t* sub) { + if (!g_db || !sub) return; + + // Create filter JSON for logging + char* filter_json = NULL; + if (sub->filters) { + cJSON* filters_array = cJSON_CreateArray(); + subscription_filter_t* filter = sub->filters; + + while (filter) { + cJSON* filter_obj = cJSON_CreateObject(); + + if (filter->kinds) { + cJSON_AddItemToObject(filter_obj, "kinds", cJSON_Duplicate(filter->kinds, 1)); + } + if (filter->authors) { + cJSON_AddItemToObject(filter_obj, "authors", cJSON_Duplicate(filter->authors, 1)); + } + if (filter->ids) { + cJSON_AddItemToObject(filter_obj, "ids", 
cJSON_Duplicate(filter->ids, 1)); + } + if (filter->since > 0) { + cJSON_AddNumberToObject(filter_obj, "since", filter->since); + } + if (filter->until > 0) { + cJSON_AddNumberToObject(filter_obj, "until", filter->until); + } + if (filter->limit > 0) { + cJSON_AddNumberToObject(filter_obj, "limit", filter->limit); + } + if (filter->tag_filters) { + cJSON* tags_obj = cJSON_Duplicate(filter->tag_filters, 1); + cJSON* item = NULL; + cJSON_ArrayForEach(item, tags_obj) { + if (item->string) { + cJSON_AddItemToObject(filter_obj, item->string, cJSON_Duplicate(item, 1)); + } + } + cJSON_Delete(tags_obj); + } + + cJSON_AddItemToArray(filters_array, filter_obj); + filter = filter->next; + } + + filter_json = cJSON_Print(filters_array); + cJSON_Delete(filters_array); + } + + const char* sql = + "INSERT INTO subscription_events (subscription_id, client_ip, event_type, filter_json) " + "VALUES (?, ?, 'created', ?)"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); + if (rc == SQLITE_OK) { + sqlite3_bind_text(stmt, 1, sub->id, -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 2, sub->client_ip, -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 3, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT); + + sqlite3_step(stmt); + sqlite3_finalize(stmt); + } + + if (filter_json) free(filter_json); +} + +// Log subscription closure to database +void log_subscription_closed(const char* sub_id, const char* client_ip, const char* reason) { + (void)reason; // Mark as intentionally unused + if (!g_db || !sub_id) return; + + const char* sql = + "INSERT INTO subscription_events (subscription_id, client_ip, event_type) " + "VALUES (?, ?, 'closed')"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); + if (rc == SQLITE_OK) { + sqlite3_bind_text(stmt, 1, sub_id, -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 2, client_ip ? 
client_ip : "unknown", -1, SQLITE_STATIC); + + sqlite3_step(stmt); + sqlite3_finalize(stmt); + } + + // Update the corresponding 'created' entry with end time and events sent + const char* update_sql = + "UPDATE subscription_events " + "SET ended_at = strftime('%s', 'now') " + "WHERE subscription_id = ? AND event_type = 'created' AND ended_at IS NULL"; + + rc = sqlite3_prepare_v2(g_db, update_sql, -1, &stmt, NULL); + if (rc == SQLITE_OK) { + sqlite3_bind_text(stmt, 1, sub_id, -1, SQLITE_STATIC); + sqlite3_step(stmt); + sqlite3_finalize(stmt); + } +} + +// Log subscription disconnection to database +void log_subscription_disconnected(const char* client_ip) { + if (!g_db || !client_ip) return; + + // Mark all active subscriptions for this client as disconnected + const char* sql = + "UPDATE subscription_events " + "SET ended_at = strftime('%s', 'now') " + "WHERE client_ip = ? AND event_type = 'created' AND ended_at IS NULL"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); + if (rc == SQLITE_OK) { + sqlite3_bind_text(stmt, 1, client_ip, -1, SQLITE_STATIC); + int changes = sqlite3_changes(g_db); + sqlite3_step(stmt); + sqlite3_finalize(stmt); + + if (changes > 0) { + // Log a disconnection event + const char* insert_sql = + "INSERT INTO subscription_events (subscription_id, client_ip, event_type) " + "VALUES ('disconnect', ?, 'disconnected')"; + + rc = sqlite3_prepare_v2(g_db, insert_sql, -1, &stmt, NULL); + if (rc == SQLITE_OK) { + sqlite3_bind_text(stmt, 1, client_ip, -1, SQLITE_STATIC); + sqlite3_step(stmt); + sqlite3_finalize(stmt); + } + } + } +} + +// Log event broadcast to database (optional, can be resource intensive) +void log_event_broadcast(const char* event_id, const char* sub_id, const char* client_ip) { + if (!g_db || !event_id || !sub_id || !client_ip) return; + + const char* sql = + "INSERT INTO event_broadcasts (event_id, subscription_id, client_ip) " + "VALUES (?, ?, ?)"; + + sqlite3_stmt* stmt; + int rc = 
sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); + if (rc == SQLITE_OK) { + sqlite3_bind_text(stmt, 1, event_id, -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 2, sub_id, -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 3, client_ip, -1, SQLITE_STATIC); + + sqlite3_step(stmt); + sqlite3_finalize(stmt); + } +} + +// Update events sent counter for a subscription +void update_subscription_events_sent(const char* sub_id, int events_sent) { + if (!g_db || !sub_id) return; + + const char* sql = + "UPDATE subscription_events " + "SET events_sent = ? " + "WHERE subscription_id = ? AND event_type = 'created'"; + + sqlite3_stmt* stmt; + int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); + if (rc == SQLITE_OK) { + sqlite3_bind_int(stmt, 1, events_sent); + sqlite3_bind_text(stmt, 2, sub_id, -1, SQLITE_STATIC); + + sqlite3_step(stmt); + sqlite3_finalize(stmt); + } +} + +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// +// LOGGING FUNCTIONS +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// + // Logging functions void log_info(const char* message) { printf(BLUE "[INFO]" RESET " %s\n", message); @@ -60,6 +842,12 @@ void signal_handler(int sig) { } } +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// +// DATABASE FUNCTIONS +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// + // Initialize database connection int init_database() { int rc = sqlite3_open(DATABASE_PATH, &g_db); @@ -200,7 +988,12 @@ int store_event(cJSON* event) { return 0; } -// Retrieve event from 
database +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// +// EVENT STORAGE AND RETRIEVAL +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// + cJSON* retrieve_event(const char* event_id) { if (!g_db || !event_id) { return NULL; @@ -246,15 +1039,90 @@ cJSON* retrieve_event(const char* event_id) { return event; } -// Handle REQ message (subscription) - send events matching filters -int handle_req_message(const char* sub_id, cJSON* filters, struct lws *wsi) { - log_info("Handling REQ message"); + +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// +// SUBSCRIPTION HANDLERS +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// + +int handle_req_message(const char* sub_id, cJSON* filters, struct lws *wsi, struct per_session_data *pss) { + log_info("Handling REQ message for persistent subscription"); if (!cJSON_IsArray(filters)) { log_error("REQ filters is not an array"); return 0; } + // Check session subscription limits + if (pss && pss->subscription_count >= MAX_SUBSCRIPTIONS_PER_CLIENT) { + log_error("Maximum subscriptions per client exceeded"); + + // Send CLOSED notice + cJSON* closed_msg = cJSON_CreateArray(); + cJSON_AddItemToArray(closed_msg, cJSON_CreateString("CLOSED")); + cJSON_AddItemToArray(closed_msg, cJSON_CreateString(sub_id)); + cJSON_AddItemToArray(closed_msg, cJSON_CreateString("error: too many subscriptions")); + + char* closed_str = cJSON_Print(closed_msg); + if (closed_str) { + size_t closed_len = strlen(closed_str); + unsigned char* 
buf = malloc(LWS_PRE + closed_len); + if (buf) { + memcpy(buf + LWS_PRE, closed_str, closed_len); + lws_write(wsi, buf + LWS_PRE, closed_len, LWS_WRITE_TEXT); + free(buf); + } + free(closed_str); + } + cJSON_Delete(closed_msg); + + return 0; + } + + // Create persistent subscription + subscription_t* subscription = create_subscription(sub_id, wsi, filters, pss ? pss->client_ip : "unknown"); + if (!subscription) { + log_error("Failed to create subscription"); + return 0; + } + + // Add to global manager + if (add_subscription_to_manager(subscription) != 0) { + log_error("Failed to add subscription to global manager"); + free_subscription(subscription); + + // Send CLOSED notice + cJSON* closed_msg = cJSON_CreateArray(); + cJSON_AddItemToArray(closed_msg, cJSON_CreateString("CLOSED")); + cJSON_AddItemToArray(closed_msg, cJSON_CreateString(sub_id)); + cJSON_AddItemToArray(closed_msg, cJSON_CreateString("error: subscription limit reached")); + + char* closed_str = cJSON_Print(closed_msg); + if (closed_str) { + size_t closed_len = strlen(closed_str); + unsigned char* buf = malloc(LWS_PRE + closed_len); + if (buf) { + memcpy(buf + LWS_PRE, closed_str, closed_len); + lws_write(wsi, buf + LWS_PRE, closed_len, LWS_WRITE_TEXT); + free(buf); + } + free(closed_str); + } + cJSON_Delete(closed_msg); + + return 0; + } + + // Add to session's subscription list (if session data available) + if (pss) { + pthread_mutex_lock(&pss->session_lock); + subscription->session_next = pss->subscriptions; + pss->subscriptions = subscription; + pss->subscription_count++; + pthread_mutex_unlock(&pss->session_lock); + } + int events_sent = 0; // Process each filter in the array @@ -454,14 +1322,13 @@ int handle_event_message(cJSON* event) { return -1; } -// Global WebSocket context -static struct lws_context *ws_context = NULL; -// Per-session data structure -struct per_session_data { - int authenticated; - char subscription_id[64]; -}; + 
+///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// +// WEBSOCKET PROTOCOL +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// // WebSocket callback function for Nostr relay protocol static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reason, @@ -472,6 +1339,10 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso case LWS_CALLBACK_ESTABLISHED: log_info("WebSocket connection established"); memset(pss, 0, sizeof(*pss)); + pthread_mutex_init(&pss->session_lock, NULL); + // TODO: Get real client IP address + strncpy(pss->client_ip, "127.0.0.1", CLIENT_IP_MAX_LENGTH - 1); + pss->client_ip[CLIENT_IP_MAX_LENGTH - 1] = '\0'; break; case LWS_CALLBACK_RECEIVE: @@ -497,6 +1368,11 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso if (event && cJSON_IsObject(event)) { int result = handle_event_message(event); + // Broadcast event to matching persistent subscriptions + if (result == 0) { + broadcast_event_to_subscriptions(event); + } + // Send OK response cJSON* event_id = cJSON_GetObjectItem(event, "id"); if (event_id && cJSON_IsString(event_id)) { @@ -526,7 +1402,6 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso if (sub_id && cJSON_IsString(sub_id)) { const char* subscription_id = cJSON_GetStringValue(sub_id); - strncpy(pss->subscription_id, subscription_id, sizeof(pss->subscription_id) - 1); // Create array of filter objects from position 2 onwards cJSON* filters = cJSON_CreateArray(); @@ -538,7 +1413,7 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso } } - handle_req_message(subscription_id, filters, wsi); + handle_req_message(subscription_id, filters, wsi, pss); 
// Clean up the filters array we created cJSON_Delete(filters); @@ -563,7 +1438,35 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso } } else if (strcmp(msg_type, "CLOSE") == 0) { // Handle CLOSE message - log_info("Subscription closed"); + cJSON* sub_id = cJSON_GetArrayItem(json, 1); + if (sub_id && cJSON_IsString(sub_id)) { + const char* subscription_id = cJSON_GetStringValue(sub_id); + + // Remove from global manager + remove_subscription_from_manager(subscription_id, wsi); + + // Remove from session list if present + if (pss) { + pthread_mutex_lock(&pss->session_lock); + + subscription_t** current = &pss->subscriptions; + while (*current) { + if (strcmp((*current)->id, subscription_id) == 0) { + subscription_t* to_remove = *current; + *current = to_remove->session_next; + pss->subscription_count--; + break; + } + current = &((*current)->session_next); + } + + pthread_mutex_unlock(&pss->session_lock); + } + + char debug_msg[256]; + snprintf(debug_msg, sizeof(debug_msg), "Closed subscription: %s", subscription_id); + log_info(debug_msg); + } } } } @@ -576,6 +1479,24 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso case LWS_CALLBACK_CLOSED: log_info("WebSocket connection closed"); + + // Clean up session subscriptions + if (pss) { + pthread_mutex_lock(&pss->session_lock); + + subscription_t* sub = pss->subscriptions; + while (sub) { + subscription_t* next = sub->session_next; + remove_subscription_from_manager(sub->id, wsi); + sub = next; + } + + pss->subscriptions = NULL; + pss->subscription_count = 0; + + pthread_mutex_unlock(&pss->session_lock); + pthread_mutex_destroy(&pss->session_lock); + } break; default: @@ -650,6 +1571,15 @@ int start_websocket_relay() { return 0; } + + + +///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// +// MAIN PROGRAM 
+///////////////////////////////////////////////////////////////////////////////////////// +///////////////////////////////////////////////////////////////////////////////////////// + // Print usage information void print_usage(const char* program_name) { printf("Usage: %s [OPTIONS]\n", program_name); diff --git a/tests/subscribe_all.sh b/tests/subscribe_all.sh new file mode 100755 index 0000000..4c48bbf --- /dev/null +++ b/tests/subscribe_all.sh @@ -0,0 +1,199 @@ +#!/bin/bash + +# Persistent Subscription Test Script +# Subscribes to all events in the relay and prints them as they arrive in real-time +# This tests the persistent subscription functionality of the C-Relay + +set -e # Exit on any error + +# Color constants +RED='\033[31m' +GREEN='\033[32m' +YELLOW='\033[33m' +BLUE='\033[34m' +BOLD='\033[1m' +RESET='\033[0m' + +# Test configuration +RELAY_URL="ws://127.0.0.1:8888" +SUBSCRIPTION_ID="persistent_test_$(date +%s)" + +# Print functions +print_header() { + echo -e "${BLUE}${BOLD}=== $1 ===${RESET}" +} + +print_info() { + echo -e "${BLUE}[INFO]${RESET} $1" +} + +print_success() { + echo -e "${GREEN}✓${RESET} $1" +} + +print_error() { + echo -e "${RED}✗${RESET} $1" +} + +print_warning() { + echo -e "${YELLOW}[WARNING]${RESET} $1" +} + +print_event() { + echo -e "${GREEN}[EVENT]${RESET} $1" +} + +# Cleanup function +cleanup() { + print_info "Cleaning up..." + if [[ -n "$WEBSOCAT_PID" ]]; then + kill "$WEBSOCAT_PID" 2>/dev/null || true + wait "$WEBSOCAT_PID" 2>/dev/null || true + fi + + # Send CLOSE message to clean up subscription on relay + if command -v websocat &> /dev/null; then + echo "[\"CLOSE\",\"$SUBSCRIPTION_ID\"]" | timeout 2s websocat "$RELAY_URL" 2>/dev/null || true + fi + + print_info "Cleanup complete" + exit 0 +} + +# Set up signal handlers +trap cleanup SIGINT SIGTERM + +# Parse events from relay responses +parse_events() { + while IFS= read -r line; do + # Check if this is an EVENT message + if echo "$line" | jq -e '. 
| type == "array" and length >= 3 and .[0] == "EVENT"' >/dev/null 2>&1; then
            # Extract event details (each falls back to a placeholder on parse failure)
            local event_id=$(echo "$line" | jq -r '.[2].id' 2>/dev/null || echo "unknown")
            local event_kind=$(echo "$line" | jq -r '.[2].kind' 2>/dev/null || echo "unknown")
            local event_content=$(echo "$line" | jq -r '.[2].content' 2>/dev/null || echo "")
            local event_pubkey=$(echo "$line" | jq -r '.[2].pubkey' 2>/dev/null || echo "unknown")
            local event_created_at=$(echo "$line" | jq -r '.[2].created_at' 2>/dev/null || echo "unknown")
            local event_tags=$(echo "$line" | jq -r '.[2].tags | length' 2>/dev/null || echo "0")

            # Convert timestamp to readable format.
            # FIX: `date -d @N` is GNU-only; fall back to BSD/macOS `date -r N`
            # before giving up and printing the raw epoch value.
            local readable_time="unknown"
            if [[ "$event_created_at" != "unknown" && "$event_created_at" =~ ^[0-9]+$ ]]; then
                readable_time=$(date -d "@$event_created_at" "+%Y-%m-%d %H:%M:%S" 2>/dev/null \
                    || date -r "$event_created_at" "+%Y-%m-%d %H:%M:%S" 2>/dev/null \
                    || echo "$event_created_at")
            fi

            # Print formatted event
            print_event "Kind: $event_kind | ID: ${event_id:0:16}... | Author: ${event_pubkey:0:16}..."
            echo -e "  ${YELLOW}Time:${RESET} $readable_time | ${YELLOW}Tags:${RESET} $event_tags"

            # Show content (truncated if too long)
            if [[ -n "$event_content" ]]; then
                local truncated_content="${event_content:0:100}"
                if [[ ${#event_content} -gt 100 ]]; then
                    truncated_content="${truncated_content}..."
                fi
                echo -e "  ${YELLOW}Content:${RESET} $truncated_content"
            fi
            echo # Blank line for readability

        elif echo "$line" | jq -e '. | type == "array" and length >= 2 and .[0] == "EOSE"' >/dev/null 2>&1; then
            # End of stored events
            local sub_id=$(echo "$line" | jq -r '.[1]' 2>/dev/null)
            print_info "End of stored events for subscription: $sub_id"
            print_success "Persistent subscription is now active - waiting for new events..."
            echo

        elif echo "$line" | jq -e '. | type == "array" and length >= 3 and .[0] == "CLOSED"' >/dev/null 2>&1; then
            # Subscription closed by the relay
            local sub_id=$(echo "$line" | jq -r '.[1]' 2>/dev/null)
            local reason=$(echo "$line" | jq -r '.[2]' 2>/dev/null)
            print_warning "Subscription $sub_id was closed: $reason"

        elif echo "$line" | jq -e '. | type == "array" and length >= 4 and .[0] == "OK"' >/dev/null 2>&1; then
            # OK response to event publishing
            local event_id=$(echo "$line" | jq -r '.[1]' 2>/dev/null)
            local success=$(echo "$line" | jq -r '.[2]' 2>/dev/null)
            local message=$(echo "$line" | jq -r '.[3]' 2>/dev/null)
            if [[ "$success" == "true" ]]; then
                print_success "Event published: ${event_id:0:16}..."
            else
                print_error "Event publish failed: ${event_id:0:16}... - $message"
            fi

        else
            # Unknown message type - just show it
            print_info "Relay message: $line"
        fi
    done
}

# Main function
main() {
    print_header "Persistent Subscription Test - Subscribe to All Events"

    # Check dependencies
    if ! command -v websocat &> /dev/null; then
        print_error "websocat command not found"
        print_info "Please install websocat for testing"
        return 1
    fi
    if ! command -v jq &> /dev/null; then
        print_error "jq command not found"
        print_info "Please install jq for JSON processing"
        return 1
    fi

    print_info "Subscription ID: $SUBSCRIPTION_ID"
    print_info "Relay URL: $RELAY_URL"
    print_info "Filter: {} (all events)"
    echo

    # Create REQ message to subscribe to all events
    local req_message="[\"REQ\",\"$SUBSCRIPTION_ID\",{}]"

    print_info "Establishing persistent subscription..."
    print_info "Press Ctrl+C to stop and cleanup"
    echo

    # Send the REQ, then hold the connection open; incoming relay messages
    # stream through parse_events.
    {
        echo "$req_message"
        # Keep the connection alive indefinitely; events arrive as they come.
        while true; do
            sleep 1
        done
    } | websocat "$RELAY_URL" | parse_events &

    # NOTE(review): $! is the PID of the last process in the background
    # pipeline (parse_events), not websocat itself, so cleanup()'s kill may
    # leave websocat and the feeder loop running — consider killing the
    # process group instead. TODO confirm desired behavior.
    WEBSOCAT_PID=$!

    # Block until the pipeline exits (normally via Ctrl+C -> trap cleanup)
    wait "$WEBSOCAT_PID" 2>/dev/null || true
}

# Usage information
usage() {
    echo "Usage: $0"
    echo
    echo "This script creates a persistent subscription to all events on the relay"
    echo "and displays them in real-time as they arrive. Perfect for testing"
    echo "the persistent subscription functionality."
    echo
    echo "To test:"
    echo "1. Run this script in one terminal"
    echo "2. Run 'tests/1_nip_test.sh' in another terminal"
    echo "3. Watch events appear in real-time in this terminal"
    echo
    echo "Press Ctrl+C to stop and cleanup the subscription."
}

# Handle help flag
if [[ "$1" == "-h" || "$1" == "--help" ]]; then
    usage
    exit 0
fi

# Run main function
main "$@"