// Persistent subscription management and subscription database logging.
#define _GNU_SOURCE

#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <printf.h>
#include <pthread.h>

#include <cjson/cJSON.h>
#include <libwebsockets.h>
#include <sqlite3.h>

#include "subscriptions.h"
|
|
|
|
// Forward declarations for logging functions
|
|
void log_info(const char* message);
|
|
void log_error(const char* message);
|
|
void log_warning(const char* message);
|
|
|
|
// Forward declarations for configuration functions
|
|
const char* get_config_value(const char* key);
|
|
|
|
// Forward declarations for NIP-40 expiration functions
|
|
int is_event_expired(cJSON* event, time_t current_time);
|
|
|
|
// Global database variable
|
|
extern sqlite3* g_db;
|
|
|
|
// Global unified cache
|
|
extern unified_config_cache_t g_unified_cache;
|
|
|
|
// Global subscription manager
|
|
extern subscription_manager_t g_subscription_manager;
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// PERSISTENT SUBSCRIPTIONS SYSTEM
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Create a subscription filter from cJSON filter object
|
|
// Saturating double -> long conversion for client-supplied filter numbers.
// Converting an out-of-range double to an integer type is undefined behaviour
// in C, so out-of-range values are clamped and NaN maps to 0.
static long filter_number_to_long(double value) {
    if (value != value) {
        return 0; // NaN
    }
    if (value >= (double)LONG_MAX) {
        return LONG_MAX;
    }
    if (value <= (double)LONG_MIN) {
        return LONG_MIN;
    }
    return (long)value;
}

// Saturating double -> int conversion (same rationale as filter_number_to_long).
static int filter_number_to_int(double value) {
    if (value != value) {
        return 0; // NaN
    }
    if (value >= (double)INT_MAX) {
        return INT_MAX;
    }
    if (value <= (double)INT_MIN) {
        return INT_MIN;
    }
    return (int)value;
}

// Deep-copy the array stored under `key` in `filter_json` into `*out`.
// A missing or non-array member is not an error (it leaves `*out` untouched).
// Returns 0 on success and -1 when the copy could not be allocated.
static int copy_filter_array(cJSON* filter_json, const char* key, cJSON** out) {
    cJSON* source = cJSON_GetObjectItem(filter_json, key);
    if (!source || !cJSON_IsArray(source)) {
        return 0;
    }
    *out = cJSON_Duplicate(source, 1);
    return *out ? 0 : -1;
}

