v0.7.31 - Fixed production crash by replacing in-memory subscription iteration with database queries in monitoring system
This commit is contained in:
180
src/api.c
180
src/api.c
@@ -233,51 +233,57 @@ cJSON* query_top_pubkeys(void) {
|
||||
return top_pubkeys;
|
||||
}
|
||||
|
||||
// Query active subscriptions from in-memory manager (NO DATABASE QUERY)
|
||||
// Query active subscriptions summary from database
|
||||
cJSON* query_active_subscriptions(void) {
|
||||
// Access the global subscription manager
|
||||
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
||||
extern sqlite3* g_db;
|
||||
if (!g_db) {
|
||||
DEBUG_ERROR("Database not available for active subscriptions query");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int total_subs = g_subscription_manager.total_subscriptions;
|
||||
// Get configuration limits
|
||||
int max_subs = g_subscription_manager.max_total_subscriptions;
|
||||
int max_per_client = g_subscription_manager.max_subscriptions_per_client;
|
||||
|
||||
// Calculate per-client statistics by iterating through active subscriptions
|
||||
// Query total active subscriptions from database
|
||||
sqlite3_stmt* stmt;
|
||||
const char* sql =
|
||||
"SELECT COUNT(*) as total_subs, "
|
||||
"COUNT(DISTINCT client_ip) as client_count "
|
||||
"FROM subscription_events "
|
||||
"WHERE event_type = 'created' AND ended_at IS NULL";
|
||||
|
||||
if (sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL) != SQLITE_OK) {
|
||||
DEBUG_ERROR("Failed to prepare active subscriptions query");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int total_subs = 0;
|
||||
int client_count = 0;
|
||||
|
||||
if (sqlite3_step(stmt) == SQLITE_ROW) {
|
||||
total_subs = sqlite3_column_int(stmt, 0);
|
||||
client_count = sqlite3_column_int(stmt, 1);
|
||||
}
|
||||
sqlite3_finalize(stmt);
|
||||
|
||||
// Query max subscriptions per client
|
||||
int most_subs_per_client = 0;
|
||||
const char* max_sql =
|
||||
"SELECT MAX(sub_count) FROM ("
|
||||
" SELECT COUNT(*) as sub_count "
|
||||
" FROM subscription_events "
|
||||
" WHERE event_type = 'created' AND ended_at IS NULL "
|
||||
" GROUP BY client_ip"
|
||||
")";
|
||||
|
||||
// Count subscriptions per WebSocket connection
|
||||
subscription_t* current = g_subscription_manager.active_subscriptions;
|
||||
struct lws* last_wsi = NULL;
|
||||
int current_client_subs = 0;
|
||||
|
||||
while (current) {
|
||||
if (current->wsi != last_wsi) {
|
||||
// New client
|
||||
if (last_wsi != NULL) {
|
||||
client_count++;
|
||||
if (current_client_subs > most_subs_per_client) {
|
||||
most_subs_per_client = current_client_subs;
|
||||
}
|
||||
}
|
||||
last_wsi = current->wsi;
|
||||
current_client_subs = 1;
|
||||
} else {
|
||||
current_client_subs++;
|
||||
if (sqlite3_prepare_v2(g_db, max_sql, -1, &stmt, NULL) == SQLITE_OK) {
|
||||
if (sqlite3_step(stmt) == SQLITE_ROW) {
|
||||
most_subs_per_client = sqlite3_column_int(stmt, 0);
|
||||
}
|
||||
current = current->next;
|
||||
sqlite3_finalize(stmt);
|
||||
}
|
||||
|
||||
// Handle last client
|
||||
if (last_wsi != NULL) {
|
||||
client_count++;
|
||||
if (current_client_subs > most_subs_per_client) {
|
||||
most_subs_per_client = current_client_subs;
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
// Calculate statistics
|
||||
double utilization_percentage = max_subs > 0 ? (total_subs * 100.0 / max_subs) : 0.0;
|
||||
double avg_subs_per_client = client_count > 0 ? (total_subs * 1.0 / client_count) : 0.0;
|
||||
@@ -301,10 +307,29 @@ cJSON* query_active_subscriptions(void) {
|
||||
return subscriptions;
|
||||
}
|
||||
|
||||
// Query detailed subscription information from in-memory manager (ADMIN ONLY)
|
||||
// Query detailed subscription information from database log (ADMIN ONLY)
|
||||
// Uses subscription_events table instead of in-memory iteration to avoid mutex contention
|
||||
cJSON* query_subscription_details(void) {
|
||||
// Access the global subscription manager
|
||||
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
||||
extern sqlite3* g_db;
|
||||
if (!g_db) {
|
||||
DEBUG_ERROR("Database not available for subscription details query");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Query active subscriptions directly from subscription_events table
|
||||
// Get subscriptions that were created but not yet closed/expired/disconnected
|
||||
sqlite3_stmt* stmt;
|
||||
const char* sql =
|
||||
"SELECT subscription_id, client_ip, filter_json, events_sent, "
|
||||
"created_at, (strftime('%s', 'now') - created_at) as duration_seconds "
|
||||
"FROM subscription_events "
|
||||
"WHERE event_type = 'created' AND ended_at IS NULL "
|
||||
"ORDER BY created_at DESC LIMIT 100";
|
||||
|
||||
if (sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL) != SQLITE_OK) {
|
||||
DEBUG_ERROR("Failed to prepare subscription details query");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
time_t current_time = time(NULL);
|
||||
cJSON* subscriptions_data = cJSON_CreateObject();
|
||||
@@ -314,70 +339,43 @@ cJSON* query_subscription_details(void) {
|
||||
cJSON* data = cJSON_CreateObject();
|
||||
cJSON* subscriptions_array = cJSON_CreateArray();
|
||||
|
||||
// Iterate through all active subscriptions
|
||||
subscription_t* current = g_subscription_manager.active_subscriptions;
|
||||
while (current) {
|
||||
// Iterate through query results
|
||||
while (sqlite3_step(stmt) == SQLITE_ROW) {
|
||||
cJSON* sub_obj = cJSON_CreateObject();
|
||||
|
||||
// Basic subscription info
|
||||
cJSON_AddStringToObject(sub_obj, "id", current->id);
|
||||
cJSON_AddStringToObject(sub_obj, "client_ip", current->client_ip);
|
||||
cJSON_AddNumberToObject(sub_obj, "created_at", (double)current->created_at);
|
||||
cJSON_AddNumberToObject(sub_obj, "duration_seconds", (double)(current_time - current->created_at));
|
||||
cJSON_AddNumberToObject(sub_obj, "events_sent", current->events_sent);
|
||||
cJSON_AddBoolToObject(sub_obj, "active", current->active);
|
||||
// Extract subscription data from database
|
||||
const char* sub_id = (const char*)sqlite3_column_text(stmt, 0);
|
||||
const char* client_ip = (const char*)sqlite3_column_text(stmt, 1);
|
||||
const char* filter_json = (const char*)sqlite3_column_text(stmt, 2);
|
||||
long long events_sent = sqlite3_column_int64(stmt, 3);
|
||||
long long created_at = sqlite3_column_int64(stmt, 4);
|
||||
long long duration_seconds = sqlite3_column_int64(stmt, 5);
|
||||
|
||||
// Extract filter details
|
||||
cJSON* filters_array = cJSON_CreateArray();
|
||||
subscription_filter_t* filter = current->filters;
|
||||
// Add basic subscription info
|
||||
cJSON_AddStringToObject(sub_obj, "id", sub_id ? sub_id : "");
|
||||
cJSON_AddStringToObject(sub_obj, "client_ip", client_ip ? client_ip : "");
|
||||
cJSON_AddNumberToObject(sub_obj, "created_at", (double)created_at);
|
||||
cJSON_AddNumberToObject(sub_obj, "duration_seconds", (double)duration_seconds);
|
||||
cJSON_AddNumberToObject(sub_obj, "events_sent", events_sent);
|
||||
cJSON_AddBoolToObject(sub_obj, "active", 1); // All from this view are active
|
||||
|
||||
while (filter) {
|
||||
cJSON* filter_obj = cJSON_CreateObject();
|
||||
|
||||
// Add kinds array if present
|
||||
if (filter->kinds) {
|
||||
cJSON_AddItemToObject(filter_obj, "kinds", cJSON_Duplicate(filter->kinds, 1));
|
||||
// Parse and add filter JSON if available
|
||||
if (filter_json) {
|
||||
cJSON* filters = cJSON_Parse(filter_json);
|
||||
if (filters) {
|
||||
cJSON_AddItemToObject(sub_obj, "filters", filters);
|
||||
} else {
|
||||
// If parsing fails, add empty array
|
||||
cJSON_AddItemToObject(sub_obj, "filters", cJSON_CreateArray());
|
||||
}
|
||||
|
||||
// Add authors array if present
|
||||
if (filter->authors) {
|
||||
cJSON_AddItemToObject(filter_obj, "authors", cJSON_Duplicate(filter->authors, 1));
|
||||
}
|
||||
|
||||
// Add ids array if present
|
||||
if (filter->ids) {
|
||||
cJSON_AddItemToObject(filter_obj, "ids", cJSON_Duplicate(filter->ids, 1));
|
||||
}
|
||||
|
||||
// Add since/until timestamps if set
|
||||
if (filter->since > 0) {
|
||||
cJSON_AddNumberToObject(filter_obj, "since", (double)filter->since);
|
||||
}
|
||||
if (filter->until > 0) {
|
||||
cJSON_AddNumberToObject(filter_obj, "until", (double)filter->until);
|
||||
}
|
||||
|
||||
// Add limit if set
|
||||
if (filter->limit > 0) {
|
||||
cJSON_AddNumberToObject(filter_obj, "limit", filter->limit);
|
||||
}
|
||||
|
||||
// Add tag filters if present
|
||||
if (filter->tag_filters) {
|
||||
cJSON_AddItemToObject(filter_obj, "tag_filters", cJSON_Duplicate(filter->tag_filters, 1));
|
||||
}
|
||||
|
||||
cJSON_AddItemToArray(filters_array, filter_obj);
|
||||
filter = filter->next;
|
||||
} else {
|
||||
cJSON_AddItemToObject(sub_obj, "filters", cJSON_CreateArray());
|
||||
}
|
||||
|
||||
cJSON_AddItemToObject(sub_obj, "filters", filters_array);
|
||||
cJSON_AddItemToArray(subscriptions_array, sub_obj);
|
||||
|
||||
current = current->next;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
||||
sqlite3_finalize(stmt);
|
||||
|
||||
// Add subscriptions array and count to data
|
||||
cJSON_AddItemToObject(data, "subscriptions", subscriptions_array);
|
||||
|
||||
Reference in New Issue
Block a user