Files
c-relay/src/subscriptions.h

124 lines
5.2 KiB
C

// Subscription system structures and functions for C-Relay
// This header defines subscription management functionality
#ifndef SUBSCRIPTIONS_H
#define SUBSCRIPTIONS_H
#include <pthread.h>
#include <time.h>
#include <stdint.h>
#include "../nostr_core_lib/cjson/cJSON.h"
#include "config.h" // For CLIENT_IP_MAX_LENGTH
#include "websockets.h" // For validation constants
// Forward declaration for libwebsockets struct
struct lws;
// Constants
#define SUBSCRIPTION_ID_MAX_LENGTH 64
#define MAX_FILTERS_PER_SUBSCRIPTION 10
#define MAX_TOTAL_SUBSCRIPTIONS 5000
// Validation limits (shared with websockets.h)
#define MAX_SEARCH_TERM_LENGTH 256
#define MIN_TIMESTAMP 0L
#define MAX_TIMESTAMP 4102444800L // 2100-01-01
#define MIN_LIMIT 1
#define MAX_LIMIT 10000
// Forward declarations for typedefs
typedef struct subscription_filter subscription_filter_t;
typedef struct subscription subscription_t;
typedef struct subscription_manager subscription_manager_t;
// Subscription filter structure
struct subscription_filter {
// Filter criteria (all optional)
cJSON* kinds; // Array of event kinds [1,2,3]
cJSON* authors; // Array of author pubkeys
cJSON* ids; // Array of event IDs
long since; // Unix timestamp (0 = not set)
long until; // Unix timestamp (0 = not set)
int limit; // Result limit (0 = no limit)
cJSON* tag_filters; // Object with tag filters: {"#e": ["id1"], "#p": ["pubkey1"]}
// Linked list for multiple filters per subscription
struct subscription_filter* next;
};
// Active subscription structure
struct subscription {
char id[SUBSCRIPTION_ID_MAX_LENGTH]; // Subscription ID
struct lws* wsi; // WebSocket connection handle
subscription_filter_t* filters; // Linked list of filters (OR'd together)
time_t created_at; // When subscription was created
int events_sent; // Counter for sent events
int active; // 1 = active, 0 = closed
// Client info for logging
char client_ip[CLIENT_IP_MAX_LENGTH]; // Client IP address
// Linked list pointers
struct subscription* next; // Next subscription globally
struct subscription* session_next; // Next subscription for this session
};
// Per-IP connection tracking
typedef struct ip_connection_info {
char ip_address[CLIENT_IP_MAX_LENGTH]; // IP address
int active_connections; // Number of active connections from this IP
int total_subscriptions; // Total subscriptions across all connections from this IP
time_t first_connection; // When first connection from this IP was established
time_t last_activity; // Last activity timestamp from this IP
struct ip_connection_info* next; // Next in linked list
} ip_connection_info_t;
// Global subscription manager
struct subscription_manager {
subscription_t* active_subscriptions; // Head of global subscription list
pthread_mutex_t subscriptions_lock; // Global thread safety
int total_subscriptions; // Current count
// Configuration
int max_subscriptions_per_client; // Default: 20
int max_total_subscriptions; // Default: 5000
// Per-IP connection tracking
ip_connection_info_t* ip_connections; // Head of per-IP connection list
pthread_mutex_t ip_tracking_lock; // Thread safety for IP tracking
// Statistics
uint64_t total_created; // Lifetime subscription count
uint64_t total_events_broadcast; // Lifetime event broadcast count
};
// Function declarations
int validate_subscription_id(const char* sub_id);
subscription_filter_t* create_subscription_filter(cJSON* filter_json);
void free_subscription_filter(subscription_filter_t* filter);
subscription_t* create_subscription(const char* sub_id, struct lws* wsi, cJSON* filters_array, const char* client_ip);
void free_subscription(subscription_t* sub);
int add_subscription_to_manager(subscription_t* sub);
int remove_subscription_from_manager(const char* sub_id, struct lws* wsi);
int event_matches_filter(cJSON* event, subscription_filter_t* filter);
int event_matches_subscription(cJSON* event, subscription_t* subscription);
int broadcast_event_to_subscriptions(cJSON* event);
// Per-IP connection tracking functions
ip_connection_info_t* get_or_create_ip_connection(const char* client_ip);
void update_ip_connection_activity(const char* client_ip);
void remove_ip_connection(const char* client_ip);
int get_total_subscriptions_for_ip(const char* client_ip);
int get_active_connections_for_ip(const char* client_ip);
// Database logging functions
void log_subscription_created(const subscription_t* sub);
void log_subscription_closed(const char* sub_id, const char* client_ip, const char* reason);
void log_subscription_disconnected(const char* client_ip);
void log_event_broadcast(const char* event_id, const char* sub_id, const char* client_ip);
void update_subscription_events_sent(const char* sub_id, int events_sent);
// Subscription query functions
int has_subscriptions_for_kind(int event_kind);
#endif // SUBSCRIPTIONS_H