// Build a subscription_filter_t from a JSON filter object.
//
// Copies the "kinds", "authors" and "ids" arrays, the numeric "since", "until"
// and "limit" members, and every member whose key starts with '#' and has at
// least one more character (tag filters, e.g. {"#e": [...]}).
//
// Returns a heap-allocated filter that the caller must release with
// free_subscription_filter(), or NULL when `filter_json` is NULL / not an
// object or when memory for a copy cannot be allocated. Failing on allocation
// errors keeps a constraint from being silently dropped, which would make the
// filter match more events than the client asked for.
subscription_filter_t* create_subscription_filter(cJSON* filter_json) {
    if (!filter_json || !cJSON_IsObject(filter_json)) {
        return NULL;
    }

    subscription_filter_t* filter = calloc(1, sizeof(*filter));
    if (!filter) {
        return NULL;
    }

    // Array-valued criteria
    int ok = copy_filter_array(filter_json, "kinds", &filter->kinds) == 0 &&
             copy_filter_array(filter_json, "authors", &filter->authors) == 0 &&
             copy_filter_array(filter_json, "ids", &filter->ids) == 0;

    // Numeric criteria (values come from the client, so convert defensively)
    cJSON* since = cJSON_GetObjectItem(filter_json, "since");
    if (since && cJSON_IsNumber(since)) {
        filter->since = filter_number_to_long(cJSON_GetNumberValue(since));
    }

    cJSON* until = cJSON_GetObjectItem(filter_json, "until");
    if (until && cJSON_IsNumber(until)) {
        filter->until = filter_number_to_long(cJSON_GetNumberValue(until));
    }

    cJSON* limit = cJSON_GetObjectItem(filter_json, "limit");
    if (limit && cJSON_IsNumber(limit)) {
        filter->limit = filter_number_to_int(cJSON_GetNumberValue(limit));
    }

    // Tag filters (e.g., {"#e": ["id1"], "#p": ["pubkey1"]})
    cJSON* item = NULL;
    cJSON_ArrayForEach(item, filter_json) {
        if (!ok) {
            break;
        }
        if (!item->string || strlen(item->string) < 2 || item->string[0] != '#') {
            continue;
        }

        if (!filter->tag_filters) {
            filter->tag_filters = cJSON_CreateObject();
            if (!filter->tag_filters) {
                ok = 0;
                break;
            }
        }

        cJSON* tag_copy = cJSON_Duplicate(item, 1);
        if (!tag_copy) {
            ok = 0;
            break;
        }
        cJSON_AddItemToObject(filter->tag_filters, item->string, tag_copy);
    }

    if (!ok) {
        // Release whatever was copied before the failure (cJSON_Delete accepts NULL)
        cJSON_Delete(filter->kinds);
        cJSON_Delete(filter->authors);
        cJSON_Delete(filter->ids);
        cJSON_Delete(filter->tag_filters);
        free(filter);
        return NULL;
    }

    return filter;
}
|
|
|
|
// Free a subscription filter
|
|
// Release a filter and every filter chained after it through `next`.
// Accepts NULL (does nothing). Each node's cJSON members are deleted before
// the node itself is freed.
void free_subscription_filter(subscription_filter_t* filter) {
    while (filter != NULL) {
        subscription_filter_t* following = filter->next;

        // cJSON_Delete is a no-op for NULL, so unset members need no guard
        cJSON_Delete(filter->kinds);
        cJSON_Delete(filter->authors);
        cJSON_Delete(filter->ids);
        cJSON_Delete(filter->tag_filters);

        free(filter);
        filter = following;
    }
}
|
|
|
|
// Create a new subscription
|
|
// Allocate and populate a subscription for one WebSocket connection.
//
// sub_id        subscription ID; copied, truncated to SUBSCRIPTION_ID_MAX_LENGTH - 1 chars
// wsi           WebSocket connection the subscription belongs to
// filters_array JSON array of filter objects; entries that cannot be converted are skipped
// client_ip     optional client address; copied (truncated) when non-NULL
//
// Returns the new subscription, or NULL when a required argument is NULL,
// allocation fails, or no entry of `filters_array` yields a valid filter.
subscription_t* create_subscription(const char* sub_id, struct lws* wsi, cJSON* filters_array, const char* client_ip) {
    if (sub_id == NULL || wsi == NULL || filters_array == NULL) {
        return NULL;
    }

    subscription_t* sub = calloc(1, sizeof(subscription_t));
    if (sub == NULL) {
        return NULL;
    }

    // Identity: ID (always NUL-terminated), owning connection, client address
    strncpy(sub->id, sub_id, SUBSCRIPTION_ID_MAX_LENGTH - 1);
    sub->id[SUBSCRIPTION_ID_MAX_LENGTH - 1] = '\0';
    sub->wsi = wsi;
    if (client_ip != NULL) {
        strncpy(sub->client_ip, client_ip, CLIENT_IP_MAX_LENGTH - 1);
        sub->client_ip[CLIENT_IP_MAX_LENGTH - 1] = '\0';
    }

    // Initial state
    sub->created_at = time(NULL);
    sub->events_sent = 0;
    sub->active = 1;

    // Append each valid filter to the subscription's linked list, in array order
    subscription_filter_t* last = NULL;
    int accepted = 0;

    if (cJSON_IsArray(filters_array)) {
        cJSON* entry = NULL;
        cJSON_ArrayForEach(entry, filters_array) {
            if (accepted >= MAX_FILTERS_PER_SUBSCRIPTION) {
                log_warning("Maximum filters per subscription exceeded, ignoring excess filters");
                break;
            }

            subscription_filter_t* parsed = create_subscription_filter(entry);
            if (parsed == NULL) {
                continue;
            }

            if (sub->filters == NULL) {
                sub->filters = parsed;
            } else {
                last->next = parsed;
            }
            last = parsed;
            accepted++;
        }
    }

    if (accepted == 0) {
        log_error("No valid filters found for subscription");
        free(sub);
        return NULL;
    }

    return sub;
}
|
|
|
|
// Free a subscription
|
|
// Release a subscription together with its filter list. NULL is ignored.
void free_subscription(subscription_t* sub) {
    if (sub != NULL) {
        if (sub->filters != NULL) {
            free_subscription_filter(sub->filters);
        }
        free(sub);
    }
}
|
|
|
|
// Add subscription to global manager (thread-safe)
|
|
// Register a subscription with the global manager.
//
// The subscription is pushed onto the head of the active list while holding
// subscriptions_lock. Returns 0 on success, or -1 when `sub` is NULL or the
// global subscription limit has been reached (the subscription is not added
// and remains owned by the caller).
int add_subscription_to_manager(subscription_t* sub) {
    if (!sub) return -1;

    pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);

    // Check global limits
    if (g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) {
        pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
        log_error("Maximum total subscriptions reached");
        return -1;
    }

    // Add to global list
    sub->next = g_subscription_manager.active_subscriptions;
    g_subscription_manager.active_subscriptions = sub;
    g_subscription_manager.total_subscriptions++;
    g_subscription_manager.total_created++;

    // Snapshot the counter while the lock is still held; reading the shared
    // field after unlocking would race with concurrent add/remove calls.
    int total_after_add = g_subscription_manager.total_subscriptions;

    pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);

    // Log subscription creation to database (done outside the lock)
    // NOTE(review): `sub` is reachable by other threads once it is on the list;
    // confirm nothing can remove and free it before these two uses complete.
    log_subscription_created(sub);

    char debug_msg[256];
    snprintf(debug_msg, sizeof(debug_msg), "Added subscription '%s' (total: %d)",
             sub->id, total_after_add);
    log_info(debug_msg);

    return 0;
}
|
|
|
|
// Remove subscription from global manager (thread-safe)
|
|
int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) {
|
|
if (!sub_id) return -1;
|
|
|
|
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
subscription_t** current = &g_subscription_manager.active_subscriptions;
|
|
|
|
while (*current) {
|
|
subscription_t* sub = *current;
|
|
|
|
// Match by ID and WebSocket connection
|
|
if (strcmp(sub->id, sub_id) == 0 && (!wsi || sub->wsi == wsi)) {
|
|
// Remove from list
|
|
*current = sub->next;
|
|
g_subscription_manager.total_subscriptions--;
|
|
|
|
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
// Log subscription closure to database
|
|
log_subscription_closed(sub_id, sub->client_ip, "closed");
|
|
|
|
// Update events sent counter before freeing
|
|
update_subscription_events_sent(sub_id, sub->events_sent);
|
|
|
|
char debug_msg[256];
|
|
snprintf(debug_msg, sizeof(debug_msg), "Removed subscription '%s' (total: %d)",
|
|
sub_id, g_subscription_manager.total_subscriptions);
|
|
log_info(debug_msg);
|
|
|
|
free_subscription(sub);
|
|
return 0;
|
|
}
|
|
|
|
current = &(sub->next);
|
|
}
|
|
|
|
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
char debug_msg[256];
|
|
snprintf(debug_msg, sizeof(debug_msg), "Subscription '%s' not found for removal", sub_id);
|
|
log_warning(debug_msg);
|
|
|
|
return -1;
|
|
}
|
|
|
|
// Check if an event matches a subscription filter
|
|
// Saturating double -> int conversion for JSON numbers used while matching.
// Converting an out-of-range double to an integer type is undefined behaviour
// in C; filter and event numbers originate from clients, so clamp instead.
static int match_number_to_int(double value) {
    if (value != value) {
        return 0; // NaN
    }
    if (value >= (double)INT_MAX) {
        return INT_MAX;
    }
    if (value <= (double)INT_MIN) {
        return INT_MIN;
    }
    return (int)value;
}

