1656 lines
61 KiB
C
1656 lines
61 KiB
C
|
|
|
|
#define _GNU_SOURCE
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <unistd.h>
|
|
#include <signal.h>
|
|
#include <time.h>
|
|
#include <pthread.h>
|
|
#include <sqlite3.h>
|
|
#include <libwebsockets.h>
|
|
|
|
// Include nostr_core_lib for Nostr functionality
|
|
#include "../nostr_core_lib/cjson/cJSON.h"
|
|
#include "../nostr_core_lib/nostr_core/nostr_core.h"
|
|
|
|
// Server Configuration
|
|
#define DEFAULT_PORT 8888
|
|
#define DEFAULT_HOST "127.0.0.1"
|
|
#define DATABASE_PATH "db/c_nostr_relay.db"
|
|
#define MAX_CLIENTS 100
|
|
|
|
// Persistent subscription system configuration
|
|
#define MAX_SUBSCRIPTIONS_PER_CLIENT 20
|
|
#define MAX_TOTAL_SUBSCRIPTIONS 5000
|
|
#define MAX_FILTERS_PER_SUBSCRIPTION 10
|
|
#define SUBSCRIPTION_ID_MAX_LENGTH 64
|
|
#define CLIENT_IP_MAX_LENGTH 64
|
|
|
|
// Color constants for logging
|
|
#define RED "\033[31m"
|
|
#define GREEN "\033[32m"
|
|
#define YELLOW "\033[33m"
|
|
#define BLUE "\033[34m"
|
|
#define BOLD "\033[1m"
|
|
#define RESET "\033[0m"
|
|
|
|
// Global state
|
|
static sqlite3* g_db = NULL;
|
|
static int g_server_running = 1;
|
|
static struct lws_context *ws_context = NULL;
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// DATA STRUCTURES
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Forward declarations
|
|
typedef struct subscription_filter subscription_filter_t;
|
|
typedef struct subscription subscription_t;
|
|
typedef struct subscription_manager subscription_manager_t;
|
|
|
|
// Subscription filter structure
|
|
struct subscription_filter {
|
|
// Filter criteria (all optional)
|
|
cJSON* kinds; // Array of event kinds [1,2,3]
|
|
cJSON* authors; // Array of author pubkeys
|
|
cJSON* ids; // Array of event IDs
|
|
long since; // Unix timestamp (0 = not set)
|
|
long until; // Unix timestamp (0 = not set)
|
|
int limit; // Result limit (0 = no limit)
|
|
cJSON* tag_filters; // Object with tag filters: {"#e": ["id1"], "#p": ["pubkey1"]}
|
|
|
|
// Linked list for multiple filters per subscription
|
|
struct subscription_filter* next;
|
|
};
|
|
|
|
// Active subscription structure
|
|
struct subscription {
|
|
char id[SUBSCRIPTION_ID_MAX_LENGTH]; // Subscription ID
|
|
struct lws* wsi; // WebSocket connection handle
|
|
subscription_filter_t* filters; // Linked list of filters (OR'd together)
|
|
time_t created_at; // When subscription was created
|
|
int events_sent; // Counter for sent events
|
|
int active; // 1 = active, 0 = closed
|
|
|
|
// Client info for logging
|
|
char client_ip[CLIENT_IP_MAX_LENGTH]; // Client IP address
|
|
|
|
// Linked list pointers
|
|
struct subscription* next; // Next subscription globally
|
|
struct subscription* session_next; // Next subscription for this session
|
|
};
|
|
|
|
// Enhanced per-session data with subscription management
|
|
struct per_session_data {
|
|
int authenticated;
|
|
subscription_t* subscriptions; // Head of this session's subscription list
|
|
pthread_mutex_t session_lock; // Per-session thread safety
|
|
char client_ip[CLIENT_IP_MAX_LENGTH]; // Client IP for logging
|
|
int subscription_count; // Number of subscriptions for this session
|
|
};
|
|
|
|
// Global subscription manager
|
|
struct subscription_manager {
|
|
subscription_t* active_subscriptions; // Head of global subscription list
|
|
pthread_mutex_t subscriptions_lock; // Global thread safety
|
|
int total_subscriptions; // Current count
|
|
|
|
// Configuration
|
|
int max_subscriptions_per_client; // Default: 20
|
|
int max_total_subscriptions; // Default: 5000
|
|
|
|
// Statistics
|
|
uint64_t total_created; // Lifetime subscription count
|
|
uint64_t total_events_broadcast; // Lifetime event broadcast count
|
|
};
|
|
|
|
// Global subscription manager instance
|
|
static subscription_manager_t g_subscription_manager = {
|
|
.active_subscriptions = NULL,
|
|
.subscriptions_lock = PTHREAD_MUTEX_INITIALIZER,
|
|
.total_subscriptions = 0,
|
|
.max_subscriptions_per_client = MAX_SUBSCRIPTIONS_PER_CLIENT,
|
|
.max_total_subscriptions = MAX_TOTAL_SUBSCRIPTIONS,
|
|
.total_created = 0,
|
|
.total_events_broadcast = 0
|
|
};
|
|
|
|
// Forward declarations for logging functions
|
|
void log_info(const char* message);
|
|
void log_success(const char* message);
|
|
void log_error(const char* message);
|
|
void log_warning(const char* message);
|
|
|
|
// Forward declarations for subscription database logging
|
|
void log_subscription_created(const subscription_t* sub);
|
|
void log_subscription_closed(const char* sub_id, const char* client_ip, const char* reason);
|
|
void log_subscription_disconnected(const char* client_ip);
|
|
void log_event_broadcast(const char* event_id, const char* sub_id, const char* client_ip);
|
|
void update_subscription_events_sent(const char* sub_id, int events_sent);
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// PERSISTENT SUBSCRIPTIONS SYSTEM
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Create a subscription filter from cJSON filter object
|
|
subscription_filter_t* create_subscription_filter(cJSON* filter_json) {
|
|
if (!filter_json || !cJSON_IsObject(filter_json)) {
|
|
return NULL;
|
|
}
|
|
|
|
subscription_filter_t* filter = calloc(1, sizeof(subscription_filter_t));
|
|
if (!filter) {
|
|
return NULL;
|
|
}
|
|
|
|
// Copy filter criteria
|
|
cJSON* kinds = cJSON_GetObjectItem(filter_json, "kinds");
|
|
if (kinds && cJSON_IsArray(kinds)) {
|
|
filter->kinds = cJSON_Duplicate(kinds, 1);
|
|
}
|
|
|
|
cJSON* authors = cJSON_GetObjectItem(filter_json, "authors");
|
|
if (authors && cJSON_IsArray(authors)) {
|
|
filter->authors = cJSON_Duplicate(authors, 1);
|
|
}
|
|
|
|
cJSON* ids = cJSON_GetObjectItem(filter_json, "ids");
|
|
if (ids && cJSON_IsArray(ids)) {
|
|
filter->ids = cJSON_Duplicate(ids, 1);
|
|
}
|
|
|
|
cJSON* since = cJSON_GetObjectItem(filter_json, "since");
|
|
if (since && cJSON_IsNumber(since)) {
|
|
filter->since = (long)cJSON_GetNumberValue(since);
|
|
}
|
|
|
|
cJSON* until = cJSON_GetObjectItem(filter_json, "until");
|
|
if (until && cJSON_IsNumber(until)) {
|
|
filter->until = (long)cJSON_GetNumberValue(until);
|
|
}
|
|
|
|
cJSON* limit = cJSON_GetObjectItem(filter_json, "limit");
|
|
if (limit && cJSON_IsNumber(limit)) {
|
|
filter->limit = (int)cJSON_GetNumberValue(limit);
|
|
}
|
|
|
|
// Handle tag filters (e.g., {"#e": ["id1"], "#p": ["pubkey1"]})
|
|
cJSON* item = NULL;
|
|
cJSON_ArrayForEach(item, filter_json) {
|
|
if (item->string && strlen(item->string) >= 2 && item->string[0] == '#') {
|
|
if (!filter->tag_filters) {
|
|
filter->tag_filters = cJSON_CreateObject();
|
|
}
|
|
if (filter->tag_filters) {
|
|
cJSON_AddItemToObject(filter->tag_filters, item->string, cJSON_Duplicate(item, 1));
|
|
}
|
|
}
|
|
}
|
|
|
|
return filter;
|
|
}
|
|
|
|
// Free a subscription filter
|
|
void free_subscription_filter(subscription_filter_t* filter) {
|
|
if (!filter) return;
|
|
|
|
if (filter->kinds) cJSON_Delete(filter->kinds);
|
|
if (filter->authors) cJSON_Delete(filter->authors);
|
|
if (filter->ids) cJSON_Delete(filter->ids);
|
|
if (filter->tag_filters) cJSON_Delete(filter->tag_filters);
|
|
|
|
if (filter->next) {
|
|
free_subscription_filter(filter->next);
|
|
}
|
|
|
|
free(filter);
|
|
}
|
|
|
|
// Create a new subscription
|
|
subscription_t* create_subscription(const char* sub_id, struct lws* wsi, cJSON* filters_array, const char* client_ip) {
|
|
if (!sub_id || !wsi || !filters_array) {
|
|
return NULL;
|
|
}
|
|
|
|
subscription_t* sub = calloc(1, sizeof(subscription_t));
|
|
if (!sub) {
|
|
return NULL;
|
|
}
|
|
|
|
// Copy subscription ID (truncate if too long)
|
|
strncpy(sub->id, sub_id, SUBSCRIPTION_ID_MAX_LENGTH - 1);
|
|
sub->id[SUBSCRIPTION_ID_MAX_LENGTH - 1] = '\0';
|
|
|
|
// Set WebSocket connection
|
|
sub->wsi = wsi;
|
|
|
|
// Set client IP
|
|
if (client_ip) {
|
|
strncpy(sub->client_ip, client_ip, CLIENT_IP_MAX_LENGTH - 1);
|
|
sub->client_ip[CLIENT_IP_MAX_LENGTH - 1] = '\0';
|
|
}
|
|
|
|
// Set timestamps and state
|
|
sub->created_at = time(NULL);
|
|
sub->events_sent = 0;
|
|
sub->active = 1;
|
|
|
|
// Convert filters array to linked list
|
|
subscription_filter_t* filter_tail = NULL;
|
|
int filter_count = 0;
|
|
|
|
if (cJSON_IsArray(filters_array)) {
|
|
cJSON* filter_json = NULL;
|
|
cJSON_ArrayForEach(filter_json, filters_array) {
|
|
if (filter_count >= MAX_FILTERS_PER_SUBSCRIPTION) {
|
|
log_warning("Maximum filters per subscription exceeded, ignoring excess filters");
|
|
break;
|
|
}
|
|
|
|
subscription_filter_t* filter = create_subscription_filter(filter_json);
|
|
if (filter) {
|
|
if (!sub->filters) {
|
|
sub->filters = filter;
|
|
filter_tail = filter;
|
|
} else {
|
|
filter_tail->next = filter;
|
|
filter_tail = filter;
|
|
}
|
|
filter_count++;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (filter_count == 0) {
|
|
log_error("No valid filters found for subscription");
|
|
free(sub);
|
|
return NULL;
|
|
}
|
|
|
|
return sub;
|
|
}
|
|
|
|
// Free a subscription
|
|
void free_subscription(subscription_t* sub) {
|
|
if (!sub) return;
|
|
|
|
if (sub->filters) {
|
|
free_subscription_filter(sub->filters);
|
|
}
|
|
|
|
free(sub);
|
|
}
|
|
|
|
// Add subscription to global manager (thread-safe)
|
|
int add_subscription_to_manager(subscription_t* sub) {
|
|
if (!sub) return -1;
|
|
|
|
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
// Check global limits
|
|
if (g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) {
|
|
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
|
log_error("Maximum total subscriptions reached");
|
|
return -1;
|
|
}
|
|
|
|
// Add to global list
|
|
sub->next = g_subscription_manager.active_subscriptions;
|
|
g_subscription_manager.active_subscriptions = sub;
|
|
g_subscription_manager.total_subscriptions++;
|
|
g_subscription_manager.total_created++;
|
|
|
|
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
// Log subscription creation to database
|
|
log_subscription_created(sub);
|
|
|
|
char debug_msg[256];
|
|
snprintf(debug_msg, sizeof(debug_msg), "Added subscription '%s' (total: %d)",
|
|
sub->id, g_subscription_manager.total_subscriptions);
|
|
log_info(debug_msg);
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Remove subscription from global manager (thread-safe)
|
|
int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) {
|
|
if (!sub_id) return -1;
|
|
|
|
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
subscription_t** current = &g_subscription_manager.active_subscriptions;
|
|
|
|
while (*current) {
|
|
subscription_t* sub = *current;
|
|
|
|
// Match by ID and WebSocket connection
|
|
if (strcmp(sub->id, sub_id) == 0 && (!wsi || sub->wsi == wsi)) {
|
|
// Remove from list
|
|
*current = sub->next;
|
|
g_subscription_manager.total_subscriptions--;
|
|
|
|
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
// Log subscription closure to database
|
|
log_subscription_closed(sub_id, sub->client_ip, "closed");
|
|
|
|
// Update events sent counter before freeing
|
|
update_subscription_events_sent(sub_id, sub->events_sent);
|
|
|
|
char debug_msg[256];
|
|
snprintf(debug_msg, sizeof(debug_msg), "Removed subscription '%s' (total: %d)",
|
|
sub_id, g_subscription_manager.total_subscriptions);
|
|
log_info(debug_msg);
|
|
|
|
free_subscription(sub);
|
|
return 0;
|
|
}
|
|
|
|
current = &(sub->next);
|
|
}
|
|
|
|
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
char debug_msg[256];
|
|
snprintf(debug_msg, sizeof(debug_msg), "Subscription '%s' not found for removal", sub_id);
|
|
log_warning(debug_msg);
|
|
|
|
return -1;
|
|
}
|
|
|
|
// Check if an event matches a subscription filter
|
|
int event_matches_filter(cJSON* event, subscription_filter_t* filter) {
|
|
if (!event || !filter) {
|
|
return 0;
|
|
}
|
|
|
|
// Check kinds filter
|
|
if (filter->kinds && cJSON_IsArray(filter->kinds)) {
|
|
cJSON* event_kind = cJSON_GetObjectItem(event, "kind");
|
|
if (!event_kind || !cJSON_IsNumber(event_kind)) {
|
|
return 0;
|
|
}
|
|
|
|
int event_kind_val = (int)cJSON_GetNumberValue(event_kind);
|
|
int kind_match = 0;
|
|
|
|
cJSON* kind_item = NULL;
|
|
cJSON_ArrayForEach(kind_item, filter->kinds) {
|
|
if (cJSON_IsNumber(kind_item) && (int)cJSON_GetNumberValue(kind_item) == event_kind_val) {
|
|
kind_match = 1;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!kind_match) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
// Check authors filter
|
|
if (filter->authors && cJSON_IsArray(filter->authors)) {
|
|
cJSON* event_pubkey = cJSON_GetObjectItem(event, "pubkey");
|
|
if (!event_pubkey || !cJSON_IsString(event_pubkey)) {
|
|
return 0;
|
|
}
|
|
|
|
const char* event_pubkey_str = cJSON_GetStringValue(event_pubkey);
|
|
int author_match = 0;
|
|
|
|
cJSON* author_item = NULL;
|
|
cJSON_ArrayForEach(author_item, filter->authors) {
|
|
if (cJSON_IsString(author_item)) {
|
|
const char* author_str = cJSON_GetStringValue(author_item);
|
|
// Support prefix matching (partial pubkeys)
|
|
if (strncmp(event_pubkey_str, author_str, strlen(author_str)) == 0) {
|
|
author_match = 1;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!author_match) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
// Check IDs filter
|
|
if (filter->ids && cJSON_IsArray(filter->ids)) {
|
|
cJSON* event_id = cJSON_GetObjectItem(event, "id");
|
|
if (!event_id || !cJSON_IsString(event_id)) {
|
|
return 0;
|
|
}
|
|
|
|
const char* event_id_str = cJSON_GetStringValue(event_id);
|
|
int id_match = 0;
|
|
|
|
cJSON* id_item = NULL;
|
|
cJSON_ArrayForEach(id_item, filter->ids) {
|
|
if (cJSON_IsString(id_item)) {
|
|
const char* id_str = cJSON_GetStringValue(id_item);
|
|
// Support prefix matching (partial IDs)
|
|
if (strncmp(event_id_str, id_str, strlen(id_str)) == 0) {
|
|
id_match = 1;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!id_match) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
// Check since filter
|
|
if (filter->since > 0) {
|
|
cJSON* event_created_at = cJSON_GetObjectItem(event, "created_at");
|
|
if (!event_created_at || !cJSON_IsNumber(event_created_at)) {
|
|
return 0;
|
|
}
|
|
|
|
long event_timestamp = (long)cJSON_GetNumberValue(event_created_at);
|
|
if (event_timestamp < filter->since) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
// Check until filter
|
|
if (filter->until > 0) {
|
|
cJSON* event_created_at = cJSON_GetObjectItem(event, "created_at");
|
|
if (!event_created_at || !cJSON_IsNumber(event_created_at)) {
|
|
return 0;
|
|
}
|
|
|
|
long event_timestamp = (long)cJSON_GetNumberValue(event_created_at);
|
|
if (event_timestamp > filter->until) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
// Check tag filters (e.g., #e, #p tags)
|
|
if (filter->tag_filters && cJSON_IsObject(filter->tag_filters)) {
|
|
cJSON* event_tags = cJSON_GetObjectItem(event, "tags");
|
|
if (!event_tags || !cJSON_IsArray(event_tags)) {
|
|
return 0; // Event has no tags but filter requires tags
|
|
}
|
|
|
|
// Check each tag filter
|
|
cJSON* tag_filter = NULL;
|
|
cJSON_ArrayForEach(tag_filter, filter->tag_filters) {
|
|
if (!tag_filter->string || strlen(tag_filter->string) < 2 || tag_filter->string[0] != '#') {
|
|
continue; // Invalid tag filter
|
|
}
|
|
|
|
const char* tag_name = tag_filter->string + 1; // Skip the '#'
|
|
|
|
if (!cJSON_IsArray(tag_filter)) {
|
|
continue; // Tag filter must be an array
|
|
}
|
|
|
|
int tag_match = 0;
|
|
|
|
// Search through event tags for matching tag name and value
|
|
cJSON* event_tag = NULL;
|
|
cJSON_ArrayForEach(event_tag, event_tags) {
|
|
if (!cJSON_IsArray(event_tag) || cJSON_GetArraySize(event_tag) < 2) {
|
|
continue; // Invalid tag format
|
|
}
|
|
|
|
cJSON* event_tag_name = cJSON_GetArrayItem(event_tag, 0);
|
|
cJSON* event_tag_value = cJSON_GetArrayItem(event_tag, 1);
|
|
|
|
if (!cJSON_IsString(event_tag_name) || !cJSON_IsString(event_tag_value)) {
|
|
continue;
|
|
}
|
|
|
|
// Check if tag name matches
|
|
if (strcmp(cJSON_GetStringValue(event_tag_name), tag_name) == 0) {
|
|
const char* event_tag_value_str = cJSON_GetStringValue(event_tag_value);
|
|
|
|
// Check if any of the filter values match this tag value
|
|
cJSON* filter_value = NULL;
|
|
cJSON_ArrayForEach(filter_value, tag_filter) {
|
|
if (cJSON_IsString(filter_value)) {
|
|
const char* filter_value_str = cJSON_GetStringValue(filter_value);
|
|
// Support prefix matching for tag values
|
|
if (strncmp(event_tag_value_str, filter_value_str, strlen(filter_value_str)) == 0) {
|
|
tag_match = 1;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (tag_match) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!tag_match) {
|
|
return 0; // This tag filter didn't match, so the event doesn't match
|
|
}
|
|
}
|
|
}
|
|
|
|
return 1; // All filters passed
|
|
}
|
|
|
|
// Check if an event matches any filter in a subscription (filters are OR'd together)
|
|
int event_matches_subscription(cJSON* event, subscription_t* subscription) {
|
|
if (!event || !subscription || !subscription->filters) {
|
|
return 0;
|
|
}
|
|
|
|
subscription_filter_t* filter = subscription->filters;
|
|
while (filter) {
|
|
if (event_matches_filter(event, filter)) {
|
|
return 1; // Match found (OR logic)
|
|
}
|
|
filter = filter->next;
|
|
}
|
|
|
|
return 0; // No filters matched
|
|
}
|
|
|
|
// Broadcast event to all matching subscriptions (thread-safe)
|
|
int broadcast_event_to_subscriptions(cJSON* event) {
|
|
if (!event) {
|
|
return 0;
|
|
}
|
|
|
|
int broadcasts = 0;
|
|
|
|
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
subscription_t* sub = g_subscription_manager.active_subscriptions;
|
|
while (sub) {
|
|
if (sub->active && event_matches_subscription(event, sub)) {
|
|
// Create EVENT message for this subscription
|
|
cJSON* event_msg = cJSON_CreateArray();
|
|
cJSON_AddItemToArray(event_msg, cJSON_CreateString("EVENT"));
|
|
cJSON_AddItemToArray(event_msg, cJSON_CreateString(sub->id));
|
|
cJSON_AddItemToArray(event_msg, cJSON_Duplicate(event, 1));
|
|
|
|
char* msg_str = cJSON_Print(event_msg);
|
|
if (msg_str) {
|
|
size_t msg_len = strlen(msg_str);
|
|
unsigned char* buf = malloc(LWS_PRE + msg_len);
|
|
if (buf) {
|
|
memcpy(buf + LWS_PRE, msg_str, msg_len);
|
|
|
|
// Send to WebSocket connection
|
|
int write_result = lws_write(sub->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
|
|
if (write_result >= 0) {
|
|
sub->events_sent++;
|
|
broadcasts++;
|
|
|
|
// Log event broadcast to database (optional - can be disabled for performance)
|
|
cJSON* event_id_obj = cJSON_GetObjectItem(event, "id");
|
|
if (event_id_obj && cJSON_IsString(event_id_obj)) {
|
|
log_event_broadcast(cJSON_GetStringValue(event_id_obj), sub->id, sub->client_ip);
|
|
}
|
|
}
|
|
|
|
free(buf);
|
|
}
|
|
free(msg_str);
|
|
}
|
|
|
|
cJSON_Delete(event_msg);
|
|
}
|
|
|
|
sub = sub->next;
|
|
}
|
|
|
|
// Update global statistics
|
|
g_subscription_manager.total_events_broadcast += broadcasts;
|
|
|
|
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
|
|
|
if (broadcasts > 0) {
|
|
char debug_msg[256];
|
|
snprintf(debug_msg, sizeof(debug_msg), "Broadcasted event to %d subscriptions", broadcasts);
|
|
log_info(debug_msg);
|
|
}
|
|
|
|
return broadcasts;
|
|
}
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// SUBSCRIPTION DATABASE LOGGING
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Log subscription creation to database
|
|
void log_subscription_created(const subscription_t* sub) {
|
|
if (!g_db || !sub) return;
|
|
|
|
// Create filter JSON for logging
|
|
char* filter_json = NULL;
|
|
if (sub->filters) {
|
|
cJSON* filters_array = cJSON_CreateArray();
|
|
subscription_filter_t* filter = sub->filters;
|
|
|
|
while (filter) {
|
|
cJSON* filter_obj = cJSON_CreateObject();
|
|
|
|
if (filter->kinds) {
|
|
cJSON_AddItemToObject(filter_obj, "kinds", cJSON_Duplicate(filter->kinds, 1));
|
|
}
|
|
if (filter->authors) {
|
|
cJSON_AddItemToObject(filter_obj, "authors", cJSON_Duplicate(filter->authors, 1));
|
|
}
|
|
if (filter->ids) {
|
|
cJSON_AddItemToObject(filter_obj, "ids", cJSON_Duplicate(filter->ids, 1));
|
|
}
|
|
if (filter->since > 0) {
|
|
cJSON_AddNumberToObject(filter_obj, "since", filter->since);
|
|
}
|
|
if (filter->until > 0) {
|
|
cJSON_AddNumberToObject(filter_obj, "until", filter->until);
|
|
}
|
|
if (filter->limit > 0) {
|
|
cJSON_AddNumberToObject(filter_obj, "limit", filter->limit);
|
|
}
|
|
if (filter->tag_filters) {
|
|
cJSON* tags_obj = cJSON_Duplicate(filter->tag_filters, 1);
|
|
cJSON* item = NULL;
|
|
cJSON_ArrayForEach(item, tags_obj) {
|
|
if (item->string) {
|
|
cJSON_AddItemToObject(filter_obj, item->string, cJSON_Duplicate(item, 1));
|
|
}
|
|
}
|
|
cJSON_Delete(tags_obj);
|
|
}
|
|
|
|
cJSON_AddItemToArray(filters_array, filter_obj);
|
|
filter = filter->next;
|
|
}
|
|
|
|
filter_json = cJSON_Print(filters_array);
|
|
cJSON_Delete(filters_array);
|
|
}
|
|
|
|
const char* sql =
|
|
"INSERT INTO subscription_events (subscription_id, client_ip, event_type, filter_json) "
|
|
"VALUES (?, ?, 'created', ?)";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, sub->id, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 2, sub->client_ip, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 3, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT);
|
|
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
|
|
if (filter_json) free(filter_json);
|
|
}
|
|
|
|
// Log subscription closure to database
|
|
void log_subscription_closed(const char* sub_id, const char* client_ip, const char* reason) {
|
|
(void)reason; // Mark as intentionally unused
|
|
if (!g_db || !sub_id) return;
|
|
|
|
const char* sql =
|
|
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
|
|
"VALUES (?, ?, 'closed')";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, sub_id, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 2, client_ip ? client_ip : "unknown", -1, SQLITE_STATIC);
|
|
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
|
|
// Update the corresponding 'created' entry with end time and events sent
|
|
const char* update_sql =
|
|
"UPDATE subscription_events "
|
|
"SET ended_at = strftime('%s', 'now') "
|
|
"WHERE subscription_id = ? AND event_type = 'created' AND ended_at IS NULL";
|
|
|
|
rc = sqlite3_prepare_v2(g_db, update_sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, sub_id, -1, SQLITE_STATIC);
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
}
|
|
|
|
// Log subscription disconnection to database
|
|
void log_subscription_disconnected(const char* client_ip) {
|
|
if (!g_db || !client_ip) return;
|
|
|
|
// Mark all active subscriptions for this client as disconnected
|
|
const char* sql =
|
|
"UPDATE subscription_events "
|
|
"SET ended_at = strftime('%s', 'now') "
|
|
"WHERE client_ip = ? AND event_type = 'created' AND ended_at IS NULL";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, client_ip, -1, SQLITE_STATIC);
|
|
int changes = sqlite3_changes(g_db);
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
|
|
if (changes > 0) {
|
|
// Log a disconnection event
|
|
const char* insert_sql =
|
|
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
|
|
"VALUES ('disconnect', ?, 'disconnected')";
|
|
|
|
rc = sqlite3_prepare_v2(g_db, insert_sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, client_ip, -1, SQLITE_STATIC);
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Log event broadcast to database (optional, can be resource intensive)
|
|
void log_event_broadcast(const char* event_id, const char* sub_id, const char* client_ip) {
|
|
if (!g_db || !event_id || !sub_id || !client_ip) return;
|
|
|
|
const char* sql =
|
|
"INSERT INTO event_broadcasts (event_id, subscription_id, client_ip) "
|
|
"VALUES (?, ?, ?)";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_text(stmt, 1, event_id, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 2, sub_id, -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 3, client_ip, -1, SQLITE_STATIC);
|
|
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
}
|
|
|
|
// Update events sent counter for a subscription
|
|
void update_subscription_events_sent(const char* sub_id, int events_sent) {
|
|
if (!g_db || !sub_id) return;
|
|
|
|
const char* sql =
|
|
"UPDATE subscription_events "
|
|
"SET events_sent = ? "
|
|
"WHERE subscription_id = ? AND event_type = 'created'";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc == SQLITE_OK) {
|
|
sqlite3_bind_int(stmt, 1, events_sent);
|
|
sqlite3_bind_text(stmt, 2, sub_id, -1, SQLITE_STATIC);
|
|
|
|
sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// LOGGING FUNCTIONS
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Logging functions
|
|
void log_info(const char* message) {
|
|
printf(BLUE "[INFO]" RESET " %s\n", message);
|
|
fflush(stdout);
|
|
}
|
|
|
|
void log_success(const char* message) {
|
|
printf(GREEN "[SUCCESS]" RESET " %s\n", message);
|
|
fflush(stdout);
|
|
}
|
|
|
|
void log_error(const char* message) {
|
|
printf(RED "[ERROR]" RESET " %s\n", message);
|
|
fflush(stdout);
|
|
}
|
|
|
|
void log_warning(const char* message) {
|
|
printf(YELLOW "[WARNING]" RESET " %s\n", message);
|
|
fflush(stdout);
|
|
}
|
|
|
|
// Signal handler for graceful shutdown
|
|
void signal_handler(int sig) {
|
|
if (sig == SIGINT || sig == SIGTERM) {
|
|
log_info("Received shutdown signal");
|
|
g_server_running = 0;
|
|
}
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// DATABASE FUNCTIONS
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Initialize database connection
|
|
int init_database() {
|
|
int rc = sqlite3_open(DATABASE_PATH, &g_db);
|
|
if (rc != SQLITE_OK) {
|
|
log_error("Cannot open database");
|
|
return -1;
|
|
}
|
|
|
|
log_success("Database connection established");
|
|
return 0;
|
|
}
|
|
|
|
// Close database connection
|
|
void close_database() {
|
|
if (g_db) {
|
|
sqlite3_close(g_db);
|
|
g_db = NULL;
|
|
log_info("Database connection closed");
|
|
}
|
|
}
|
|
|
|
// Event type classification
|
|
typedef enum {
|
|
EVENT_TYPE_REGULAR,
|
|
EVENT_TYPE_REPLACEABLE,
|
|
EVENT_TYPE_EPHEMERAL,
|
|
EVENT_TYPE_ADDRESSABLE,
|
|
EVENT_TYPE_UNKNOWN
|
|
} event_type_t;
|
|
|
|
event_type_t classify_event_kind(int kind) {
|
|
if ((kind >= 1000 && kind < 10000) ||
|
|
(kind >= 4 && kind < 45) ||
|
|
kind == 1 || kind == 2) {
|
|
return EVENT_TYPE_REGULAR;
|
|
}
|
|
if ((kind >= 10000 && kind < 20000) ||
|
|
kind == 0 || kind == 3) {
|
|
return EVENT_TYPE_REPLACEABLE;
|
|
}
|
|
if (kind >= 20000 && kind < 30000) {
|
|
return EVENT_TYPE_EPHEMERAL;
|
|
}
|
|
if (kind >= 30000 && kind < 40000) {
|
|
return EVENT_TYPE_ADDRESSABLE;
|
|
}
|
|
return EVENT_TYPE_UNKNOWN;
|
|
}
|
|
|
|
const char* event_type_to_string(event_type_t type) {
|
|
switch (type) {
|
|
case EVENT_TYPE_REGULAR: return "regular";
|
|
case EVENT_TYPE_REPLACEABLE: return "replaceable";
|
|
case EVENT_TYPE_EPHEMERAL: return "ephemeral";
|
|
case EVENT_TYPE_ADDRESSABLE: return "addressable";
|
|
default: return "unknown";
|
|
}
|
|
}
|
|
|
|
// Store event in database
|
|
int store_event(cJSON* event) {
|
|
if (!g_db || !event) {
|
|
return -1;
|
|
}
|
|
|
|
// Extract event fields
|
|
cJSON* id = cJSON_GetObjectItem(event, "id");
|
|
cJSON* pubkey = cJSON_GetObjectItem(event, "pubkey");
|
|
cJSON* created_at = cJSON_GetObjectItem(event, "created_at");
|
|
cJSON* kind = cJSON_GetObjectItem(event, "kind");
|
|
cJSON* content = cJSON_GetObjectItem(event, "content");
|
|
cJSON* sig = cJSON_GetObjectItem(event, "sig");
|
|
cJSON* tags = cJSON_GetObjectItem(event, "tags");
|
|
|
|
if (!id || !pubkey || !created_at || !kind || !content || !sig) {
|
|
log_error("Invalid event - missing required fields");
|
|
return -1;
|
|
}
|
|
|
|
// Classify event type
|
|
event_type_t type = classify_event_kind((int)cJSON_GetNumberValue(kind));
|
|
|
|
// Serialize tags to JSON (use empty array if no tags)
|
|
char* tags_json = NULL;
|
|
if (tags && cJSON_IsArray(tags)) {
|
|
tags_json = cJSON_Print(tags);
|
|
} else {
|
|
tags_json = strdup("[]");
|
|
}
|
|
|
|
if (!tags_json) {
|
|
log_error("Failed to serialize tags to JSON");
|
|
return -1;
|
|
}
|
|
|
|
// Prepare SQL statement for event insertion
|
|
const char* sql =
|
|
"INSERT INTO events (id, pubkey, created_at, kind, event_type, content, sig, tags) "
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc != SQLITE_OK) {
|
|
log_error("Failed to prepare event insert statement");
|
|
free(tags_json);
|
|
return -1;
|
|
}
|
|
|
|
// Bind parameters
|
|
sqlite3_bind_text(stmt, 1, cJSON_GetStringValue(id), -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 2, cJSON_GetStringValue(pubkey), -1, SQLITE_STATIC);
|
|
sqlite3_bind_int64(stmt, 3, (sqlite3_int64)cJSON_GetNumberValue(created_at));
|
|
sqlite3_bind_int(stmt, 4, (int)cJSON_GetNumberValue(kind));
|
|
sqlite3_bind_text(stmt, 5, event_type_to_string(type), -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 6, cJSON_GetStringValue(content), -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 7, cJSON_GetStringValue(sig), -1, SQLITE_STATIC);
|
|
sqlite3_bind_text(stmt, 8, tags_json, -1, SQLITE_TRANSIENT);
|
|
|
|
// Execute statement
|
|
rc = sqlite3_step(stmt);
|
|
sqlite3_finalize(stmt);
|
|
|
|
if (rc != SQLITE_DONE) {
|
|
if (rc == SQLITE_CONSTRAINT) {
|
|
log_warning("Event already exists in database");
|
|
free(tags_json);
|
|
return 0; // Not an error, just duplicate
|
|
}
|
|
char error_msg[256];
|
|
snprintf(error_msg, sizeof(error_msg), "Failed to insert event: %s", sqlite3_errmsg(g_db));
|
|
log_error(error_msg);
|
|
free(tags_json);
|
|
return -1;
|
|
}
|
|
|
|
free(tags_json);
|
|
log_success("Event stored in database");
|
|
return 0;
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// EVENT STORAGE AND RETRIEVAL
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
cJSON* retrieve_event(const char* event_id) {
|
|
if (!g_db || !event_id) {
|
|
return NULL;
|
|
}
|
|
|
|
const char* sql =
|
|
"SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE id = ?";
|
|
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc != SQLITE_OK) {
|
|
return NULL;
|
|
}
|
|
|
|
sqlite3_bind_text(stmt, 1, event_id, -1, SQLITE_STATIC);
|
|
|
|
cJSON* event = NULL;
|
|
if (sqlite3_step(stmt) == SQLITE_ROW) {
|
|
event = cJSON_CreateObject();
|
|
|
|
cJSON_AddStringToObject(event, "id", (char*)sqlite3_column_text(stmt, 0));
|
|
cJSON_AddStringToObject(event, "pubkey", (char*)sqlite3_column_text(stmt, 1));
|
|
cJSON_AddNumberToObject(event, "created_at", sqlite3_column_int64(stmt, 2));
|
|
cJSON_AddNumberToObject(event, "kind", sqlite3_column_int(stmt, 3));
|
|
cJSON_AddStringToObject(event, "content", (char*)sqlite3_column_text(stmt, 4));
|
|
cJSON_AddStringToObject(event, "sig", (char*)sqlite3_column_text(stmt, 5));
|
|
|
|
// Parse tags JSON
|
|
const char* tags_json = (char*)sqlite3_column_text(stmt, 6);
|
|
if (tags_json) {
|
|
cJSON* tags = cJSON_Parse(tags_json);
|
|
if (tags) {
|
|
cJSON_AddItemToObject(event, "tags", tags);
|
|
} else {
|
|
cJSON_AddItemToObject(event, "tags", cJSON_CreateArray());
|
|
}
|
|
} else {
|
|
cJSON_AddItemToObject(event, "tags", cJSON_CreateArray());
|
|
}
|
|
}
|
|
|
|
sqlite3_finalize(stmt);
|
|
return event;
|
|
}
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// SUBSCRIPTION HANDLERS
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
int handle_req_message(const char* sub_id, cJSON* filters, struct lws *wsi, struct per_session_data *pss) {
|
|
log_info("Handling REQ message for persistent subscription");
|
|
|
|
if (!cJSON_IsArray(filters)) {
|
|
log_error("REQ filters is not an array");
|
|
return 0;
|
|
}
|
|
|
|
// Check session subscription limits
|
|
if (pss && pss->subscription_count >= MAX_SUBSCRIPTIONS_PER_CLIENT) {
|
|
log_error("Maximum subscriptions per client exceeded");
|
|
|
|
// Send CLOSED notice
|
|
cJSON* closed_msg = cJSON_CreateArray();
|
|
cJSON_AddItemToArray(closed_msg, cJSON_CreateString("CLOSED"));
|
|
cJSON_AddItemToArray(closed_msg, cJSON_CreateString(sub_id));
|
|
cJSON_AddItemToArray(closed_msg, cJSON_CreateString("error: too many subscriptions"));
|
|
|
|
char* closed_str = cJSON_Print(closed_msg);
|
|
if (closed_str) {
|
|
size_t closed_len = strlen(closed_str);
|
|
unsigned char* buf = malloc(LWS_PRE + closed_len);
|
|
if (buf) {
|
|
memcpy(buf + LWS_PRE, closed_str, closed_len);
|
|
lws_write(wsi, buf + LWS_PRE, closed_len, LWS_WRITE_TEXT);
|
|
free(buf);
|
|
}
|
|
free(closed_str);
|
|
}
|
|
cJSON_Delete(closed_msg);
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Create persistent subscription
|
|
subscription_t* subscription = create_subscription(sub_id, wsi, filters, pss ? pss->client_ip : "unknown");
|
|
if (!subscription) {
|
|
log_error("Failed to create subscription");
|
|
return 0;
|
|
}
|
|
|
|
// Add to global manager
|
|
if (add_subscription_to_manager(subscription) != 0) {
|
|
log_error("Failed to add subscription to global manager");
|
|
free_subscription(subscription);
|
|
|
|
// Send CLOSED notice
|
|
cJSON* closed_msg = cJSON_CreateArray();
|
|
cJSON_AddItemToArray(closed_msg, cJSON_CreateString("CLOSED"));
|
|
cJSON_AddItemToArray(closed_msg, cJSON_CreateString(sub_id));
|
|
cJSON_AddItemToArray(closed_msg, cJSON_CreateString("error: subscription limit reached"));
|
|
|
|
char* closed_str = cJSON_Print(closed_msg);
|
|
if (closed_str) {
|
|
size_t closed_len = strlen(closed_str);
|
|
unsigned char* buf = malloc(LWS_PRE + closed_len);
|
|
if (buf) {
|
|
memcpy(buf + LWS_PRE, closed_str, closed_len);
|
|
lws_write(wsi, buf + LWS_PRE, closed_len, LWS_WRITE_TEXT);
|
|
free(buf);
|
|
}
|
|
free(closed_str);
|
|
}
|
|
cJSON_Delete(closed_msg);
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Add to session's subscription list (if session data available)
|
|
if (pss) {
|
|
pthread_mutex_lock(&pss->session_lock);
|
|
subscription->session_next = pss->subscriptions;
|
|
pss->subscriptions = subscription;
|
|
pss->subscription_count++;
|
|
pthread_mutex_unlock(&pss->session_lock);
|
|
}
|
|
|
|
int events_sent = 0;
|
|
|
|
// Process each filter in the array
|
|
for (int i = 0; i < cJSON_GetArraySize(filters); i++) {
|
|
cJSON* filter = cJSON_GetArrayItem(filters, i);
|
|
if (!filter || !cJSON_IsObject(filter)) {
|
|
log_warning("Invalid filter object");
|
|
continue;
|
|
}
|
|
|
|
// Build SQL query based on filter
|
|
char sql[1024] = "SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1";
|
|
char* sql_ptr = sql + strlen(sql);
|
|
int remaining = sizeof(sql) - strlen(sql);
|
|
|
|
// Handle kinds filter
|
|
cJSON* kinds = cJSON_GetObjectItem(filter, "kinds");
|
|
if (kinds && cJSON_IsArray(kinds)) {
|
|
int kind_count = cJSON_GetArraySize(kinds);
|
|
if (kind_count > 0) {
|
|
snprintf(sql_ptr, remaining, " AND kind IN (");
|
|
sql_ptr += strlen(sql_ptr);
|
|
remaining = sizeof(sql) - strlen(sql);
|
|
|
|
for (int k = 0; k < kind_count; k++) {
|
|
cJSON* kind = cJSON_GetArrayItem(kinds, k);
|
|
if (cJSON_IsNumber(kind)) {
|
|
if (k > 0) {
|
|
snprintf(sql_ptr, remaining, ",");
|
|
sql_ptr++;
|
|
remaining--;
|
|
}
|
|
snprintf(sql_ptr, remaining, "%d", (int)cJSON_GetNumberValue(kind));
|
|
sql_ptr += strlen(sql_ptr);
|
|
remaining = sizeof(sql) - strlen(sql);
|
|
}
|
|
}
|
|
snprintf(sql_ptr, remaining, ")");
|
|
sql_ptr += strlen(sql_ptr);
|
|
remaining = sizeof(sql) - strlen(sql);
|
|
}
|
|
}
|
|
|
|
// Handle authors filter
|
|
cJSON* authors = cJSON_GetObjectItem(filter, "authors");
|
|
if (authors && cJSON_IsArray(authors)) {
|
|
int author_count = cJSON_GetArraySize(authors);
|
|
if (author_count > 0) {
|
|
snprintf(sql_ptr, remaining, " AND pubkey IN (");
|
|
sql_ptr += strlen(sql_ptr);
|
|
remaining = sizeof(sql) - strlen(sql);
|
|
|
|
for (int a = 0; a < author_count; a++) {
|
|
cJSON* author = cJSON_GetArrayItem(authors, a);
|
|
if (cJSON_IsString(author)) {
|
|
if (a > 0) {
|
|
snprintf(sql_ptr, remaining, ",");
|
|
sql_ptr++;
|
|
remaining--;
|
|
}
|
|
snprintf(sql_ptr, remaining, "'%s'", cJSON_GetStringValue(author));
|
|
sql_ptr += strlen(sql_ptr);
|
|
remaining = sizeof(sql) - strlen(sql);
|
|
}
|
|
}
|
|
snprintf(sql_ptr, remaining, ")");
|
|
sql_ptr += strlen(sql_ptr);
|
|
remaining = sizeof(sql) - strlen(sql);
|
|
}
|
|
}
|
|
|
|
// Handle since filter
|
|
cJSON* since = cJSON_GetObjectItem(filter, "since");
|
|
if (since && cJSON_IsNumber(since)) {
|
|
snprintf(sql_ptr, remaining, " AND created_at >= %ld", (long)cJSON_GetNumberValue(since));
|
|
sql_ptr += strlen(sql_ptr);
|
|
remaining = sizeof(sql) - strlen(sql);
|
|
}
|
|
|
|
// Handle until filter
|
|
cJSON* until = cJSON_GetObjectItem(filter, "until");
|
|
if (until && cJSON_IsNumber(until)) {
|
|
snprintf(sql_ptr, remaining, " AND created_at <= %ld", (long)cJSON_GetNumberValue(until));
|
|
sql_ptr += strlen(sql_ptr);
|
|
remaining = sizeof(sql) - strlen(sql);
|
|
}
|
|
|
|
// Add ordering and limit
|
|
snprintf(sql_ptr, remaining, " ORDER BY created_at DESC");
|
|
sql_ptr += strlen(sql_ptr);
|
|
remaining = sizeof(sql) - strlen(sql);
|
|
|
|
// Handle limit filter
|
|
cJSON* limit = cJSON_GetObjectItem(filter, "limit");
|
|
if (limit && cJSON_IsNumber(limit)) {
|
|
int limit_val = (int)cJSON_GetNumberValue(limit);
|
|
if (limit_val > 0 && limit_val <= 5000) {
|
|
snprintf(sql_ptr, remaining, " LIMIT %d", limit_val);
|
|
}
|
|
} else {
|
|
// Default limit to prevent excessive queries
|
|
snprintf(sql_ptr, remaining, " LIMIT 500");
|
|
}
|
|
|
|
// Debug: Log the SQL query being executed
|
|
char debug_msg[1280];
|
|
snprintf(debug_msg, sizeof(debug_msg), "Executing SQL: %s", sql);
|
|
log_info(debug_msg);
|
|
|
|
// Execute query and send events
|
|
sqlite3_stmt* stmt;
|
|
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
|
if (rc != SQLITE_OK) {
|
|
char error_msg[256];
|
|
snprintf(error_msg, sizeof(error_msg), "Failed to prepare subscription query: %s", sqlite3_errmsg(g_db));
|
|
log_error(error_msg);
|
|
continue;
|
|
}
|
|
|
|
int row_count = 0;
|
|
while (sqlite3_step(stmt) == SQLITE_ROW) {
|
|
row_count++;
|
|
|
|
// Build event JSON
|
|
cJSON* event = cJSON_CreateObject();
|
|
cJSON_AddStringToObject(event, "id", (char*)sqlite3_column_text(stmt, 0));
|
|
cJSON_AddStringToObject(event, "pubkey", (char*)sqlite3_column_text(stmt, 1));
|
|
cJSON_AddNumberToObject(event, "created_at", sqlite3_column_int64(stmt, 2));
|
|
cJSON_AddNumberToObject(event, "kind", sqlite3_column_int(stmt, 3));
|
|
cJSON_AddStringToObject(event, "content", (char*)sqlite3_column_text(stmt, 4));
|
|
cJSON_AddStringToObject(event, "sig", (char*)sqlite3_column_text(stmt, 5));
|
|
|
|
// Parse tags JSON
|
|
const char* tags_json = (char*)sqlite3_column_text(stmt, 6);
|
|
cJSON* tags = NULL;
|
|
if (tags_json) {
|
|
tags = cJSON_Parse(tags_json);
|
|
}
|
|
if (!tags) {
|
|
tags = cJSON_CreateArray();
|
|
}
|
|
cJSON_AddItemToObject(event, "tags", tags);
|
|
|
|
// Send EVENT message
|
|
cJSON* event_msg = cJSON_CreateArray();
|
|
cJSON_AddItemToArray(event_msg, cJSON_CreateString("EVENT"));
|
|
cJSON_AddItemToArray(event_msg, cJSON_CreateString(sub_id));
|
|
cJSON_AddItemToArray(event_msg, event);
|
|
|
|
char* msg_str = cJSON_Print(event_msg);
|
|
if (msg_str) {
|
|
size_t msg_len = strlen(msg_str);
|
|
unsigned char* buf = malloc(LWS_PRE + msg_len);
|
|
if (buf) {
|
|
memcpy(buf + LWS_PRE, msg_str, msg_len);
|
|
lws_write(wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
|
|
free(buf);
|
|
}
|
|
free(msg_str);
|
|
}
|
|
|
|
cJSON_Delete(event_msg);
|
|
events_sent++;
|
|
}
|
|
|
|
char row_debug[128];
|
|
snprintf(row_debug, sizeof(row_debug), "Query returned %d rows", row_count);
|
|
log_info(row_debug);
|
|
|
|
sqlite3_finalize(stmt);
|
|
}
|
|
|
|
char events_debug[128];
|
|
snprintf(events_debug, sizeof(events_debug), "Total events sent: %d", events_sent);
|
|
log_info(events_debug);
|
|
|
|
return events_sent;
|
|
}
|
|
|
|
// Handle EVENT message (publish)
|
|
int handle_event_message(cJSON* event) {
|
|
log_info("Handling EVENT message");
|
|
|
|
// Validate event structure (basic check)
|
|
cJSON* id = cJSON_GetObjectItem(event, "id");
|
|
if (!id || !cJSON_IsString(id)) {
|
|
log_error("Invalid event - no ID");
|
|
return -1;
|
|
}
|
|
|
|
// Store event in database
|
|
if (store_event(event) == 0) {
|
|
log_success("Event stored successfully");
|
|
return 0;
|
|
}
|
|
|
|
return -1;
|
|
}
|
|
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// WEBSOCKET PROTOCOL
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// WebSocket callback function for Nostr relay protocol
|
|
static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reason,
|
|
void *user, void *in, size_t len) {
|
|
struct per_session_data *pss = (struct per_session_data *)user;
|
|
|
|
switch (reason) {
|
|
case LWS_CALLBACK_ESTABLISHED:
|
|
log_info("WebSocket connection established");
|
|
memset(pss, 0, sizeof(*pss));
|
|
pthread_mutex_init(&pss->session_lock, NULL);
|
|
// TODO: Get real client IP address
|
|
strncpy(pss->client_ip, "127.0.0.1", CLIENT_IP_MAX_LENGTH - 1);
|
|
pss->client_ip[CLIENT_IP_MAX_LENGTH - 1] = '\0';
|
|
break;
|
|
|
|
case LWS_CALLBACK_RECEIVE:
|
|
if (len > 0) {
|
|
char *message = malloc(len + 1);
|
|
if (message) {
|
|
memcpy(message, in, len);
|
|
message[len] = '\0';
|
|
|
|
log_info("Received WebSocket message");
|
|
|
|
// Parse JSON message
|
|
cJSON* json = cJSON_Parse(message);
|
|
if (json && cJSON_IsArray(json)) {
|
|
// Get message type
|
|
cJSON* type = cJSON_GetArrayItem(json, 0);
|
|
if (type && cJSON_IsString(type)) {
|
|
const char* msg_type = cJSON_GetStringValue(type);
|
|
|
|
if (strcmp(msg_type, "EVENT") == 0) {
|
|
// Handle EVENT message
|
|
cJSON* event = cJSON_GetArrayItem(json, 1);
|
|
if (event && cJSON_IsObject(event)) {
|
|
int result = handle_event_message(event);
|
|
|
|
// Broadcast event to matching persistent subscriptions
|
|
if (result == 0) {
|
|
broadcast_event_to_subscriptions(event);
|
|
}
|
|
|
|
// Send OK response
|
|
cJSON* event_id = cJSON_GetObjectItem(event, "id");
|
|
if (event_id && cJSON_IsString(event_id)) {
|
|
cJSON* response = cJSON_CreateArray();
|
|
cJSON_AddItemToArray(response, cJSON_CreateString("OK"));
|
|
cJSON_AddItemToArray(response, cJSON_CreateString(cJSON_GetStringValue(event_id)));
|
|
cJSON_AddItemToArray(response, cJSON_CreateBool(result == 0));
|
|
cJSON_AddItemToArray(response, cJSON_CreateString(result == 0 ? "" : "error: failed to store event"));
|
|
|
|
char *response_str = cJSON_Print(response);
|
|
if (response_str) {
|
|
size_t response_len = strlen(response_str);
|
|
unsigned char *buf = malloc(LWS_PRE + response_len);
|
|
if (buf) {
|
|
memcpy(buf + LWS_PRE, response_str, response_len);
|
|
lws_write(wsi, buf + LWS_PRE, response_len, LWS_WRITE_TEXT);
|
|
free(buf);
|
|
}
|
|
free(response_str);
|
|
}
|
|
cJSON_Delete(response);
|
|
}
|
|
}
|
|
} else if (strcmp(msg_type, "REQ") == 0) {
|
|
// Handle REQ message
|
|
cJSON* sub_id = cJSON_GetArrayItem(json, 1);
|
|
|
|
if (sub_id && cJSON_IsString(sub_id)) {
|
|
const char* subscription_id = cJSON_GetStringValue(sub_id);
|
|
|
|
// Create array of filter objects from position 2 onwards
|
|
cJSON* filters = cJSON_CreateArray();
|
|
int json_size = cJSON_GetArraySize(json);
|
|
for (int i = 2; i < json_size; i++) {
|
|
cJSON* filter = cJSON_GetArrayItem(json, i);
|
|
if (filter) {
|
|
cJSON_AddItemToArray(filters, cJSON_Duplicate(filter, 1));
|
|
}
|
|
}
|
|
|
|
handle_req_message(subscription_id, filters, wsi, pss);
|
|
|
|
// Clean up the filters array we created
|
|
cJSON_Delete(filters);
|
|
|
|
// Send EOSE (End of Stored Events)
|
|
cJSON* eose_response = cJSON_CreateArray();
|
|
cJSON_AddItemToArray(eose_response, cJSON_CreateString("EOSE"));
|
|
cJSON_AddItemToArray(eose_response, cJSON_CreateString(subscription_id));
|
|
|
|
char *eose_str = cJSON_Print(eose_response);
|
|
if (eose_str) {
|
|
size_t eose_len = strlen(eose_str);
|
|
unsigned char *buf = malloc(LWS_PRE + eose_len);
|
|
if (buf) {
|
|
memcpy(buf + LWS_PRE, eose_str, eose_len);
|
|
lws_write(wsi, buf + LWS_PRE, eose_len, LWS_WRITE_TEXT);
|
|
free(buf);
|
|
}
|
|
free(eose_str);
|
|
}
|
|
cJSON_Delete(eose_response);
|
|
}
|
|
} else if (strcmp(msg_type, "CLOSE") == 0) {
|
|
// Handle CLOSE message
|
|
cJSON* sub_id = cJSON_GetArrayItem(json, 1);
|
|
if (sub_id && cJSON_IsString(sub_id)) {
|
|
const char* subscription_id = cJSON_GetStringValue(sub_id);
|
|
|
|
// Remove from global manager
|
|
remove_subscription_from_manager(subscription_id, wsi);
|
|
|
|
// Remove from session list if present
|
|
if (pss) {
|
|
pthread_mutex_lock(&pss->session_lock);
|
|
|
|
subscription_t** current = &pss->subscriptions;
|
|
while (*current) {
|
|
if (strcmp((*current)->id, subscription_id) == 0) {
|
|
subscription_t* to_remove = *current;
|
|
*current = to_remove->session_next;
|
|
pss->subscription_count--;
|
|
break;
|
|
}
|
|
current = &((*current)->session_next);
|
|
}
|
|
|
|
pthread_mutex_unlock(&pss->session_lock);
|
|
}
|
|
|
|
char debug_msg[256];
|
|
snprintf(debug_msg, sizeof(debug_msg), "Closed subscription: %s", subscription_id);
|
|
log_info(debug_msg);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (json) cJSON_Delete(json);
|
|
free(message);
|
|
}
|
|
}
|
|
break;
|
|
|
|
case LWS_CALLBACK_CLOSED:
|
|
log_info("WebSocket connection closed");
|
|
|
|
// Clean up session subscriptions
|
|
if (pss) {
|
|
pthread_mutex_lock(&pss->session_lock);
|
|
|
|
subscription_t* sub = pss->subscriptions;
|
|
while (sub) {
|
|
subscription_t* next = sub->session_next;
|
|
remove_subscription_from_manager(sub->id, wsi);
|
|
sub = next;
|
|
}
|
|
|
|
pss->subscriptions = NULL;
|
|
pss->subscription_count = 0;
|
|
|
|
pthread_mutex_unlock(&pss->session_lock);
|
|
pthread_mutex_destroy(&pss->session_lock);
|
|
}
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
// WebSocket protocol definition
|
|
static struct lws_protocols protocols[] = {
|
|
{
|
|
"nostr-relay-protocol",
|
|
nostr_relay_callback,
|
|
sizeof(struct per_session_data),
|
|
4096, // rx buffer size
|
|
0, NULL, 0
|
|
},
|
|
{ NULL, NULL, 0, 0, 0, NULL, 0 } // terminator
|
|
};
|
|
|
|
// Start libwebsockets-based WebSocket Nostr relay server
|
|
int start_websocket_relay() {
|
|
struct lws_context_creation_info info;
|
|
|
|
log_info("Starting libwebsockets-based Nostr relay server...");
|
|
|
|
memset(&info, 0, sizeof(info));
|
|
info.port = DEFAULT_PORT;
|
|
info.protocols = protocols;
|
|
info.gid = -1;
|
|
info.uid = -1;
|
|
|
|
// Minimal libwebsockets configuration
|
|
info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;
|
|
|
|
// Remove interface restrictions - let system choose
|
|
// info.vhost_name = NULL;
|
|
// info.iface = NULL;
|
|
|
|
// Increase max connections for relay usage
|
|
info.max_http_header_pool = 16;
|
|
info.timeout_secs = 10;
|
|
|
|
// Max payload size for Nostr events
|
|
info.max_http_header_data = 4096;
|
|
|
|
ws_context = lws_create_context(&info);
|
|
if (!ws_context) {
|
|
log_error("Failed to create libwebsockets context");
|
|
perror("libwebsockets creation error");
|
|
return -1;
|
|
}
|
|
|
|
log_success("WebSocket relay started on ws://127.0.0.1:8888");
|
|
|
|
// Main event loop with proper signal handling
|
|
while (g_server_running) {
|
|
int result = lws_service(ws_context, 1000);
|
|
|
|
if (result < 0) {
|
|
log_error("libwebsockets service error");
|
|
break;
|
|
}
|
|
}
|
|
|
|
log_info("Shutting down WebSocket server...");
|
|
lws_context_destroy(ws_context);
|
|
ws_context = NULL;
|
|
|
|
log_success("WebSocket relay shut down cleanly");
|
|
return 0;
|
|
}
|
|
|
|
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// MAIN PROGRAM
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Print usage information
|
|
void print_usage(const char* program_name) {
|
|
printf("Usage: %s [OPTIONS]\n", program_name);
|
|
printf("\n");
|
|
printf("C Nostr Relay Server\n");
|
|
printf("\n");
|
|
printf("Options:\n");
|
|
printf(" -p, --port PORT Listen port (default: %d)\n", DEFAULT_PORT);
|
|
printf(" -h, --help Show this help message\n");
|
|
printf("\n");
|
|
}
|
|
|
|
int main(int argc, char* argv[]) {
|
|
int port = DEFAULT_PORT;
|
|
|
|
// Parse command line arguments
|
|
for (int i = 1; i < argc; i++) {
|
|
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
|
print_usage(argv[0]);
|
|
return 0;
|
|
} else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) {
|
|
if (i + 1 < argc) {
|
|
port = atoi(argv[++i]);
|
|
if (port <= 0 || port > 65535) {
|
|
log_error("Invalid port number");
|
|
return 1;
|
|
}
|
|
} else {
|
|
log_error("Port argument requires a value");
|
|
return 1;
|
|
}
|
|
} else {
|
|
log_error("Unknown argument");
|
|
print_usage(argv[0]);
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
// Set up signal handlers
|
|
signal(SIGINT, signal_handler);
|
|
signal(SIGTERM, signal_handler);
|
|
|
|
printf(BLUE BOLD "=== C Nostr Relay Server ===" RESET "\n");
|
|
|
|
// Initialize database
|
|
if (init_database() != 0) {
|
|
log_error("Failed to initialize database");
|
|
return 1;
|
|
}
|
|
|
|
// Initialize nostr library
|
|
if (nostr_init() != 0) {
|
|
log_error("Failed to initialize nostr library");
|
|
close_database();
|
|
return 1;
|
|
}
|
|
|
|
log_info("Starting relay server...");
|
|
|
|
// Start WebSocket Nostr relay server
|
|
int result = start_websocket_relay();
|
|
|
|
// Cleanup
|
|
nostr_cleanup();
|
|
close_database();
|
|
|
|
if (result == 0) {
|
|
log_success("Server shutdown complete");
|
|
} else {
|
|
log_error("Server shutdown with errors");
|
|
}
|
|
|
|
return result;
|
|
} |