v0.7.7 - Prevent sql attacks and rate limiting on subscriptions

This commit is contained in:
Your Name
2025-10-10 15:44:10 -04:00
parent 00a8f16262
commit 6709e229b3
11 changed files with 746 additions and 152 deletions

View File

@@ -472,52 +472,102 @@ int broadcast_event_to_subscriptions(cJSON* event) {
}
int broadcasts = 0;
// Create a temporary list of matching subscriptions to avoid holding lock during I/O
typedef struct temp_sub {
struct lws* wsi;
char id[SUBSCRIPTION_ID_MAX_LENGTH];
char client_ip[CLIENT_IP_MAX_LENGTH];
struct temp_sub* next;
} temp_sub_t;
temp_sub_t* matching_subs = NULL;
int matching_count = 0;
// First pass: collect matching subscriptions while holding lock
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
subscription_t* sub = g_subscription_manager.active_subscriptions;
while (sub) {
if (sub->active && event_matches_subscription(event, sub)) {
// Create EVENT message for this subscription
cJSON* event_msg = cJSON_CreateArray();
cJSON_AddItemToArray(event_msg, cJSON_CreateString("EVENT"));
cJSON_AddItemToArray(event_msg, cJSON_CreateString(sub->id));
cJSON_AddItemToArray(event_msg, cJSON_Duplicate(event, 1));
char* msg_str = cJSON_Print(event_msg);
if (msg_str) {
size_t msg_len = strlen(msg_str);
unsigned char* buf = malloc(LWS_PRE + msg_len);
if (buf) {
memcpy(buf + LWS_PRE, msg_str, msg_len);
// Send to WebSocket connection
int write_result = lws_write(sub->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
if (write_result >= 0) {
sub->events_sent++;
broadcasts++;
// Log event broadcast to database (optional - can be disabled for performance)
cJSON* event_id_obj = cJSON_GetObjectItem(event, "id");
if (event_id_obj && cJSON_IsString(event_id_obj)) {
log_event_broadcast(cJSON_GetStringValue(event_id_obj), sub->id, sub->client_ip);
}
}
free(buf);
}
free(msg_str);
if (sub->active && sub->wsi && event_matches_subscription(event, sub)) {
temp_sub_t* temp = malloc(sizeof(temp_sub_t));
if (temp) {
temp->wsi = sub->wsi;
strncpy(temp->id, sub->id, SUBSCRIPTION_ID_MAX_LENGTH - 1);
temp->id[SUBSCRIPTION_ID_MAX_LENGTH - 1] = '\0';
strncpy(temp->client_ip, sub->client_ip, CLIENT_IP_MAX_LENGTH - 1);
temp->client_ip[CLIENT_IP_MAX_LENGTH - 1] = '\0';
temp->next = matching_subs;
matching_subs = temp;
matching_count++;
}
cJSON_Delete(event_msg);
}
sub = sub->next;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
// Second pass: send messages without holding lock
temp_sub_t* current_temp = matching_subs;
while (current_temp) {
// Create EVENT message for this subscription
cJSON* event_msg = cJSON_CreateArray();
cJSON_AddItemToArray(event_msg, cJSON_CreateString("EVENT"));
cJSON_AddItemToArray(event_msg, cJSON_CreateString(current_temp->id));
cJSON_AddItemToArray(event_msg, cJSON_Duplicate(event, 1));
char* msg_str = cJSON_Print(event_msg);
if (msg_str) {
size_t msg_len = strlen(msg_str);
unsigned char* buf = malloc(LWS_PRE + msg_len);
if (buf) {
memcpy(buf + LWS_PRE, msg_str, msg_len);
// Send to WebSocket connection with error checking
// Note: lws_write can fail if connection is closed, but won't crash
int write_result = lws_write(current_temp->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
if (write_result >= 0) {
broadcasts++;
// Update events sent counter for this subscription
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
subscription_t* update_sub = g_subscription_manager.active_subscriptions;
while (update_sub) {
if (update_sub->wsi == current_temp->wsi &&
strcmp(update_sub->id, current_temp->id) == 0) {
update_sub->events_sent++;
break;
}
update_sub = update_sub->next;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
// Log event broadcast to database (optional - can be disabled for performance)
cJSON* event_id_obj = cJSON_GetObjectItem(event, "id");
if (event_id_obj && cJSON_IsString(event_id_obj)) {
log_event_broadcast(cJSON_GetStringValue(event_id_obj), current_temp->id, current_temp->client_ip);
}
}
free(buf);
}
free(msg_str);
}
sub = sub->next;
cJSON_Delete(event_msg);
current_temp = current_temp->next;
}
// Clean up temporary subscription list
while (matching_subs) {
temp_sub_t* next = matching_subs->next;
free(matching_subs);
matching_subs = next;
}
// Update global statistics
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
g_subscription_manager.total_events_broadcast += broadcasts;
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
return broadcasts;
@@ -688,19 +738,149 @@ void log_event_broadcast(const char* event_id, const char* sub_id, const char* c
// Update events sent counter for a subscription
void update_subscription_events_sent(const char* sub_id, int events_sent) {
if (!g_db || !sub_id) return;
const char* sql =
"UPDATE subscription_events "
"SET events_sent = ? "
"WHERE subscription_id = ? AND event_type = 'created'";
sqlite3_stmt* stmt;
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
if (rc == SQLITE_OK) {
sqlite3_bind_int(stmt, 1, events_sent);
sqlite3_bind_text(stmt, 2, sub_id, -1, SQLITE_STATIC);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
}
}
///////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////
// PER-IP CONNECTION TRACKING
///////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////
// Get or create IP connection info (thread-safe)
ip_connection_info_t* get_or_create_ip_connection(const char* client_ip) {
if (!client_ip) return NULL;
pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock);
// Look for existing IP connection info
ip_connection_info_t* current = g_subscription_manager.ip_connections;
while (current) {
if (strcmp(current->ip_address, client_ip) == 0) {
// Found existing entry, update activity
current->last_activity = time(NULL);
pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock);
return current;
}
current = current->next;
}
// Create new IP connection info
ip_connection_info_t* new_ip = calloc(1, sizeof(ip_connection_info_t));
if (!new_ip) {
pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock);
return NULL;
}
// Copy IP address safely
strncpy(new_ip->ip_address, client_ip, CLIENT_IP_MAX_LENGTH - 1);
new_ip->ip_address[CLIENT_IP_MAX_LENGTH - 1] = '\0';
// Initialize tracking data
time_t now = time(NULL);
new_ip->active_connections = 1;
new_ip->total_subscriptions = 0;
new_ip->first_connection = now;
new_ip->last_activity = now;
// Add to linked list
new_ip->next = g_subscription_manager.ip_connections;
g_subscription_manager.ip_connections = new_ip;
pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock);
return new_ip;
}
// Update IP connection activity timestamp
void update_ip_connection_activity(const char* client_ip) {
if (!client_ip) return;
pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock);
ip_connection_info_t* current = g_subscription_manager.ip_connections;
while (current) {
if (strcmp(current->ip_address, client_ip) == 0) {
current->last_activity = time(NULL);
break;
}
current = current->next;
}
pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock);
}
// Remove IP connection (when last connection from IP closes)
void remove_ip_connection(const char* client_ip) {
if (!client_ip) return;
pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock);
ip_connection_info_t** current = &g_subscription_manager.ip_connections;
while (*current) {
ip_connection_info_t* entry = *current;
if (strcmp(entry->ip_address, client_ip) == 0) {
// Remove from list
*current = entry->next;
free(entry);
break;
}
current = &((*current)->next);
}
pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock);
}
// Get total subscriptions for an IP address
int get_total_subscriptions_for_ip(const char* client_ip) {
if (!client_ip) return 0;
pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock);
ip_connection_info_t* current = g_subscription_manager.ip_connections;
while (current) {
if (strcmp(current->ip_address, client_ip) == 0) {
int total = current->total_subscriptions;
pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock);
return total;
}
current = current->next;
}
pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock);
return 0;
}
// Get active connections for an IP address
int get_active_connections_for_ip(const char* client_ip) {
if (!client_ip) return 0;
pthread_mutex_lock(&g_subscription_manager.ip_tracking_lock);
ip_connection_info_t* current = g_subscription_manager.ip_connections;
while (current) {
if (strcmp(current->ip_address, client_ip) == 0) {
int active = current->active_connections;
pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock);
return active;
}
current = current->next;
}
pthread_mutex_unlock(&g_subscription_manager.ip_tracking_lock);
return 0;
}