// Saturating double -> long conversion (same rationale as match_number_to_int).
static long match_number_to_long(double value) {
    if (value != value) {
        return 0; // NaN
    }
    if (value >= (double)LONG_MAX) {
        return LONG_MAX;
    }
    if (value <= (double)LONG_MIN) {
        return LONG_MIN;
    }
    return (long)value;
}

// Return 1 when any string entry of the JSON array `prefixes` is a prefix of
// `value` (an entry equal to `value` also counts), otherwise 0.
// Non-string entries are ignored; a NULL `value` never matches.
static int value_has_listed_prefix(const char* value, cJSON* prefixes) {
    if (!value) {
        return 0;
    }

    cJSON* entry = NULL;
    cJSON_ArrayForEach(entry, prefixes) {
        if (!cJSON_IsString(entry)) {
            continue;
        }
        const char* prefix = cJSON_GetStringValue(entry);
        if (prefix && strncmp(value, prefix, strlen(prefix)) == 0) {
            return 1;
        }
    }
    return 0;
}

// Return 1 when `event_tags` contains a tag [name, value, ...] whose name
// equals `tag_name` and whose value is prefix-matched by one of
// `wanted_values`; otherwise 0. Tags that are not arrays of at least two
// strings are skipped.
static int event_tags_satisfy(cJSON* event_tags, const char* tag_name, cJSON* wanted_values) {
    cJSON* event_tag = NULL;
    cJSON_ArrayForEach(event_tag, event_tags) {
        if (!cJSON_IsArray(event_tag) || cJSON_GetArraySize(event_tag) < 2) {
            continue; // Invalid tag format
        }

        cJSON* name_item = cJSON_GetArrayItem(event_tag, 0);
        cJSON* value_item = cJSON_GetArrayItem(event_tag, 1);
        if (!cJSON_IsString(name_item) || !cJSON_IsString(value_item)) {
            continue;
        }

        const char* name_str = cJSON_GetStringValue(name_item);
        if (!name_str || strcmp(name_str, tag_name) != 0) {
            continue;
        }

        if (value_has_listed_prefix(cJSON_GetStringValue(value_item), wanted_values)) {
            return 1;
        }
    }
    return 0;
}

