#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include "subscriptions.h" // Forward declarations for logging functions void log_info(const char* message); void log_error(const char* message); void log_warning(const char* message); // Forward declarations for configuration functions const char* get_config_value(const char* key); // Forward declarations for NIP-40 expiration functions int is_event_expired(cJSON* event, time_t current_time); // Forward declarations for filter validation int validate_filter_values(cJSON* filter_json, char* error_message, size_t error_size); int validate_hex_string(const char* str, size_t expected_len, const char* field_name, char* error_message, size_t error_size); int validate_timestamp_range(long since, long until, char* error_message, size_t error_size); int validate_numeric_limits(int limit, char* error_message, size_t error_size); int validate_search_term(const char* search_term, char* error_message, size_t error_size); // Global database variable extern sqlite3* g_db; // Global unified cache extern unified_config_cache_t g_unified_cache; // Global subscription manager extern subscription_manager_t g_subscription_manager; ///////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////// // PERSISTENT SUBSCRIPTIONS SYSTEM ///////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////// // Create a subscription filter from cJSON filter object subscription_filter_t* create_subscription_filter(cJSON* filter_json) { if (!filter_json || !cJSON_IsObject(filter_json)) { return NULL; } // Validate filter values before creating the filter char error_message[512] = {0}; if (!validate_filter_values(filter_json, error_message, sizeof(error_message))) { log_warning(error_message); return NULL; } subscription_filter_t* filter = calloc(1, sizeof(subscription_filter_t)); if (!filter) { return NULL; } // Copy filter criteria cJSON* kinds = cJSON_GetObjectItem(filter_json, "kinds"); if (kinds && cJSON_IsArray(kinds)) { filter->kinds = cJSON_Duplicate(kinds, 1); } cJSON* authors = cJSON_GetObjectItem(filter_json, "authors"); if (authors && cJSON_IsArray(authors)) { filter->authors = cJSON_Duplicate(authors, 1); } cJSON* ids = cJSON_GetObjectItem(filter_json, "ids"); if (ids && cJSON_IsArray(ids)) { filter->ids = cJSON_Duplicate(ids, 1); } cJSON* since = cJSON_GetObjectItem(filter_json, "since"); if (since && cJSON_IsNumber(since)) { filter->since = (long)cJSON_GetNumberValue(since); } cJSON* until = cJSON_GetObjectItem(filter_json, "until"); if (until && cJSON_IsNumber(until)) { filter->until = (long)cJSON_GetNumberValue(until); } cJSON* limit = cJSON_GetObjectItem(filter_json, "limit"); if (limit && cJSON_IsNumber(limit)) { filter->limit = (int)cJSON_GetNumberValue(limit); } // Handle tag filters (e.g., {"#e": ["id1"], "#p": ["pubkey1"]}) cJSON* item = NULL; cJSON_ArrayForEach(item, filter_json) { if (item->string && strlen(item->string) >= 2 && item->string[0] == '#') { if (!filter->tag_filters) { filter->tag_filters = cJSON_CreateObject(); } if (filter->tag_filters) { cJSON_AddItemToObject(filter->tag_filters, item->string, cJSON_Duplicate(item, 1)); } } } return filter; } // Free a subscription filter void free_subscription_filter(subscription_filter_t* filter) { if (!filter) return; if (filter->kinds) cJSON_Delete(filter->kinds); if (filter->authors) cJSON_Delete(filter->authors); if (filter->ids) cJSON_Delete(filter->ids); if (filter->tag_filters) cJSON_Delete(filter->tag_filters); if (filter->next) { free_subscription_filter(filter->next); } free(filter); } // Validate subscription ID format and length static int validate_subscription_id(const char* sub_id) { if (!sub_id) { return 0; // NULL pointer } size_t len = strlen(sub_id); if (len == 0 || len >= SUBSCRIPTION_ID_MAX_LENGTH) { return 0; // Empty or too long } // Check for valid characters (alphanumeric, underscore, hyphen) for (size_t i = 0; i < len; i++) { char c = sub_id[i]; if (!((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '_' || c == '-')) { return 0; // Invalid character } } return 1; // Valid } // Create a new subscription subscription_t* create_subscription(const char* sub_id, struct lws* wsi, cJSON* filters_array, const char* client_ip) { if (!sub_id || !wsi || !filters_array) { log_error("create_subscription: NULL parameter(s)"); return NULL; } // Validate subscription ID if (!validate_subscription_id(sub_id)) { log_error("create_subscription: invalid subscription ID format or length"); return NULL; } subscription_t* sub = calloc(1, sizeof(subscription_t)); if (!sub) { log_error("create_subscription: failed to allocate subscription"); return NULL; } // Copy subscription ID safely (already validated) size_t id_len = strlen(sub_id); memcpy(sub->id, sub_id, id_len); sub->id[id_len] = '\0'; // Set WebSocket connection sub->wsi = wsi; // Set client IP safely if (client_ip) { size_t ip_len = strlen(client_ip); if (ip_len >= CLIENT_IP_MAX_LENGTH) { ip_len = CLIENT_IP_MAX_LENGTH - 1; } memcpy(sub->client_ip, client_ip, ip_len); sub->client_ip[ip_len] = '\0'; } else { sub->client_ip[0] = '\0'; // Ensure null termination } // Set timestamps and state sub->created_at = time(NULL); sub->events_sent = 0; sub->active = 1; // Convert filters array to linked list subscription_filter_t* filter_tail = NULL; int filter_count = 0; if (cJSON_IsArray(filters_array)) { cJSON* filter_json = NULL; cJSON_ArrayForEach(filter_json, filters_array) { if (filter_count >= MAX_FILTERS_PER_SUBSCRIPTION) { log_warning("Maximum filters per subscription exceeded, ignoring excess filters"); break; } subscription_filter_t* filter = create_subscription_filter(filter_json); if (filter) { if (!sub->filters) { sub->filters = filter; filter_tail = filter; } else { filter_tail->next = filter; filter_tail = filter; } filter_count++; } } } if (filter_count == 0) { log_error("No valid filters found for subscription"); free(sub); return NULL; } return sub; } // Free a subscription void free_subscription(subscription_t* sub) { if (!sub) return; if (sub->filters) { free_subscription_filter(sub->filters); } free(sub); } // Add subscription to global manager (thread-safe) int add_subscription_to_manager(subscription_t* sub) { if (!sub) return -1; pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); // Check global limits if (g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) { pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); log_error("Maximum total subscriptions reached"); return -1; } // Add to global list sub->next = g_subscription_manager.active_subscriptions; g_subscription_manager.active_subscriptions = sub; g_subscription_manager.total_subscriptions++; g_subscription_manager.total_created++; pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); // Log subscription creation to database log_subscription_created(sub); return 0; } // Remove subscription from global manager (thread-safe) int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) { if (!sub_id) { log_error("remove_subscription_from_manager: NULL subscription ID"); return -1; } // Validate subscription ID format if (!validate_subscription_id(sub_id)) { log_error("remove_subscription_from_manager: invalid subscription ID format"); return -1; } pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); subscription_t** current = &g_subscription_manager.active_subscriptions; while (*current) { subscription_t* sub = *current; // Match by ID and WebSocket connection if (strcmp(sub->id, sub_id) == 0 && (!wsi || sub->wsi == wsi)) { // Remove from list *current = sub->next; g_subscription_manager.total_subscriptions--; // Copy data needed for logging before unlocking char client_ip_copy[CLIENT_IP_MAX_LENGTH]; int events_sent_copy = sub->events_sent; char sub_id_copy[SUBSCRIPTION_ID_MAX_LENGTH]; memcpy(client_ip_copy, sub->client_ip, CLIENT_IP_MAX_LENGTH); memcpy(sub_id_copy, sub->id, SUBSCRIPTION_ID_MAX_LENGTH); client_ip_copy[CLIENT_IP_MAX_LENGTH - 1] = '\0'; sub_id_copy[SUBSCRIPTION_ID_MAX_LENGTH - 1] = '\0'; pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); // Log subscription closure to database (now safe) log_subscription_closed(sub_id_copy, client_ip_copy, "closed"); // Update events sent counter before freeing update_subscription_events_sent(sub_id_copy, events_sent_copy); free_subscription(sub); return 0; } current = &(sub->next); } pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); char debug_msg[256]; snprintf(debug_msg, sizeof(debug_msg), "Subscription '%s' not found for removal", sub_id); log_warning(debug_msg); return -1; } // Check if an event matches a subscription filter int event_matches_filter(cJSON* event, subscription_filter_t* filter) { if (!event || !filter) { return 0; } // Check kinds filter if (filter->kinds && cJSON_IsArray(filter->kinds)) { cJSON* event_kind = cJSON_GetObjectItem(event, "kind"); if (!event_kind || !cJSON_IsNumber(event_kind)) { return 0; } int event_kind_val = (int)cJSON_GetNumberValue(event_kind); int kind_match = 0; cJSON* kind_item = NULL; cJSON_ArrayForEach(kind_item, filter->kinds) { if (cJSON_IsNumber(kind_item)) { int filter_kind = (int)cJSON_GetNumberValue(kind_item); if (filter_kind == event_kind_val) { kind_match = 1; break; } } } if (!kind_match) { return 0; } } // Check authors filter if (filter->authors && cJSON_IsArray(filter->authors)) { cJSON* event_pubkey = cJSON_GetObjectItem(event, "pubkey"); if (!event_pubkey || !cJSON_IsString(event_pubkey)) { return 0; } const char* event_pubkey_str = cJSON_GetStringValue(event_pubkey); int author_match = 0; cJSON* author_item = NULL; cJSON_ArrayForEach(author_item, filter->authors) { if (cJSON_IsString(author_item)) { const char* author_str = cJSON_GetStringValue(author_item); // Support prefix matching (partial pubkeys) if (strncmp(event_pubkey_str, author_str, strlen(author_str)) == 0) { author_match = 1; break; } } } if (!author_match) { return 0; } } // Check IDs filter if (filter->ids && cJSON_IsArray(filter->ids)) { cJSON* event_id = cJSON_GetObjectItem(event, "id"); if (!event_id || !cJSON_IsString(event_id)) { return 0; } const char* event_id_str = cJSON_GetStringValue(event_id); int id_match = 0; cJSON* id_item = NULL; cJSON_ArrayForEach(id_item, filter->ids) { if (cJSON_IsString(id_item)) { const char* id_str = cJSON_GetStringValue(id_item); // Support prefix matching (partial IDs) if (strncmp(event_id_str, id_str, strlen(id_str)) == 0) { id_match = 1; break; } } } if (!id_match) { return 0; } } // Check since filter if (filter->since > 0) { cJSON* event_created_at = cJSON_GetObjectItem(event, "created_at"); if (!event_created_at || !cJSON_IsNumber(event_created_at)) { return 0; } long event_timestamp = (long)cJSON_GetNumberValue(event_created_at); if (event_timestamp < filter->since) { return 0; } } // Check until filter if (filter->until > 0) { cJSON* event_created_at = cJSON_GetObjectItem(event, "created_at"); if (!event_created_at || !cJSON_IsNumber(event_created_at)) { return 0; } long event_timestamp = (long)cJSON_GetNumberValue(event_created_at); if (event_timestamp > filter->until) { return 0; } } // Check tag filters (e.g., #e, #p tags) if (filter->tag_filters && cJSON_IsObject(filter->tag_filters)) { cJSON* event_tags = cJSON_GetObjectItem(event, "tags"); if (!event_tags || !cJSON_IsArray(event_tags)) { return 0; // Event has no tags but filter requires tags } // Check each tag filter cJSON* tag_filter = NULL; cJSON_ArrayForEach(tag_filter, filter->tag_filters) { if (!tag_filter->string || strlen(tag_filter->string) < 2 || tag_filter->string[0] != '#') { continue; // Invalid tag filter } const char* tag_name = tag_filter->string + 1; // Skip the '#' if (!cJSON_IsArray(tag_filter)) { continue; // Tag filter must be an array } int tag_match = 0; // Search through event tags for matching tag name and value cJSON* event_tag = NULL; cJSON_ArrayForEach(event_tag, event_tags) { if (!cJSON_IsArray(event_tag) || cJSON_GetArraySize(event_tag) < 2) { continue; // Invalid tag format } cJSON* event_tag_name = cJSON_GetArrayItem(event_tag, 0); cJSON* event_tag_value = cJSON_GetArrayItem(event_tag, 1); if (!cJSON_IsString(event_tag_name) || !cJSON_IsString(event_tag_value)) { continue; } const char* event_tag_name_str = cJSON_GetStringValue(event_tag_name); const char* event_tag_value_str = cJSON_GetStringValue(event_tag_value); // Check if tag name matches if (strcmp(event_tag_name_str, tag_name) == 0) { // Check if any of the filter values match this tag value cJSON* filter_value = NULL; cJSON_ArrayForEach(filter_value, tag_filter) { if (cJSON_IsString(filter_value)) { const char* filter_value_str = cJSON_GetStringValue(filter_value); // Support prefix matching for tag values if (strncmp(event_tag_value_str, filter_value_str, strlen(filter_value_str)) == 0) { tag_match = 1; break; } } } if (tag_match) { break; } } } if (!tag_match) { return 0; // This tag filter didn't match, so the event doesn't match } } } return 1; // All filters passed } // Check if an event matches any filter in a subscription (filters are OR'd together) int event_matches_subscription(cJSON* event, subscription_t* subscription) { if (!event || !subscription || !subscription->filters) { return 0; } subscription_filter_t* filter = subscription->filters; while (filter) { if (event_matches_filter(event, filter)) { return 1; // Match found (OR logic) } filter = filter->next; } return 0; // No filters matched } // Broadcast event to all matching subscriptions (thread-safe) int broadcast_event_to_subscriptions(cJSON* event) { if (!event) { return 0; } // Check if event is expired and should not be broadcast (NIP-40) pthread_mutex_lock(&g_unified_cache.cache_lock); int expiration_enabled = g_unified_cache.expiration_config.enabled; int filter_responses = g_unified_cache.expiration_config.filter_responses; pthread_mutex_unlock(&g_unified_cache.cache_lock); if (expiration_enabled && filter_responses) { time_t current_time = time(NULL); if (is_event_expired(event, current_time)) { return 0; // Don't broadcast expired events } } int broadcasts = 0; // Create a temporary list of matching subscriptions to avoid holding lock during I/O typedef struct temp_sub { struct lws* wsi; char id[SUBSCRIPTION_ID_MAX_LENGTH]; char client_ip[CLIENT_IP_MAX_LENGTH]; struct temp_sub* next; } temp_sub_t; temp_sub_t* matching_subs = NULL; int matching_count = 0; // First pass: collect matching subscriptions while holding lock pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); subscription_t* sub = g_subscription_manager.active_subscriptions; while (sub) { if (sub->active && sub->wsi && event_matches_subscription(event, sub)) { temp_sub_t* temp = malloc(sizeof(temp_sub_t)); if (temp) { temp->wsi = sub->wsi; // Safely copy subscription ID size_t id_len = strlen(sub->id); if (id_len >= SUBSCRIPTION_ID_MAX_LENGTH) { id_len = SUBSCRIPTION_ID_MAX_LENGTH - 1; } memcpy(temp->id, sub->id, id_len); temp->id[id_len] = '\0'; // Safely copy client IP size_t ip_len = strlen(sub->client_ip); if (ip_len >= CLIENT_IP_MAX_LENGTH) { ip_len = CLIENT_IP_MAX_LENGTH - 1; } memcpy(temp->client_ip, sub->client_ip, ip_len); temp->client_ip[ip_len] = '\0'; temp->next = matching_subs; matching_subs = temp; matching_count++; } else { log_error("broadcast_event_to_subscriptions: failed to allocate temp subscription"); } } sub = sub->next; } pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); // Second pass: send messages without holding lock temp_sub_t* current_temp = matching_subs; while (current_temp) { // Create EVENT message for this subscription cJSON* event_msg = cJSON_CreateArray(); cJSON_AddItemToArray(event_msg, cJSON_CreateString("EVENT")); cJSON_AddItemToArray(event_msg, cJSON_CreateString(current_temp->id)); cJSON_AddItemToArray(event_msg, cJSON_Duplicate(event, 1)); char* msg_str = cJSON_Print(event_msg); if (msg_str) { size_t msg_len = strlen(msg_str); unsigned char* buf = malloc(LWS_PRE + msg_len); if (buf) { memcpy(buf + LWS_PRE, msg_str, msg_len); // Send to WebSocket connection with error checking // Note: lws_write can fail if connection is closed, but won't crash int write_result = lws_write(current_temp->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT); if (write_result >= 0) { broadcasts++; // Update events sent counter for this subscription pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); subscription_t* update_sub = g_subscription_manager.active_subscriptions; while (update_sub) { if (update_sub->wsi == current_temp->wsi && strcmp(update_sub->id, current_temp->id) == 0) { update_sub->events_sent++; break; } update_sub = update_sub->next; } pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); // Log event broadcast to database (optional - can be disabled for performance) cJSON* event_id_obj = cJSON_GetObjectItem(event, "id"); if (event_id_obj && cJSON_IsString(event_id_obj)) { log_event_broadcast(cJSON_GetStringValue(event_id_obj), current_temp->id, current_temp->client_ip); } } free(buf); } free(msg_str); } cJSON_Delete(event_msg); current_temp = current_temp->next; } // Clean up temporary subscription list while (matching_subs) { temp_sub_t* next = matching_subs->next; free(matching_subs); matching_subs = next; } // Update global statistics pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); g_subscription_manager.total_events_broadcast += broadcasts; pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); return broadcasts; } ///////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////// // SUBSCRIPTION DATABASE LOGGING ///////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////// // Log subscription creation to database void log_subscription_created(const subscription_t* sub) { if (!g_db || !sub) return; // Create filter JSON for logging char* filter_json = NULL; if (sub->filters) { cJSON* filters_array = cJSON_CreateArray(); subscription_filter_t* filter = sub->filters; while (filter) { cJSON* filter_obj = cJSON_CreateObject(); if (filter->kinds) { cJSON_AddItemToObject(filter_obj, "kinds", cJSON_Duplicate(filter->kinds, 1)); } if (filter->authors) { cJSON_AddItemToObject(filter_obj, "authors", cJSON_Duplicate(filter->authors, 1)); } if (filter->ids) { cJSON_AddItemToObject(filter_obj, "ids", cJSON_Duplicate(filter->ids, 1)); } if (filter->since > 0) { cJSON_AddNumberToObject(filter_obj, "since", filter->since); } if (filter->until > 0) { cJSON_AddNumberToObject(filter_obj, "until", filter->until); } if (filter->limit > 0) { cJSON_AddNumberToObject(filter_obj, "limit", filter->limit); } if (filter->tag_filters) { cJSON* tags_obj = cJSON_Duplicate(filter->tag_filters, 1); cJSON* item = NULL; cJSON_ArrayForEach(item, tags_obj) { if (item->string) { cJSON_AddItemToObject(filter_obj, item->string, cJSON_Duplicate(item, 1)); } } cJSON_Delete(tags_obj); } cJSON_AddItemToArray(filters_array, filter_obj); filter = filter->next; } filter_json = cJSON_Print(filters_array); cJSON_Delete(filters_array); } const char* sql = "INSERT INTO subscription_events (subscription_id, client_ip, event_type, filter_json) " "VALUES (?, ?, 'created', ?)"; sqlite3_stmt* stmt; int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); if (rc == SQLITE_OK) { sqlite3_bind_text(stmt, 1, sub->id, -1, SQLITE_STATIC); sqlite3_bind_text(stmt, 2, sub->client_ip, -1, SQLITE_STATIC); sqlite3_bind_text(stmt, 3, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT); sqlite3_step(stmt); sqlite3_finalize(stmt); } if (filter_json) free(filter_json); } // Log subscription closure to database void log_subscription_closed(const char* sub_id, const char* client_ip, const char* reason) { (void)reason; // Mark as intentionally unused if (!g_db || !sub_id) return; const char* sql = "INSERT INTO subscription_events (subscription_id, client_ip, event_type) " "VALUES (?, ?, 'closed')"; sqlite3_stmt* stmt; int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); if (rc == SQLITE_OK) { sqlite3_bind_text(stmt, 1, sub_id, -1, SQLITE_STATIC); sqlite3_bind_text(stmt, 2, client_ip ? client_ip : "unknown", -1, SQLITE_STATIC); sqlite3_step(stmt); sqlite3_finalize(stmt); } // Update the corresponding 'created' entry with end time and events sent const char* update_sql = "UPDATE subscription_events " "SET ended_at = strftime('%s', 'now') " "WHERE subscription_id = ? AND event_type = 'created' AND ended_at IS NULL"; rc = sqlite3_prepare_v2(g_db, update_sql, -1, &stmt, NULL); if (rc == SQLITE_OK) { sqlite3_bind_text(stmt, 1, sub_id, -1, SQLITE_STATIC); sqlite3_step(stmt); sqlite3_finalize(stmt); } } // Log subscription disconnection to database void log_subscription_disconnected(const char* client_ip) { if (!g_db || !client_ip) return; // Mark all active subscriptions for this client as disconnected const char* sql = "UPDATE subscription_events " "SET ended_at = strftime('%s', 'now') " "WHERE client_ip = ? AND event_type = 'created' AND ended_at IS NULL"; sqlite3_stmt* stmt; int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); if (rc == SQLITE_OK) { sqlite3_bind_text(stmt, 1, client_ip, -1, SQLITE_STATIC); int changes = sqlite3_changes(g_db); sqlite3_step(stmt); sqlite3_finalize(stmt); if (changes > 0) { // Log a disconnection event const char* insert_sql = "INSERT INTO subscription_events (subscription_id, client_ip, event_type) " "VALUES ('disconnect', ?, 'disconnected')"; rc = sqlite3_prepare_v2(g_db, insert_sql, -1, &stmt, NULL); if (rc == SQLITE_OK) { sqlite3_bind_text(stmt, 1, client_ip, -1, SQLITE_STATIC); sqlite3_step(stmt); sqlite3_finalize(stmt); } } } } // Log event broadcast to database (optional, can be resource intensive) void log_event_broadcast(const char* event_id, const char* sub_id, const char* client_ip) { if (!g_db || !event_id || !sub_id || !client_ip) return; const char* sql = "INSERT INTO event_broadcasts (event_id, subscription_id, client_ip) " "VALUES (?, ?, ?)"; sqlite3_stmt* stmt; int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); if (rc == SQLITE_OK) { sqlite3_bind_text(stmt, 1, event_id, -1, SQLITE_STATIC); sqlite3_bind_text(stmt, 2, sub_id, -1, SQLITE_STATIC); sqlite3_bind_text(stmt, 3, client_ip, -1, SQLITE_STATIC); sqlite3_step(stmt); sqlite3_finalize(stmt); } } // Update events sent counter for a subscription void update_subscription_events_sent(const char* sub_id, int events_sent) { if (!g_db || !sub_id) return; const char* sql = "UPDATE subscription_events " "SET events_sent = ? " "WHERE subscription_id = ? AND event_type = 'created'"; sqlite3_stmt* stmt; int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL); if (rc == SQLITE_OK) { sqlite3_bind_int(stmt, 1, events_sent); sqlite3_bind_text(stmt, 2, sub_id, -1, SQLITE_STATIC); sqlite3_step(stmt); sqlite3_finalize(stmt); } } /////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////// // PER-IP CONNECTION TRACKING /////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////// // Get or create IP connection info (thread-safe) ip_connection_info_t* get_or_create_ip_connection(const char* client_ip) { if (!client_ip) return NULL; pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock); // Look for existing IP connection info ip_connection_info_t* current = g_subscription_manager.ip_connections; while (current) { if (strcmp(current->ip_address, client_ip) == 0) { // Found existing entry, update activity current->last_activity = time(NULL); pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock); return current; } current = current->next; } // Create new IP connection info ip_connection_info_t* new_ip = calloc(1, sizeof(ip_connection_info_t)); if (!new_ip) { pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock); return NULL; } // Copy IP address safely strncpy(new_ip->ip_address, client_ip, CLIENT_IP_MAX_LENGTH - 1); new_ip->ip_address[CLIENT_IP_MAX_LENGTH - 1] = '\0'; // Initialize tracking data time_t now = time(NULL); new_ip->active_connections = 1; new_ip->total_subscriptions = 0; new_ip->first_connection = now; new_ip->last_activity = now; // Add to linked list new_ip->next = g_subscription_manager.ip_connections; g_subscription_manager.ip_connections = new_ip; pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock); return new_ip; } // Update IP connection activity timestamp void update_ip_connection_activity(const char* client_ip) { if (!client_ip) return; pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock); ip_connection_info_t* current = g_subscription_manager.ip_connections; while (current) { if (strcmp(current->ip_address, client_ip) == 0) { current->last_activity = time(NULL); break; } current = current->next; } pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock); } // Remove IP connection (when last connection from IP closes) void remove_ip_connection(const char* client_ip) { if (!client_ip) return; pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock); ip_connection_info_t** current = &g_subscription_manager.ip_connections; while (*current) { ip_connection_info_t* entry = *current; if (strcmp(entry->ip_address, client_ip) == 0) { // Remove from list *current = entry->next; free(entry); break; } current = &((*current)->next); } pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock); } // Get total subscriptions for an IP address int get_total_subscriptions_for_ip(const char* client_ip) { if (!client_ip) return 0; pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock); ip_connection_info_t* current = g_subscription_manager.ip_connections; while (current) { if (strcmp(current->ip_address, client_ip) == 0) { int total = current->total_subscriptions; pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock); return total; } current = current->next; } pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock); return 0; } // Get active connections for an IP address int get_active_connections_for_ip(const char* client_ip) { if (!client_ip) return 0; pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock); ip_connection_info_t* current = g_subscription_manager.ip_connections; while (current) { if (strcmp(current->ip_address, client_ip) == 0) { int active = current->active_connections; pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock); return active; } current = current->next; } pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock); return 0; } ///////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////// // FILTER VALIDATION FUNCTIONS ///////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////// /** * Validate hex string format and length */ int validate_hex_string(const char* str, size_t expected_len, const char* field_name, char* error_message, size_t error_size) { if (!str) { snprintf(error_message, error_size, "%s: null value", field_name); return 0; } size_t len = strlen(str); if (len != expected_len) { snprintf(error_message, error_size, "%s: invalid length %zu, expected %zu", field_name, len, expected_len); return 0; } // Check for valid hex characters for (size_t i = 0; i < len; i++) { char c = str[i]; if (!((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F'))) { snprintf(error_message, error_size, "%s: invalid hex character '%c' at position %zu", field_name, c, i); return 0; } } return 1; } /** * Validate timestamp range (since/until) */ int validate_timestamp_range(long since, long until, char* error_message, size_t error_size) { // Allow zero values (not set) if (since == 0 && until == 0) { return 1; } // Check for reasonable timestamp bounds (1970-01-01 to 2100-01-01) if (since != 0 && (since < MIN_TIMESTAMP || since > MAX_TIMESTAMP)) { snprintf(error_message, error_size, "since: timestamp %ld out of valid range", since); return 0; } if (until != 0 && (until < MIN_TIMESTAMP || until > MAX_TIMESTAMP)) { snprintf(error_message, error_size, "until: timestamp %ld out of valid range", until); return 0; } // Check that since is before until if both are set if (since > 0 && until > 0 && since >= until) { snprintf(error_message, error_size, "since (%ld) must be before until (%ld)", since, until); return 0; } return 1; } /** * Validate numeric limits */ int validate_numeric_limits(int limit, char* error_message, size_t error_size) { // Allow zero (no limit) if (limit == 0) { return 1; } // Check for reasonable limits (1-10000) if (limit < MIN_LIMIT || limit > MAX_LIMIT) { snprintf(error_message, error_size, "limit: value %d out of valid range [%d, %d]", limit, MIN_LIMIT, MAX_LIMIT); return 0; } return 1; } /** * Validate search term for SQL injection and length */ int validate_search_term(const char* search_term, char* error_message, size_t error_size) { if (!search_term) { return 1; // NULL search terms are allowed } size_t len = strlen(search_term); // Check maximum length if (len > MAX_SEARCH_TERM_LENGTH) { snprintf(error_message, error_size, "search: term too long (%zu characters, max %d)", len, (int)MAX_SEARCH_TERM_LENGTH); return 0; } // Check for potentially dangerous characters that could cause SQL issues // Allow alphanumeric, spaces, and common punctuation for (size_t i = 0; i < len; i++) { char c = search_term[i]; if (!((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == ' ' || c == '-' || c == '_' || c == '.' || c == ',' || c == '!' || c == '?' || c == ':' || c == ';' || c == '"' || c == '\'' || c == '(' || c == ')' || c == '[' || c == ']' || c == '{' || c == '}' || c == '@' || c == '#' || c == '$' || c == '%' || c == '^' || c == '&' || c == '*' || c == '+' || c == '=' || c == '|' || c == '\\' || c == '/' || c == '<' || c == '>' || c == '~' || c == '`')) { // Reject control characters and other potentially problematic chars if (c < 32 || c == 127) { snprintf(error_message, error_size, "search: invalid character (ASCII %d) at position %zu", (int)c, i); return 0; } } } return 1; } /** * Validate all filter values in a filter object */ int validate_filter_values(cJSON* filter_json, char* error_message, size_t error_size) { if (!filter_json || !cJSON_IsObject(filter_json)) { snprintf(error_message, error_size, "filter must be a JSON object"); return 0; } // Validate kinds array cJSON* kinds = cJSON_GetObjectItem(filter_json, "kinds"); if (kinds) { if (!cJSON_IsArray(kinds)) { snprintf(error_message, error_size, "kinds must be an array"); return 0; } int kinds_count = cJSON_GetArraySize(kinds); if (kinds_count > MAX_KINDS_PER_FILTER) { snprintf(error_message, error_size, "kinds array too large (%d items, max %d)", kinds_count, MAX_KINDS_PER_FILTER); return 0; } for (int i = 0; i < kinds_count; i++) { cJSON* kind_item = cJSON_GetArrayItem(kinds, i); if (!cJSON_IsNumber(kind_item)) { snprintf(error_message, error_size, "kinds[%d] must be a number", i); return 0; } int kind_val = (int)cJSON_GetNumberValue(kind_item); if (kind_val < 0 || kind_val > 65535) { // Reasonable range for event kinds snprintf(error_message, error_size, "kinds[%d]: invalid event kind %d", i, kind_val); return 0; } } } // Validate authors array cJSON* authors = cJSON_GetObjectItem(filter_json, "authors"); if (authors) { if (!cJSON_IsArray(authors)) { snprintf(error_message, error_size, "authors must be an array"); return 0; } int authors_count = cJSON_GetArraySize(authors); if (authors_count > MAX_AUTHORS_PER_FILTER) { snprintf(error_message, error_size, "authors array too large (%d items, max %d)", authors_count, MAX_AUTHORS_PER_FILTER); return 0; } for (int i = 0; i < authors_count; i++) { cJSON* author_item = cJSON_GetArrayItem(authors, i); if (!cJSON_IsString(author_item)) { snprintf(error_message, error_size, "authors[%d] must be a string", i); return 0; } const char* author_str = cJSON_GetStringValue(author_item); // Allow partial pubkeys (prefix matching), so validate hex but allow shorter lengths size_t author_len = strlen(author_str); if (author_len == 0 || author_len > 64) { snprintf(error_message, error_size, "authors[%d]: invalid length %zu", i, author_len); return 0; } // Validate hex characters (allow partial) for (size_t j = 0; j < author_len; j++) { char c = author_str[j]; if (!((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F'))) { snprintf(error_message, error_size, "authors[%d]: invalid hex character '%c'", i, c); return 0; } } } } // Validate ids array cJSON* ids = cJSON_GetObjectItem(filter_json, "ids"); if (ids) { if (!cJSON_IsArray(ids)) { snprintf(error_message, error_size, "ids must be an array"); return 0; } int ids_count = cJSON_GetArraySize(ids); if (ids_count > MAX_IDS_PER_FILTER) { snprintf(error_message, error_size, "ids array too large (%d items, max %d)", ids_count, MAX_IDS_PER_FILTER); return 0; } for (int i = 0; i < ids_count; i++) { cJSON* id_item = cJSON_GetArrayItem(ids, i); if (!cJSON_IsString(id_item)) { snprintf(error_message, error_size, "ids[%d] must be a string", i); return 0; } const char* id_str = cJSON_GetStringValue(id_item); // Allow partial IDs (prefix matching) size_t id_len = strlen(id_str); if (id_len == 0 || id_len > 64) { snprintf(error_message, error_size, "ids[%d]: invalid length %zu", i, id_len); return 0; } // Validate hex characters for (size_t j = 0; j < id_len; j++) { char c = id_str[j]; if (!((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F'))) { snprintf(error_message, error_size, "ids[%d]: invalid hex character '%c'", i, c); return 0; } } } } // Validate since/until timestamps long since_val = 0, until_val = 0; cJSON* since = cJSON_GetObjectItem(filter_json, "since"); if (since) { if (!cJSON_IsNumber(since)) { snprintf(error_message, error_size, "since must be a number"); return 0; } since_val = (long)cJSON_GetNumberValue(since); } cJSON* until = cJSON_GetObjectItem(filter_json, "until"); if (until) { if (!cJSON_IsNumber(until)) { snprintf(error_message, error_size, "until must be a number"); return 0; } until_val = (long)cJSON_GetNumberValue(until); } if (!validate_timestamp_range(since_val, until_val, error_message, error_size)) { return 0; } // Validate limit cJSON* limit = cJSON_GetObjectItem(filter_json, "limit"); if (limit) { if (!cJSON_IsNumber(limit)) { snprintf(error_message, error_size, "limit must be a number"); return 0; } int limit_val = (int)cJSON_GetNumberValue(limit); if (!validate_numeric_limits(limit_val, error_message, error_size)) { return 0; } } // Validate search term cJSON* search = cJSON_GetObjectItem(filter_json, "search"); if (search) { if (!cJSON_IsString(search)) { snprintf(error_message, error_size, "search must be a string"); return 0; } const char* search_term = cJSON_GetStringValue(search); if (!validate_search_term(search_term, error_message, error_size)) { return 0; } } // Validate tag filters (#e, #p, #t, etc.) cJSON* item = NULL; cJSON_ArrayForEach(item, filter_json) { const char* key = item->string; if (key && strlen(key) >= 2 && key[0] == '#') { if (!cJSON_IsArray(item)) { snprintf(error_message, error_size, "%s must be an array", key); return 0; } int tag_count = cJSON_GetArraySize(item); if (tag_count > MAX_TAG_VALUES_PER_FILTER) { snprintf(error_message, error_size, "%s array too large (%d items, max %d)", key, tag_count, MAX_TAG_VALUES_PER_FILTER); return 0; } for (int i = 0; i < tag_count; i++) { cJSON* tag_value = cJSON_GetArrayItem(item, i); if (!cJSON_IsString(tag_value)) { snprintf(error_message, error_size, "%s[%d] must be a string", key, i); return 0; } const char* tag_str = cJSON_GetStringValue(tag_value); size_t tag_len = strlen(tag_str); if (tag_len > MAX_TAG_VALUE_LENGTH) { snprintf(error_message, error_size, "%s[%d]: tag value too long (%zu characters, max %d)", key, i, tag_len, MAX_TAG_VALUE_LENGTH); return 0; } } } } return 1; }