v0.7.34 - We seemed to maybe finally fixed the monitoring error?

This commit is contained in:
Your Name
2025-10-22 10:19:43 -04:00
parent 9cb9b746d8
commit 9179d57cc9
21 changed files with 2877 additions and 503 deletions

View File

@@ -45,6 +45,9 @@ int update_config_in_table(const char* key, const char* value);
// Forward declaration for monitoring helper function
int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_func)(void));
// Forward declaration for CPU metrics query function
cJSON* query_cpu_metrics(void);
// Monitoring system helper functions
int get_monitoring_throttle_seconds(void) {
return get_config_int("kind_24567_reporting_throttle_sec", 5);
@@ -236,7 +239,7 @@ cJSON* query_active_subscriptions(void) {
const char* sql =
"SELECT COUNT(*) as total_subs, "
"COUNT(DISTINCT client_ip) as client_count "
"FROM subscription_events "
"FROM subscriptions "
"WHERE event_type = 'created' AND ended_at IS NULL";
if (sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL) != SQLITE_OK) {
@@ -258,7 +261,7 @@ cJSON* query_active_subscriptions(void) {
const char* max_sql =
"SELECT MAX(sub_count) FROM ("
" SELECT COUNT(*) as sub_count "
" FROM subscription_events "
" FROM subscriptions "
" WHERE event_type = 'created' AND ended_at IS NULL "
" GROUP BY client_ip"
")";
@@ -294,7 +297,7 @@ cJSON* query_active_subscriptions(void) {
}
// Query detailed subscription information from database log (ADMIN ONLY)
// Uses subscription_events table instead of in-memory iteration to avoid mutex contention
// Uses subscriptions table instead of in-memory iteration to avoid mutex contention
cJSON* query_subscription_details(void) {
extern sqlite3* g_db;
if (!g_db) {
@@ -302,13 +305,13 @@ cJSON* query_subscription_details(void) {
return NULL;
}
// Query active subscriptions directly from subscription_events table
// Query active subscriptions directly from subscriptions table
// Get subscriptions that were created but not yet closed/expired/disconnected
sqlite3_stmt* stmt;
const char* sql =
"SELECT subscription_id, client_ip, filter_json, events_sent, "
"SELECT subscription_id, client_ip, wsi_pointer, filter_json, events_sent, "
"created_at, (strftime('%s', 'now') - created_at) as duration_seconds "
"FROM subscription_events "
"FROM subscriptions "
"WHERE event_type = 'created' AND ended_at IS NULL "
"ORDER BY created_at DESC LIMIT 100";
@@ -332,14 +335,16 @@ cJSON* query_subscription_details(void) {
// Extract subscription data from database
const char* sub_id = (const char*)sqlite3_column_text(stmt, 0);
const char* client_ip = (const char*)sqlite3_column_text(stmt, 1);
const char* filter_json = (const char*)sqlite3_column_text(stmt, 2);
long long events_sent = sqlite3_column_int64(stmt, 3);
long long created_at = sqlite3_column_int64(stmt, 4);
long long duration_seconds = sqlite3_column_int64(stmt, 5);
const char* wsi_pointer = (const char*)sqlite3_column_text(stmt, 2);
const char* filter_json = (const char*)sqlite3_column_text(stmt, 3);
long long events_sent = sqlite3_column_int64(stmt, 4);
long long created_at = sqlite3_column_int64(stmt, 5);
long long duration_seconds = sqlite3_column_int64(stmt, 6);
// Add basic subscription info
cJSON_AddStringToObject(sub_obj, "id", sub_id ? sub_id : "");
cJSON_AddStringToObject(sub_obj, "client_ip", client_ip ? client_ip : "");
cJSON_AddStringToObject(sub_obj, "wsi_pointer", wsi_pointer ? wsi_pointer : "");
cJSON_AddNumberToObject(sub_obj, "created_at", (double)created_at);
cJSON_AddNumberToObject(sub_obj, "duration_seconds", (double)duration_seconds);
cJSON_AddNumberToObject(sub_obj, "events_sent", events_sent);
@@ -404,6 +409,12 @@ int generate_monitoring_event(void) {
return -1;
}
// Generate CPU metrics monitoring event
if (generate_monitoring_event_for_type("cpu_metrics", query_cpu_metrics) != 0) {
DEBUG_ERROR("Failed to generate cpu_metrics monitoring event");
return -1;
}
DEBUG_INFO("Generated and broadcast all monitoring events");
return 0;
}
@@ -1105,6 +1116,68 @@ int handle_embedded_file_writeable(struct lws* wsi) {
return 0;
}
// Query CPU usage metrics
cJSON* query_cpu_metrics(void) {
cJSON* cpu_stats = cJSON_CreateObject();
cJSON_AddStringToObject(cpu_stats, "data_type", "cpu_metrics");
cJSON_AddNumberToObject(cpu_stats, "timestamp", (double)time(NULL));
// Read process CPU times from /proc/self/stat
FILE* proc_stat = fopen("/proc/self/stat", "r");
if (proc_stat) {
unsigned long utime, stime; // user and system CPU time in clock ticks
if (fscanf(proc_stat, "%*d %*s %*c %*d %*d %*d %*d %*d %*u %*u %*u %*u %*u %lu %lu", &utime, &stime) == 2) {
unsigned long total_proc_time = utime + stime;
// Get system CPU times from /proc/stat
FILE* sys_stat = fopen("/proc/stat", "r");
if (sys_stat) {
unsigned long user, nice, system, idle, iowait, irq, softirq;
if (fscanf(sys_stat, "cpu %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &system, &idle, &iowait, &irq, &softirq) == 7) {
unsigned long total_sys_time = user + nice + system + idle + iowait + irq + softirq;
// Calculate CPU percentages (simplified - would need deltas for accuracy)
// For now, just store the raw values - frontend can calculate deltas
cJSON_AddNumberToObject(cpu_stats, "process_cpu_time", (double)total_proc_time);
cJSON_AddNumberToObject(cpu_stats, "system_cpu_time", (double)total_sys_time);
cJSON_AddNumberToObject(cpu_stats, "system_idle_time", (double)idle);
}
fclose(sys_stat);
}
// Get current CPU core the process is running on
int current_core = sched_getcpu();
if (current_core >= 0) {
cJSON_AddNumberToObject(cpu_stats, "current_cpu_core", current_core);
}
}
fclose(proc_stat);
}
// Get process ID
pid_t pid = getpid();
cJSON_AddNumberToObject(cpu_stats, "process_id", (double)pid);
// Get memory usage from /proc/self/status
FILE* mem_stat = fopen("/proc/self/status", "r");
if (mem_stat) {
char line[256];
while (fgets(line, sizeof(line), mem_stat)) {
if (strncmp(line, "VmRSS:", 6) == 0) {
unsigned long rss_kb;
if (sscanf(line, "VmRSS: %lu kB", &rss_kb) == 1) {
double rss_mb = rss_kb / 1024.0;
cJSON_AddNumberToObject(cpu_stats, "memory_usage_mb", rss_mb);
}
break;
}
}
fclose(mem_stat);
}
return cpu_stats;
}
// Generate stats JSON from database queries
char* generate_stats_json(void) {
extern sqlite3* g_db;
@@ -2262,7 +2335,7 @@ int handle_monitoring_command(cJSON* event, const char* command, char* error_mes
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
}
} else {
char response_content[512];
char response_content[1024];
snprintf(response_content, sizeof(response_content),
"❌ Unknown monitoring command: %s\n\n"
"Available command:\n"

File diff suppressed because one or more lines are too long

View File

@@ -10,10 +10,10 @@
#define MAIN_H
// Version information (auto-updated by build system)
#define VERSION "v0.7.33"
#define VERSION "v0.7.34"
#define VERSION_MAJOR 0
#define VERSION_MINOR 7
#define VERSION_PATCH 33
#define VERSION_PATCH 34
// Relay metadata (authoritative source for NIP-11 information)
#define RELAY_NAME "C-Relay"

View File

@@ -1,12 +1,12 @@
/* Embedded SQL Schema for C Nostr Relay
* Generated from db/schema.sql - Do not edit manually
* Schema Version: 7
* Schema Version: 8
*/
#ifndef SQL_SCHEMA_H
#define SQL_SCHEMA_H
/* Schema version constant */
#define EMBEDDED_SCHEMA_VERSION "7"
#define EMBEDDED_SCHEMA_VERSION "8"
/* Embedded SQL schema as C string literal */
static const char* const EMBEDDED_SCHEMA_SQL =
@@ -15,7 +15,7 @@ static const char* const EMBEDDED_SCHEMA_SQL =
-- Configuration system using config table\n\
\n\
-- Schema version tracking\n\
PRAGMA user_version = 7;\n\
PRAGMA user_version = 8;\n\
\n\
-- Enable foreign key support\n\
PRAGMA foreign_keys = ON;\n\
@@ -58,8 +58,8 @@ CREATE TABLE schema_info (\n\
\n\
-- Insert schema metadata\n\
INSERT INTO schema_info (key, value) VALUES\n\
('version', '7'),\n\
('description', 'Hybrid Nostr relay schema with event-based and table-based configuration'),\n\
('version', '8'),\n\
('description', 'Hybrid Nostr relay schema with subscription deduplication support'),\n\
('created_at', strftime('%s', 'now'));\n\
\n\
-- Helper views for common queries\n\
@@ -181,17 +181,19 @@ END;\n\
-- Persistent Subscriptions Logging Tables (Phase 2)\n\
-- Optional database logging for subscription analytics and debugging\n\
\n\
-- Subscription events log\n\
CREATE TABLE subscription_events (\n\
-- Subscriptions log (renamed from subscription_events for clarity)\n\
CREATE TABLE subscriptions (\n\
id INTEGER PRIMARY KEY AUTOINCREMENT,\n\
subscription_id TEXT NOT NULL, -- Subscription ID from client\n\
wsi_pointer TEXT NOT NULL, -- WebSocket pointer address (hex string)\n\
client_ip TEXT NOT NULL, -- Client IP address\n\
event_type TEXT NOT NULL CHECK (event_type IN ('created', 'closed', 'expired', 'disconnected')),\n\
filter_json TEXT, -- JSON representation of filters (for created events)\n\
events_sent INTEGER DEFAULT 0, -- Number of events sent to this subscription\n\
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),\n\
ended_at INTEGER, -- When subscription ended (for closed/expired/disconnected)\n\
duration INTEGER -- Computed: ended_at - created_at\n\
duration INTEGER, -- Computed: ended_at - created_at\n\
UNIQUE(subscription_id, wsi_pointer) -- Prevent duplicate subscriptions per connection\n\
);\n\
\n\
-- Subscription metrics summary\n\
@@ -218,10 +220,11 @@ CREATE TABLE event_broadcasts (\n\
);\n\
\n\
-- Indexes for subscription logging performance\n\
CREATE INDEX idx_subscription_events_id ON subscription_events(subscription_id);\n\
CREATE INDEX idx_subscription_events_type ON subscription_events(event_type);\n\
CREATE INDEX idx_subscription_events_created ON subscription_events(created_at DESC);\n\
CREATE INDEX idx_subscription_events_client ON subscription_events(client_ip);\n\
CREATE INDEX idx_subscriptions_id ON subscriptions(subscription_id);\n\
CREATE INDEX idx_subscriptions_type ON subscriptions(event_type);\n\
CREATE INDEX idx_subscriptions_created ON subscriptions(created_at DESC);\n\
CREATE INDEX idx_subscriptions_client ON subscriptions(client_ip);\n\
CREATE INDEX idx_subscriptions_wsi ON subscriptions(wsi_pointer);\n\
\n\
CREATE INDEX idx_subscription_metrics_date ON subscription_metrics(date DESC);\n\
\n\
@@ -231,10 +234,10 @@ CREATE INDEX idx_event_broadcasts_time ON event_broadcasts(broadcast_at DESC);\n
\n\
-- Trigger to update subscription duration when ended\n\
CREATE TRIGGER update_subscription_duration\n\
AFTER UPDATE OF ended_at ON subscription_events\n\
AFTER UPDATE OF ended_at ON subscriptions\n\
WHEN NEW.ended_at IS NOT NULL AND OLD.ended_at IS NULL\n\
BEGIN\n\
UPDATE subscription_events\n\
UPDATE subscriptions\n\
SET duration = NEW.ended_at - NEW.created_at\n\
WHERE id = NEW.id;\n\
END;\n\
@@ -249,7 +252,7 @@ SELECT\n\
MAX(events_sent) as max_events_sent,\n\
AVG(events_sent) as avg_events_sent,\n\
COUNT(DISTINCT client_ip) as unique_clients\n\
FROM subscription_events\n\
FROM subscriptions\n\
GROUP BY date(created_at, 'unixepoch')\n\
ORDER BY date DESC;\n\
\n\
@@ -262,10 +265,10 @@ SELECT\n\
events_sent,\n\
created_at,\n\
(strftime('%s', 'now') - created_at) as duration_seconds\n\
FROM subscription_events\n\
FROM subscriptions\n\
WHERE event_type = 'created'\n\
AND subscription_id NOT IN (\n\
SELECT subscription_id FROM subscription_events\n\
SELECT subscription_id FROM subscriptions\n\
WHERE event_type IN ('closed', 'expired', 'disconnected')\n\
);\n\
\n\

View File

@@ -238,25 +238,76 @@ void free_subscription(subscription_t* sub) {
// Add subscription to global manager (thread-safe)
int add_subscription_to_manager(subscription_t* sub) {
if (!sub) return -1;
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
// Check global limits
if (g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) {
// Check for existing subscription with same ID and WebSocket connection
// Remove it first to prevent duplicates (implements subscription replacement per NIP-01)
subscription_t** current = &g_subscription_manager.active_subscriptions;
int found_duplicate = 0;
subscription_t* duplicate_old = NULL;
while (*current) {
subscription_t* existing = *current;
// Match by subscription ID and WebSocket pointer
if (strcmp(existing->id, sub->id) == 0 && existing->wsi == sub->wsi) {
// Found duplicate: mark inactive and unlink from global list under lock
existing->active = 0;
*current = existing->next;
g_subscription_manager.total_subscriptions--;
found_duplicate = 1;
duplicate_old = existing; // defer free until after per-session unlink
break;
}
current = &(existing->next);
}
// Check global limits (only if not replacing an existing subscription)
if (!found_duplicate && g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) {
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
DEBUG_ERROR("Maximum total subscriptions reached");
return -1;
}
// Add to global list
sub->next = g_subscription_manager.active_subscriptions;
g_subscription_manager.active_subscriptions = sub;
g_subscription_manager.total_subscriptions++;
g_subscription_manager.total_created++;
// Only increment total_created if this is a new subscription (not a replacement)
if (!found_duplicate) {
g_subscription_manager.total_created++;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
// Log subscription creation to database
// If we replaced an existing subscription, unlink it from the per-session list before freeing
if (duplicate_old) {
// Obtain per-session data for this wsi
struct per_session_data* pss = (struct per_session_data*) lws_wsi_user(duplicate_old->wsi);
if (pss) {
pthread_mutex_lock(&pss->session_lock);
struct subscription** scur = &pss->subscriptions;
while (*scur) {
if (*scur == duplicate_old) {
// Unlink by pointer identity to avoid removing the newly-added one
*scur = duplicate_old->session_next;
if (pss->subscription_count > 0) {
pss->subscription_count--;
}
break;
}
scur = &((*scur)->session_next);
}
pthread_mutex_unlock(&pss->session_lock);
}
// Now safe to free the old subscription
free_subscription(duplicate_old);
}
// Log subscription creation to database (INSERT OR REPLACE handles duplicates)
log_subscription_created(sub);
return 0;
@@ -324,10 +375,7 @@ int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) {
// Check if an event matches a subscription filter
int event_matches_filter(cJSON* event, subscription_filter_t* filter) {
DEBUG_TRACE("Checking event against subscription filter");
if (!event || !filter) {
DEBUG_TRACE("Exiting event_matches_filter - null parameters");
return 0;
}
@@ -503,7 +551,6 @@ int event_matches_filter(cJSON* event, subscription_filter_t* filter) {
}
}
DEBUG_TRACE("Exiting event_matches_filter - match found");
return 1; // All filters passed
}
@@ -526,10 +573,7 @@ int event_matches_subscription(cJSON* event, subscription_t* subscription) {
// Broadcast event to all matching subscriptions (thread-safe)
int broadcast_event_to_subscriptions(cJSON* event) {
DEBUG_TRACE("Broadcasting event to subscriptions");
if (!event) {
DEBUG_TRACE("Exiting broadcast_event_to_subscriptions - null event");
return 0;
}
@@ -611,12 +655,19 @@ int broadcast_event_to_subscriptions(cJSON* event) {
if (buf) {
memcpy(buf + LWS_PRE, msg_str, msg_len);
// Send to WebSocket connection with error checking
// Note: lws_write can fail if connection is closed, but won't crash
int write_result = lws_write(current_temp->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
if (write_result >= 0) {
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=EVENT sub=%s len=%zu data=%.100s%s",
current_temp->id,
msg_len,
msg_str,
msg_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
struct per_session_data* pss = (struct per_session_data*)lws_wsi_user(current_temp->wsi);
if (queue_message(current_temp->wsi, pss, msg_str, msg_len, LWS_WRITE_TEXT) == 0) {
// Message queued successfully
broadcasts++;
// Update events sent counter for this subscription
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
subscription_t* update_sub = g_subscription_manager.active_subscriptions;
@@ -630,12 +681,14 @@ int broadcast_event_to_subscriptions(cJSON* event) {
update_sub = update_sub->next;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
// Log event broadcast to database (optional - can be disabled for performance)
cJSON* event_id_obj = cJSON_GetObjectItem(event, "id");
if (event_id_obj && cJSON_IsString(event_id_obj)) {
log_event_broadcast(cJSON_GetStringValue(event_id_obj), current_temp->id, current_temp->client_ip);
}
} else {
DEBUG_ERROR("Failed to queue EVENT message for sub=%s", current_temp->id);
}
free(buf);
@@ -660,7 +713,6 @@ int broadcast_event_to_subscriptions(cJSON* event) {
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
DEBUG_LOG("Event broadcast complete: %d subscriptions matched", broadcasts);
DEBUG_TRACE("Exiting broadcast_event_to_subscriptions");
return broadcasts;
}
@@ -707,6 +759,10 @@ int has_subscriptions_for_kind(int event_kind) {
void log_subscription_created(const subscription_t* sub) {
if (!g_db || !sub) return;
// Convert wsi pointer to string
char wsi_str[32];
snprintf(wsi_str, sizeof(wsi_str), "%p", (void*)sub->wsi);
// Create filter JSON for logging
char* filter_json = NULL;
if (sub->filters) {
@@ -753,16 +809,18 @@ void log_subscription_created(const subscription_t* sub) {
cJSON_Delete(filters_array);
}
// Use INSERT OR REPLACE to handle duplicates automatically
const char* sql =
"INSERT INTO subscription_events (subscription_id, client_ip, event_type, filter_json) "
"VALUES (?, ?, 'created', ?)";
"INSERT OR REPLACE INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type, filter_json) "
"VALUES (?, ?, ?, 'created', ?)";
sqlite3_stmt* stmt;
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
if (rc == SQLITE_OK) {
sqlite3_bind_text(stmt, 1, sub->id, -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 2, sub->client_ip, -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 3, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT);
sqlite3_bind_text(stmt, 2, wsi_str, -1, SQLITE_TRANSIENT);
sqlite3_bind_text(stmt, 3, sub->client_ip, -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 4, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
@@ -777,8 +835,8 @@ void log_subscription_closed(const char* sub_id, const char* client_ip, const ch
if (!g_db || !sub_id) return;
const char* sql =
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
"VALUES (?, ?, 'closed')";
"INSERT INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type) "
"VALUES (?, '', ?, 'closed')";
sqlite3_stmt* stmt;
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
@@ -792,7 +850,7 @@ void log_subscription_closed(const char* sub_id, const char* client_ip, const ch
// Update the corresponding 'created' entry with end time and events sent
const char* update_sql =
"UPDATE subscription_events "
"UPDATE subscriptions "
"SET ended_at = strftime('%s', 'now') "
"WHERE subscription_id = ? AND event_type = 'created' AND ended_at IS NULL";
@@ -810,7 +868,7 @@ void log_subscription_disconnected(const char* client_ip) {
// Mark all active subscriptions for this client as disconnected
const char* sql =
"UPDATE subscription_events "
"UPDATE subscriptions "
"SET ended_at = strftime('%s', 'now') "
"WHERE client_ip = ? AND event_type = 'created' AND ended_at IS NULL";
@@ -825,8 +883,8 @@ void log_subscription_disconnected(const char* client_ip) {
if (changes > 0) {
// Log a disconnection event
const char* insert_sql =
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
"VALUES ('disconnect', ?, 'disconnected')";
"INSERT INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type) "
"VALUES ('disconnect', '', ?, 'disconnected')";
rc = sqlite3_prepare_v2(g_db, insert_sql, -1, &stmt, NULL);
if (rc == SQLITE_OK) {
@@ -863,7 +921,7 @@ void update_subscription_events_sent(const char* sub_id, int events_sent) {
if (!g_db || !sub_id) return;
const char* sql =
"UPDATE subscription_events "
"UPDATE subscriptions "
"SET events_sent = ? "
"WHERE subscription_id = ? AND event_type = 'created'";

View File

@@ -108,6 +108,136 @@ struct subscription_manager g_subscription_manager;
// Message queue functions for proper libwebsockets pattern
/**
* Queue a message for WebSocket writing following libwebsockets' proper pattern.
* This function adds messages to a per-session queue and requests writeable callback.
*
* @param wsi WebSocket instance
* @param pss Per-session data containing message queue
* @param message Message string to write
* @param length Length of message string
* @param type LWS_WRITE_* type (LWS_WRITE_TEXT, etc.)
* @return 0 on success, -1 on error
*/
int queue_message(struct lws* wsi, struct per_session_data* pss, const char* message, size_t length, enum lws_write_protocol type) {
if (!wsi || !pss || !message || length == 0) {
DEBUG_ERROR("queue_message: invalid parameters");
return -1;
}
// Allocate message queue node
struct message_queue_node* node = malloc(sizeof(struct message_queue_node));
if (!node) {
DEBUG_ERROR("queue_message: failed to allocate queue node");
return -1;
}
// Allocate buffer with LWS_PRE space
size_t buffer_size = LWS_PRE + length;
unsigned char* buffer = malloc(buffer_size);
if (!buffer) {
DEBUG_ERROR("queue_message: failed to allocate message buffer");
free(node);
return -1;
}
// Copy message to buffer with LWS_PRE offset
memcpy(buffer + LWS_PRE, message, length);
// Initialize node
node->data = buffer;
node->length = length;
node->type = type;
node->next = NULL;
// Add to queue (thread-safe)
pthread_mutex_lock(&pss->session_lock);
if (!pss->message_queue_head) {
// Queue was empty
pss->message_queue_head = node;
pss->message_queue_tail = node;
} else {
// Add to end of queue
pss->message_queue_tail->next = node;
pss->message_queue_tail = node;
}
pss->message_queue_count++;
pthread_mutex_unlock(&pss->session_lock);
// Request writeable callback (only if not already requested)
if (!pss->writeable_requested) {
pss->writeable_requested = 1;
lws_callback_on_writable(wsi);
}
DEBUG_TRACE("Queued message: len=%zu, queue_count=%d", length, pss->message_queue_count);
return 0;
}
/**
* Process message queue when the socket becomes writeable.
* This function is called from LWS_CALLBACK_SERVER_WRITEABLE.
*
* @param wsi WebSocket instance
* @param pss Per-session data containing message queue
* @return 0 on success, -1 on error
*/
int process_message_queue(struct lws* wsi, struct per_session_data* pss) {
if (!wsi || !pss) {
DEBUG_ERROR("process_message_queue: invalid parameters");
return -1;
}
// Get next message from queue (thread-safe)
pthread_mutex_lock(&pss->session_lock);
struct message_queue_node* node = pss->message_queue_head;
if (!node) {
// Queue is empty
pss->writeable_requested = 0;
pthread_mutex_unlock(&pss->session_lock);
return 0;
}
// Remove from queue
pss->message_queue_head = node->next;
if (!pss->message_queue_head) {
pss->message_queue_tail = NULL;
}
pss->message_queue_count--;
pthread_mutex_unlock(&pss->session_lock);
// Write message (libwebsockets handles partial writes internally)
int write_result = lws_write(wsi, node->data + LWS_PRE, node->length, node->type);
// Free node resources
free(node->data);
free(node);
if (write_result < 0) {
DEBUG_ERROR("process_message_queue: write failed, result=%d", write_result);
return -1;
}
DEBUG_TRACE("Processed message: wrote %d bytes, remaining in queue: %d", write_result, pss->message_queue_count);
// If queue not empty, request another callback
pthread_mutex_lock(&pss->session_lock);
if (pss->message_queue_head) {
lws_callback_on_writable(wsi);
} else {
pss->writeable_requested = 0;
}
pthread_mutex_unlock(&pss->session_lock);
return 0;
}
/////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////
// WEBSOCKET PROTOCOL
@@ -719,16 +849,22 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
cJSON_AddItemToArray(response, cJSON_CreateString(cJSON_GetStringValue(event_id)));
cJSON_AddItemToArray(response, cJSON_CreateBool(result == 0));
cJSON_AddItemToArray(response, cJSON_CreateString(strlen(error_message) > 0 ? error_message : ""));
char *response_str = cJSON_Print(response);
if (response_str) {
size_t response_len = strlen(response_str);
unsigned char *buf = malloc(LWS_PRE + response_len);
if (buf) {
memcpy(buf + LWS_PRE, response_str, response_len);
lws_write(wsi, buf + LWS_PRE, response_len, LWS_WRITE_TEXT);
free(buf);
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=OK len=%zu data=%.100s%s",
response_len,
response_str,
response_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
if (queue_message(wsi, pss, response_str, response_len, LWS_WRITE_TEXT) != 0) {
DEBUG_ERROR("Failed to queue OK response message");
}
free(response_str);
}
cJSON_Delete(response);
@@ -823,12 +959,18 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
char *eose_str = cJSON_Print(eose_response);
if (eose_str) {
size_t eose_len = strlen(eose_str);
unsigned char *buf = malloc(LWS_PRE + eose_len);
if (buf) {
memcpy(buf + LWS_PRE, eose_str, eose_len);
lws_write(wsi, buf + LWS_PRE, eose_len, LWS_WRITE_TEXT);
free(buf);
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=EOSE len=%zu data=%.100s%s",
eose_len,
eose_str,
eose_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
if (queue_message(wsi, pss, eose_str, eose_len, LWS_WRITE_TEXT) != 0) {
DEBUG_ERROR("Failed to queue EOSE message");
}
free(eose_str);
}
cJSON_Delete(eose_response);
@@ -908,9 +1050,22 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
return 0;
}
// CRITICAL FIX: Remove from session list FIRST (while holding lock)
// to prevent race condition where global manager frees the subscription
// while we're still iterating through the session list
// CRITICAL FIX: Mark subscription as inactive in global manager FIRST
// This prevents other threads from accessing it during removal
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
subscription_t* target_sub = g_subscription_manager.active_subscriptions;
while (target_sub) {
if (strcmp(target_sub->id, subscription_id) == 0 && target_sub->wsi == wsi) {
target_sub->active = 0; // Mark as inactive immediately
break;
}
target_sub = target_sub->next;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
// Now safe to remove from session list
if (pss) {
pthread_mutex_lock(&pss->session_lock);
@@ -928,8 +1083,7 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
pthread_mutex_unlock(&pss->session_lock);
}
// Remove from global manager AFTER removing from session list
// This prevents use-after-free when iterating session subscriptions
// Finally remove from global manager (which will free it)
remove_subscription_from_manager(subscription_id, wsi);
// Subscription closed
@@ -972,6 +1126,13 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
}
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
// Handle message queue when socket becomes writeable
if (pss) {
process_message_queue(wsi, pss);
}
break;
case LWS_CALLBACK_CLOSED:
DEBUG_TRACE("WebSocket connection closed");
@@ -1005,20 +1166,66 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
auth_status,
reason);
// Clean up session subscriptions
// Clean up message queue to prevent memory leaks
while (pss->message_queue_head) {
struct message_queue_node* node = pss->message_queue_head;
pss->message_queue_head = node->next;
free(node->data);
free(node);
}
pss->message_queue_tail = NULL;
pss->message_queue_count = 0;
pss->writeable_requested = 0;
// Clean up session subscriptions - copy IDs first to avoid use-after-free
pthread_mutex_lock(&pss->session_lock);
// First pass: collect subscription IDs safely
typedef struct temp_sub_id {
char id[SUBSCRIPTION_ID_MAX_LENGTH];
struct temp_sub_id* next;
} temp_sub_id_t;
temp_sub_id_t* temp_ids = NULL;
temp_sub_id_t* temp_tail = NULL;
int temp_count = 0;
struct subscription* sub = pss->subscriptions;
while (sub) {
struct subscription* next = sub->session_next;
remove_subscription_from_manager(sub->id, wsi);
sub = next;
if (sub->active) { // Only process active subscriptions
temp_sub_id_t* temp = malloc(sizeof(temp_sub_id_t));
if (temp) {
memcpy(temp->id, sub->id, SUBSCRIPTION_ID_MAX_LENGTH);
temp->id[SUBSCRIPTION_ID_MAX_LENGTH - 1] = '\0';
temp->next = NULL;
if (!temp_ids) {
temp_ids = temp;
temp_tail = temp;
} else {
temp_tail->next = temp;
temp_tail = temp;
}
temp_count++;
}
}
sub = sub->session_next;
}
// Clear session list immediately
pss->subscriptions = NULL;
pss->subscription_count = 0;
pthread_mutex_unlock(&pss->session_lock);
// Second pass: remove from global manager using copied IDs
temp_sub_id_t* current_temp = temp_ids;
while (current_temp) {
temp_sub_id_t* next_temp = current_temp->next;
remove_subscription_from_manager(current_temp->id, wsi);
free(current_temp);
current_temp = next_temp;
}
pthread_mutex_destroy(&pss->session_lock);
} else {
DEBUG_LOG("WebSocket CLOSED: ip=unknown duration=0s subscriptions=0 authenticated=no reason=unknown");
@@ -1685,12 +1892,18 @@ int handle_count_message(const char* sub_id, cJSON* filters, struct lws *wsi, st
char *count_str = cJSON_Print(count_response);
if (count_str) {
size_t count_len = strlen(count_str);
unsigned char *buf = malloc(LWS_PRE + count_len);
if (buf) {
memcpy(buf + LWS_PRE, count_str, count_len);
lws_write(wsi, buf + LWS_PRE, count_len, LWS_WRITE_TEXT);
free(buf);
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=COUNT len=%zu data=%.100s%s",
count_len,
count_str,
count_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
if (queue_message(wsi, pss, count_str, count_len, LWS_WRITE_TEXT) != 0) {
DEBUG_ERROR("Failed to queue COUNT message");
}
free(count_str);
}
cJSON_Delete(count_response);

View File

@@ -31,6 +31,14 @@
#define MAX_SEARCH_LENGTH 256
#define MAX_TAG_VALUE_LENGTH 1024
// Message queue node for proper libwebsockets pattern
struct message_queue_node {
unsigned char* data; // Message data (with LWS_PRE space)
size_t length; // Message length (without LWS_PRE)
enum lws_write_protocol type; // LWS_WRITE_TEXT, etc.
struct message_queue_node* next; // Next node in queue
};
// Enhanced per-session data with subscription management, NIP-42 authentication, and rate limiting
struct per_session_data {
int authenticated;
@@ -59,6 +67,12 @@ struct per_session_data {
int malformed_request_count; // Count of malformed requests in current hour
time_t malformed_request_window_start; // Start of current hour window
time_t malformed_request_blocked_until; // Time until blocked for malformed requests
// Message queue for proper libwebsockets pattern (replaces single buffer)
struct message_queue_node* message_queue_head; // Head of message queue
struct message_queue_node* message_queue_tail; // Tail of message queue
int message_queue_count; // Number of messages in queue
int writeable_requested; // Flag: 1 if writeable callback requested
};
// NIP-11 HTTP session data structure for managing buffer lifetime
@@ -73,6 +87,10 @@ struct nip11_session_data {
// Function declarations
int start_websocket_relay(int port_override, int strict_port);
// Message queue functions for proper libwebsockets pattern
int queue_message(struct lws* wsi, struct per_session_data* pss, const char* message, size_t length, enum lws_write_protocol type);
int process_message_queue(struct lws* wsi, struct per_session_data* pss);
// Auth rules checking function from request_validator.c
int check_database_auth_rules(const char *pubkey, const char *operation, const char *resource_hash);