// Check whether an event satisfies every criterion of a single filter.
//
// Criteria present on the filter are AND'ed together:
//   kinds       event "kind" must equal one of the listed numbers
//   authors     event "pubkey" must start with one of the listed strings
//   ids         event "id" must start with one of the listed strings
//   since/until event "created_at" must lie in [since, until]; a bound is only
//               applied when it is > 0
//   tag_filters for each "#x" array, the event must carry an "x" tag whose
//               value starts with one of the listed strings
//
// Returns 1 on match, 0 otherwise (including NULL arguments or an event that
// lacks a field a criterion needs).
int event_matches_filter(cJSON* event, subscription_filter_t* filter) {
    if (!event || !filter) {
        return 0;
    }

    // Check kinds filter
    if (filter->kinds && cJSON_IsArray(filter->kinds)) {
        cJSON* event_kind = cJSON_GetObjectItem(event, "kind");
        if (!event_kind || !cJSON_IsNumber(event_kind)) {
            return 0;
        }

        int event_kind_val = match_number_to_int(cJSON_GetNumberValue(event_kind));
        int kind_match = 0;

        cJSON* kind_item = NULL;
        cJSON_ArrayForEach(kind_item, filter->kinds) {
            if (cJSON_IsNumber(kind_item) &&
                match_number_to_int(cJSON_GetNumberValue(kind_item)) == event_kind_val) {
                kind_match = 1;
                break;
            }
        }

        if (!kind_match) {
            return 0;
        }
    }

    // Check authors filter (prefix matching supports partial pubkeys)
    if (filter->authors && cJSON_IsArray(filter->authors)) {
        cJSON* event_pubkey = cJSON_GetObjectItem(event, "pubkey");
        if (!event_pubkey || !cJSON_IsString(event_pubkey)) {
            return 0;
        }
        if (!value_has_listed_prefix(cJSON_GetStringValue(event_pubkey), filter->authors)) {
            return 0;
        }
    }

    // Check IDs filter (prefix matching supports partial IDs)
    if (filter->ids && cJSON_IsArray(filter->ids)) {
        cJSON* event_id = cJSON_GetObjectItem(event, "id");
        if (!event_id || !cJSON_IsString(event_id)) {
            return 0;
        }
        if (!value_has_listed_prefix(cJSON_GetStringValue(event_id), filter->ids)) {
            return 0;
        }
    }

    // Check since / until bounds against created_at
    if (filter->since > 0 || filter->until > 0) {
        cJSON* event_created_at = cJSON_GetObjectItem(event, "created_at");
        if (!event_created_at || !cJSON_IsNumber(event_created_at)) {
            return 0;
        }

        long event_timestamp = match_number_to_long(cJSON_GetNumberValue(event_created_at));
        if (filter->since > 0 && event_timestamp < filter->since) {
            return 0;
        }
        if (filter->until > 0 && event_timestamp > filter->until) {
            return 0;
        }
    }

    // Check tag filters (e.g., #e, #p tags)
    if (filter->tag_filters && cJSON_IsObject(filter->tag_filters)) {
        cJSON* event_tags = cJSON_GetObjectItem(event, "tags");
        if (!event_tags || !cJSON_IsArray(event_tags)) {
            return 0; // Event has no tags but filter requires tags
        }

        cJSON* tag_filter = NULL;
        cJSON_ArrayForEach(tag_filter, filter->tag_filters) {
            if (!tag_filter->string || strlen(tag_filter->string) < 2 || tag_filter->string[0] != '#') {
                continue; // Invalid tag filter
            }
            if (!cJSON_IsArray(tag_filter)) {
                continue; // Tag filter must be an array
            }

            const char* tag_name = tag_filter->string + 1; // Skip the '#'
            if (!event_tags_satisfy(event_tags, tag_name, tag_filter)) {
                return 0; // This tag filter didn't match, so the event doesn't match
            }
        }
    }

    return 1; // All filters passed
}
|
|
|
|
// Check if an event matches any filter in a subscription (filters are OR'd together)
|
|
// Report whether an event matches at least one filter of a subscription.
// Filters are OR'ed: the first matching filter yields 1. Returns 0 when any
// argument is NULL, the subscription has no filters, or nothing matches.
int event_matches_subscription(cJSON* event, subscription_t* subscription) {
    if (event == NULL || subscription == NULL || subscription->filters == NULL) {
        return 0;
    }

    for (subscription_filter_t* candidate = subscription->filters;
         candidate != NULL;
         candidate = candidate->next) {
        if (event_matches_filter(event, candidate)) {
            return 1; // Match found (OR logic)
        }
    }

    return 0; // No filters matched
}
|
|
|
|
// Broadcast event to all matching subscriptions (thread-safe)
|
|
// Serialize ["EVENT", <sub_id>, <event>] to text.
// Returns a heap string the caller must free(), or NULL when any part of the
// message could not be allocated (a partially built array is never printed).
static char* serialize_event_message(const char* sub_id, cJSON* event) {
    cJSON* message = cJSON_CreateArray();
    cJSON* label = cJSON_CreateString("EVENT");
    cJSON* id_item = cJSON_CreateString(sub_id);
    cJSON* payload = cJSON_Duplicate(event, 1);
    char* text = NULL;

    if (message && label && id_item && payload) {
        // Ownership of the three items moves to `message`
        cJSON_AddItemToArray(message, label);
        cJSON_AddItemToArray(message, id_item);
        cJSON_AddItemToArray(message, payload);
        text = cJSON_Print(message);
    } else {
        // Items were never attached, so release them individually
        cJSON_Delete(label);
        cJSON_Delete(id_item);
        cJSON_Delete(payload);
    }

    cJSON_Delete(message);
    return text;
}

// Send an event to every active subscription whose filters match it.
//
// When NIP-40 expiration handling and response filtering are both enabled in
// the unified cache, an expired event is not broadcast at all. Otherwise the
// subscription list is walked under subscriptions_lock; for each match an
// EVENT message is written to the subscription's WebSocket, its events_sent
// counter is incremented, and the broadcast is logged to the database.
//
// Returns the number of subscriptions the event was written to (0 for a NULL
// or expired event).
int broadcast_event_to_subscriptions(cJSON* event) {
    if (!event) {
        return 0;
    }

    // Read the expiration settings under the cache lock
    pthread_mutex_lock(&g_unified_cache.cache_lock);
    int expiration_enabled = g_unified_cache.expiration_config.enabled;
    int filter_responses = g_unified_cache.expiration_config.filter_responses;
    pthread_mutex_unlock(&g_unified_cache.cache_lock);

    // The event ID is loop-invariant; NULL when missing or not a string
    cJSON* event_id_obj = cJSON_GetObjectItem(event, "id");
    const char* event_id = cJSON_IsString(event_id_obj) ? cJSON_GetStringValue(event_id_obj) : NULL;

    // Check if event is expired and should not be broadcast (NIP-40)
    if (expiration_enabled && filter_responses) {
        time_t current_time = time(NULL);
        if (is_event_expired(event, current_time)) {
            char debug_msg[256];
            // Never hand NULL to %s: fall back to "unknown" for a missing or non-string id
            snprintf(debug_msg, sizeof(debug_msg), "Skipping broadcast of expired event: %.16s",
                     event_id ? event_id : "unknown");
            log_info(debug_msg);
            return 0; // Don't broadcast expired events
        }
    }

    int broadcasts = 0;

    pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);

    for (subscription_t* sub = g_subscription_manager.active_subscriptions; sub; sub = sub->next) {
        if (!sub->active || !event_matches_subscription(event, sub)) {
            continue;
        }

        // Create EVENT message for this subscription
        char* msg_str = serialize_event_message(sub->id, event);
        if (!msg_str) {
            continue; // Allocation failure: skip rather than send a malformed message
        }

        size_t msg_len = strlen(msg_str);
        unsigned char* buf = malloc(LWS_PRE + msg_len);
        if (buf) {
            // The payload is placed after LWS_PRE bytes of headroom
            memcpy(buf + LWS_PRE, msg_str, msg_len);

            // Send to WebSocket connection
            int write_result = lws_write(sub->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
            if (write_result >= 0) {
                sub->events_sent++;
                broadcasts++;

                // Log event broadcast to database (optional - can be disabled for performance)
                if (event_id) {
                    log_event_broadcast(event_id, sub->id, sub->client_ip);
                }
            }

            free(buf);
        }
        free(msg_str);
    }

    // Update global statistics
    g_subscription_manager.total_events_broadcast += broadcasts;

    pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);

    if (broadcasts > 0) {
        char debug_msg[256];
        snprintf(debug_msg, sizeof(debug_msg), "Broadcasted event to %d subscriptions", broadcasts);
        log_info(debug_msg);
    }

    return broadcasts;
}
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// SUBSCRIPTION DATABASE LOGGING
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Log subscription creation to database
|
|
void log_subscription_created(const subscription_t* sub) {
|
|
if (!g_db || !sub) return;
|
|
|
|
// Create filter JSON for logging
|
|
char* filter_json = NULL;
|
|
if (sub->filters) {
|
|
cJSON* filters_array = cJSON_CreateArray();
|
|
subscription_filter_t* filter = sub->filters;
|
|
|
|
while (filter) {
|
|
cJSON* filter_obj = cJSON_CreateObject();
|
|
|
|
if (filter->kinds) {
|
|
cJSON_AddItemToObject(filter_obj, "kinds", cJSON_Duplicate(filter->kinds, 1));
|
|
}
|
|
if (filter->authors) {
|
|
cJSON_AddItemToObject(filter_obj, "authors", cJSON_Duplicate(filter->authors, 1));
|
|
}
|
|
if (filter->ids) {
|
|
cJSON_AddItemToObject(filter_obj, "ids", cJSON_Duplicate(filter->ids, 1));
|
|
}
|
|
if (filter->since > 0) {
|
|
cJSON_AddNumberToObject(filter_obj, "since", filter->since);
|
|
}
|
|
if (filter->until > 0) {
|
|
cJSON_AddNumberToObject(filter_obj, "until", filter->until);
|
|
}
|
|
if (filter->limit > 0) {
|
|
cJSON_AddNumberToObject(filter_obj, "limit", filter->limit);
|
|
}
|
|
if (filter->tag_filters) {
|
|
cJSON* tags_obj = cJSON_Duplicate(filter->tag_filters, 1);
|
|
cJSON* item = NULL;
|
|
cJSON_ArrayForEach(item, tags_obj) {
|
|
if (item->string) {
|
|
cJSON_AddItemToObject(filter_obj, item->string, cJSON_Duplicate(item, 1));
|
|
}
|
|
}
|
|
cJSON_Delete(tags_obj);
|
|
}
|
|
|
|
cJSON_AddItemToArray(filters_array, filter_obj);
|
|
filter = filter->next;
|
|
}
|
|
|
|
filter_json = cJSON_Print(filters_array);
|
|
cJSON_Delete(filters_array);
|
|
}
|
|
|
|
const char* sql =
|
|
"INSERT INTO subscription_events (subscription_id, client_ip, event_type, filter_json) "
|
|
"VALUES (?, ?, 'created', ?)";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, sub->id, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 2, sub->client_ip, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 3, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT);
|
|
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
|
|
if (filter_json) free(filter_json);
|
|
}
|
|
|
|
// Log subscription closure to database
|
|
void log_subscription_closed(const char* sub_id, const char* client_ip, const char* reason) {
|
|
(void)reason; // Mark as intentionally unused
|
|
if (!g_db || !sub_id) return;
|
|
|
|
const char* sql =
|
|
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
|
|
"VALUES (?, ?, 'closed')";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, sub_id, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 2, client_ip ? client_ip : "unknown", -1, SQLITE_STATIC);
|
|
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
|
|
// Update the corresponding 'created' entry with end time and events sent
|
|
const char* update_sql =
|
|
"UPDATE subscription_events "
|
|
"SET ended_at = strftime('%s', 'now') "
|
|
"WHERE subscription_id = ? AND event_type = 'created' AND ended_at IS NULL";
|
|
|
|
rc = sqlite3_prepare_v2(g_db, update_sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, sub_id, -1, SQLITE_STATIC);
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
}
|
|
|
|
// Log subscription disconnection to database
|
|
void log_subscription_disconnected(const char* client_ip) {
|
|
if (!g_db || !client_ip) return;
|
|
|
|
// Mark all active subscriptions for this client as disconnected
|
|
const char* sql =
|
|
"UPDATE subscription_events "
|
|
"SET ended_at = strftime('%s', 'now') "
|
|
"WHERE client_ip = ? AND event_type = 'created' AND ended_at IS NULL";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, client_ip, -1, SQLITE_STATIC);
|
|
int changes = sqlite3_changes(g_db);
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
|
|
if (changes > 0) {
|
|
// Log a disconnection event
|
|
const char* insert_sql =
|
|
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
|
|
"VALUES ('disconnect', ?, 'disconnected')";
|
|
|
|
rc = sqlite3_prepare_v2(g_db, insert_sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, client_ip, -1, SQLITE_STATIC);
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Log event broadcast to database (optional, can be resource intensive)
|
|
void log_event_broadcast(const char* event_id, const char* sub_id, const char* client_ip) {
|
|
if (!g_db || !event_id || !sub_id || !client_ip) return;
|
|
|
|
const char* sql =
|
|
"INSERT INTO event_broadcasts (event_id, subscription_id, client_ip) "
|
|
"VALUES (?, ?, ?)";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, event_id, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 2, sub_id, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 3, client_ip, -1, SQLITE_STATIC);
|
|
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
}
|
|
|
|
// Update events sent counter for a subscription
|
|
void update_subscription_events_sent(const char* sub_id, int events_sent) {
|
|
if (!g_db || !sub_id) return;
|
|
|
|
const char* sql =
|
|
"UPDATE subscription_events "
|
|
"SET events_sent = ? "
|
|
"WHERE subscription_id = ? AND event_type = 'created'";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_int(stmt, 1, events_sent);
|
|
sqlite3_bind_text(stmt, 2, sub_id, -1, SQLITE_STATIC);
|
|
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
}
|