Compare commits

..

4 Commits

26 changed files with 3082 additions and 761 deletions

View File

@@ -5,6 +5,9 @@ ARG DEBUG_BUILD=false
FROM alpine:3.19 AS builder
# Re-declare build argument in this stage
ARG DEBUG_BUILD=false
# Install build dependencies
RUN apk add --no-cache \
build-base \

View File

@@ -54,9 +54,8 @@
<div class="section flex-section" id="databaseStatisticsSection" style="display: none;">
<div class="section-header">
<h2>DATABASE STATISTICS</h2>
<!-- Monitoring toggle button will be inserted here by JavaScript -->
<!-- Temporarily disable auto-refresh button for real-time monitoring -->
<!-- <button type="button" id="refresh-stats-btn" class="countdown-btn"></button> -->
<!-- Monitoring is now subscription-based - no toggle button needed -->
<!-- Subscribe to kind 24567 events to receive real-time monitoring data -->
</div>
<!-- Event Rate Graph Container -->
@@ -81,10 +80,26 @@
<td>Total Events</td>
<td id="total-events">-</td>
</tr>
<tr>
<td>Process ID</td>
<td id="process-id">-</td>
</tr>
<tr>
<td>Active Subscriptions</td>
<td id="active-subscriptions">-</td>
</tr>
<tr>
<td>Memory Usage</td>
<td id="memory-usage">-</td>
</tr>
<tr>
<td>CPU Core</td>
<td id="cpu-core">-</td>
</tr>
<tr>
<td>CPU Usage</td>
<td id="cpu-usage">-</td>
</tr>
<tr>
<td>Oldest Event</td>
<td id="oldest-event">-</td>
@@ -185,15 +200,14 @@
<tr>
<th>Subscription ID</th>
<th>Client IP</th>
<th>WSI Pointer</th>
<th>Duration</th>
<th>Events Sent</th>
<th>Status</th>
<th>Filters</th>
</tr>
</thead>
<tbody id="subscription-details-table-body">
<tr>
<td colspan="6" style="text-align: center; font-style: italic;">No subscriptions active</td>
<td colspan="5" style="text-align: center; font-style: italic;">No subscriptions active</td>
</tr>
</tbody>
</table>

View File

@@ -23,6 +23,7 @@ let currentConfig = null;
let relayPool = null;
let subscriptionId = null;
let isSubscribed = false; // Flag to prevent multiple simultaneous subscriptions
let isSubscribing = false; // Flag to prevent re-entry during subscription setup
// Relay connection state
let relayInfo = null;
let isRelayConnected = false;
@@ -307,6 +308,13 @@ async function restoreAuthenticationState(pubkey) {
// Automatically set up relay connection based on current page URL
async function setupAutomaticRelayConnection(showSections = false) {
console.log('=== SETUP AUTOMATIC RELAY CONNECTION CALLED ===');
console.log('Call stack:', new Error().stack);
console.log('showSections:', showSections);
console.log('Current isRelayConnected:', isRelayConnected);
console.log('Current relayPool:', relayPool ? 'EXISTS' : 'NULL');
console.log('Current isSubscribed:', isSubscribed);
try {
// Get the current page URL and convert to WebSocket URL
const currentUrl = window.location.href;
@@ -322,8 +330,9 @@ async function setupAutomaticRelayConnection(showSections = false) {
}
// Remove any path components to get just the base URL
// CRITICAL: Always add trailing slash for consistent URL format
const url = new URL(relayUrl);
relayUrl = `${url.protocol}//${url.host}`;
relayUrl = `${url.protocol}//${url.host}/`;
// Set the relay URL
relayConnectionUrl.value = relayUrl;
@@ -348,13 +357,8 @@ async function setupAutomaticRelayConnection(showSections = false) {
relayPubkey = '4f355bdcb7cc0af728ef3cceb9615d90684bb5b2ca5f859ab0f0b704075871aa';
}
// Initialize relay pool for admin API communication
if (!relayPool) {
relayPool = new window.NostrTools.SimplePool();
console.log('🔌 Initialized SimplePool for admin API communication');
}
// Set up subscription to receive admin API responses
// Note: subscribeToConfiguration() will create the SimplePool internally
await subscribeToConfiguration();
console.log('📡 Subscription established for admin API responses');
@@ -615,16 +619,22 @@ async function loadUserProfile() {
'wss://relay.nostr.band',
'wss://nos.lol',
'wss://relay.primal.net',
'wss://relay.snort.social',
'wss://relay.laantungir.net'];
'wss://relay.snort.social'
];
// Get profile event (kind 0) for the user
const events = await profilePool.querySync(relays, {
// Get profile event (kind 0) for the user with timeout
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Profile query timeout')), 5000)
);
const queryPromise = profilePool.querySync(relays, {
kinds: [0],
authors: [userPubkey],
limit: 1
});
const events = await Promise.race([queryPromise, timeoutPromise]);
if (events.length > 0) {
console.log('Profile event found:', events[0]);
const profile = JSON.parse(events[0].content);
@@ -648,8 +658,14 @@ async function loadUserProfile() {
// Keep the npub display
}
// Close the profile pool
profilePool.close(relays);
// Properly close the profile pool with error handling
try {
await profilePool.close(relays);
// Give time for cleanup
await new Promise(resolve => setTimeout(resolve, 100));
} catch (closeError) {
console.log('Profile pool close error (non-critical):', closeError.message);
}
} catch (error) {
console.log('Profile loading failed: ' + error.message);
@@ -755,20 +771,25 @@ async function logout() {
// Stop auto-refresh before disconnecting
stopStatsAutoRefresh();
// Clean up configuration pool
// Clean up relay pool
if (relayPool) {
log('Closing configuration pool...', 'INFO');
log('Closing relay pool...', 'INFO');
const url = relayConnectionUrl.value.trim();
if (url) {
relayPool.close([url]);
try {
await relayPool.close([url]);
} catch (e) {
console.log('Pool close error (non-critical):', e.message);
}
}
relayPool = null;
subscriptionId = null;
// Reset subscription flag
isSubscribed = false;
}
// Reset subscription flags
isSubscribed = false;
isSubscribing = false;
await nlLite.logout();
userPubkey = null;
@@ -778,12 +799,9 @@ async function logout() {
// Reset relay connection state
isRelayConnected = false;
relayPubkey = null;
// Reset subscription flag
isSubscribed = false;
// Reset UI - hide profile and show login modal
hideProfileFromHeader();
// showLoginModal() removed - handled by handleLogoutEvent()
updateConfigStatus(false);
updateAdminSectionsVisibility();
@@ -815,42 +833,204 @@ function generateSubId() {
return result;
}
// WebSocket monitoring function to attach to SimplePool connections
function attachWebSocketMonitoring(relayPool, url) {
  console.log('🔍 Attaching WebSocket monitoring to SimplePool...');
  // SimplePool keeps its per-relay connections in the private `_conn` map;
  // wrap that map in a Proxy so WebSockets are hooked as they appear.
  if (relayPool?._conn) {
    // Hook a connection that already exists for this relay URL, if any.
    const existing = relayPool._conn[url];
    if (existing) {
      console.log('📡 Found existing connection for URL:', url);
      // Attach listeners only when the underlying WebSocket is present.
      if (existing.ws) {
        attachWebSocketEventListeners(existing.ws, url);
      }
    }
    // Replace the map with a Proxy that monitors reads and writes,
    // tagging each WebSocket with `_monitored` so it is hooked only once.
    const store = relayPool._conn;
    relayPool._conn = new Proxy(store, {
      get: (target, key) => {
        const entry = target[key];
        if (entry?.ws && !entry.ws._monitored) {
          console.log('🔗 New WebSocket connection detected for:', key);
          attachWebSocketEventListeners(entry.ws, key);
          entry.ws._monitored = true;
        }
        return entry;
      },
      set: (target, key, incoming) => {
        if (incoming?.ws && !incoming.ws._monitored) {
          console.log('🔗 WebSocket connection being set for:', key);
          attachWebSocketEventListeners(incoming.ws, key);
          incoming.ws._monitored = true;
        }
        target[key] = incoming;
        return true;
      }
    });
  }
  console.log('✅ WebSocket monitoring attached');
}
function attachWebSocketEventListeners(ws, url) {
  console.log(`🎯 Attaching event listeners to WebSocket for ${url}`);
  // Connection established: dump the socket's negotiated parameters.
  ws.addEventListener('open', (ev) => {
    console.log(`🔓 WebSocket OPEN for ${url}:`, {
      readyState: ws.readyState,
      url: ws.url,
      protocol: ws.protocol,
      extensions: ws.extensions
    });
  });
  // Incoming traffic: log the raw frame, then try to decode it as a
  // Nostr message ([type, ...args] JSON array) for readable output.
  ws.addEventListener('message', (ev) => {
    try {
      const payload = ev.data;
      console.log(`📨 WebSocket MESSAGE from ${url}:`, {
        type: ev.type,
        data: payload,
        dataLength: payload.length,
        timestamp: new Date().toISOString()
      });
      try {
        const decoded = JSON.parse(payload);
        if (Array.isArray(decoded)) {
          const [kind, ...rest] = decoded;
          console.log(`📨 Parsed Nostr message [${kind}]:`, rest);
        } else {
          console.log(`📨 Parsed JSON:`, decoded);
        }
      } catch (parseError) {
        console.log(`📨 Raw message (not JSON):`, payload);
      }
    } catch (err) {
      console.error(`❌ Error processing WebSocket message from ${url}:`, err);
    }
  });
  // Connection closed: record close code/reason for debugging drops.
  ws.addEventListener('close', (ev) => {
    console.log(`🔒 WebSocket CLOSE for ${url}:`, {
      code: ev.code,
      reason: ev.reason,
      wasClean: ev.wasClean,
      readyState: ws.readyState,
      timestamp: new Date().toISOString()
    });
  });
  // Errors: log the event plus a snapshot of socket state.
  ws.addEventListener('error', (ev) => {
    console.error(`❌ WebSocket ERROR for ${url}:`, {
      type: ev.type,
      target: ev.target,
      readyState: ws.readyState,
      url: ws.url,
      timestamp: new Date().toISOString()
    });
    console.error(`❌ WebSocket state details:`, {
      readyState: ws.readyState,
      bufferedAmount: ws.bufferedAmount,
      protocol: ws.protocol,
      extensions: ws.extensions,
      binaryType: ws.binaryType
    });
  });
  // Outgoing traffic: wrap send() so every frame is logged before
  // being handed to the native implementation.
  const nativeSend = ws.send;
  ws.send = function (payload) {
    console.log(`📤 WebSocket SEND to ${url}:`, {
      data: payload,
      dataLength: payload.length,
      readyState: ws.readyState,
      timestamp: new Date().toISOString()
    });
    try {
      const decoded = JSON.parse(payload);
      if (Array.isArray(decoded)) {
        const [kind, ...rest] = decoded;
        console.log(`📤 Outgoing Nostr message [${kind}]:`, rest);
      } else {
        console.log(`📤 Outgoing JSON:`, decoded);
      }
    } catch (parseError) {
      console.log(`📤 Outgoing raw message (not JSON):`, payload);
    }
    return nativeSend.call(this, payload);
  };
  console.log(`✅ Event listeners attached to WebSocket for ${url}`);
}
// Configuration subscription using nostr-tools SimplePool
async function subscribeToConfiguration() {
try {
console.log('=== STARTING SIMPLEPOOL CONFIGURATION SUBSCRIPTION ===');
console.log('=== SUBSCRIBE TO CONFIGURATION ===');
console.log('Call stack:', new Error().stack);
// Prevent multiple simultaneous subscription attempts
if (isSubscribed) {
console.log('Subscription already established, skipping duplicate subscription attempt');
// If pool already exists and subscribed, we're done
if (relayPool && isSubscribed) {
console.log('✅ Already subscribed, reusing existing pool');
return true;
}
if (!isLoggedIn) {
console.log('WARNING: Not logged in, but proceeding with subscription test');
}
const url = relayConnectionUrl.value.trim();
if (!url) {
console.error('Please enter a relay URL');
// Prevent concurrent subscription attempts
if (isSubscribing) {
console.log('⚠️ Subscription already in progress');
return false;
}
console.log(`Connecting to relay via SimplePool: ${url}`);
isSubscribing = true;
// Reuse existing pool if available, otherwise create new one
const url = relayConnectionUrl.value.trim();
if (!url) {
console.error('No relay URL configured');
isSubscribing = false;
return false;
}
console.log(`🔌 Connecting to relay: ${url}`);
// Create pool ONLY if it doesn't exist
if (!relayPool) {
console.log('Creating new SimplePool instance');
console.log('Creating NEW SimplePool for admin operations');
relayPool = new window.NostrTools.SimplePool();
// Attach WebSocket monitoring to the new pool
attachWebSocketMonitoring(relayPool, url);
} else {
console.log('Reusing existing SimplePool instance');
console.log('♻️ Reusing existing SimplePool');
}
subscriptionId = generateSubId();
console.log(`Generated subscription ID: ${subscriptionId}`);
console.log(`User pubkey ${userPubkey}`)
// Subscribe to kind 23457 events (admin response events), kind 4 (NIP-04 DMs), kind 1059 (NIP-17 GiftWrap), and kind 34567 (monitoring events)
console.log(`📝 Generated subscription ID: ${subscriptionId}`);
console.log(`👤 User pubkey: ${userPubkey}`);
console.log(`🎯 About to call relayPool.subscribeMany with URL: ${url}`);
console.log(`📊 relayPool._conn before subscribeMany:`, Object.keys(relayPool._conn || {}));
// Mark as subscribed BEFORE calling subscribeMany to prevent race conditions
isSubscribed = true;
// Subscribe to kind 23457 events (admin response events), kind 4 (NIP-04 DMs), kind 1059 (NIP-17 GiftWrap), and kind 24567 (ephemeral monitoring events)
console.log('🔔 Calling relayPool.subscribeMany...');
const subscription = relayPool.subscribeMany([url], [{
since: Math.floor(Date.now() / 1000) - 5, // Look back 5 seconds to avoid race condition
kinds: [23457],
@@ -870,22 +1050,23 @@ async function subscribeToConfiguration() {
limit: 50
}, {
since: Math.floor(Date.now() / 1000), // Start from current time
kinds: [34567], // Real-time monitoring events
kinds: [24567], // Real-time ephemeral monitoring events
authors: [getRelayPubkey()], // Only listen to monitoring events from the relay
"#d": isLoggedIn ? ["event_kinds", "time_stats", "top_pubkeys", "active_subscriptions", "subscription_details"] : ["event_kinds", "time_stats", "top_pubkeys", "active_subscriptions"], // Include subscription_details only when authenticated
"#d": isLoggedIn ? ["event_kinds", "time_stats", "top_pubkeys", "active_subscriptions", "subscription_details", "cpu_metrics"] : ["event_kinds", "time_stats", "top_pubkeys", "active_subscriptions", "cpu_metrics"], // Include subscription_details only when authenticated, cpu_metrics available to all
limit: 50
}], {
async onevent(event) {
console.log('=== EVENT RECEIVED VIA SIMPLEPOOL ===');
console.log('Event data:', event);
console.log('Event kind:', event.kind);
console.log('Event tags:', event.tags);
console.log('Event pubkey:', event.pubkey);
console.log('=== END EVENT ===');
// Simplified logging - one line per event
if (event.kind === 24567) {
const dTag = event.tags.find(tag => tag[0] === 'd');
const dataType = dTag ? dTag[1] : 'unknown';
console.log(`📊 Monitoring event: ${dataType}`);
} else {
console.log(`📨 Event received: kind ${event.kind}`);
}
// Handle NIP-04 DMs
if (event.kind === 4) {
console.log('=== NIP-04 DM RECEIVED ===');
try {
// Decrypt the DM content
const decryptedContent = await window.nostr.nip04.decrypt(event.pubkey, event.content);
@@ -910,7 +1091,6 @@ async function subscribeToConfiguration() {
// Handle NIP-17 GiftWrap DMs
if (event.kind === 1059) {
console.log('=== NIP-17 GIFTWRAP RECEIVED ===');
try {
// Step 1: Unwrap gift wrap to get seal
const sealJson = await window.nostr.nip44.decrypt(event.pubkey, event.content);
@@ -956,12 +1136,9 @@ async function subscribeToConfiguration() {
processAdminResponse(event);
}
// Handle monitoring events (kind 34567)
if (event.kind === 34567) {
console.log('=== MONITORING EVENT RECEIVED ===');
console.log('Monitoring event:', event);
// Process monitoring event
// Handle monitoring events (kind 24567 - ephemeral)
if (event.kind === 24567) {
// Process monitoring event (logging done above)
processMonitoringEvent(event);
}
},
@@ -975,23 +1152,30 @@ async function subscribeToConfiguration() {
},
onclose(reason) {
console.log('Subscription closed:', reason);
// Reset subscription state to allow re-subscription
isSubscribed = false;
isSubscribing = false;
isRelayConnected = false;
updateConfigStatus(false);
log('WebSocket connection closed - subscription state reset', 'WARNING');
}
});
// Store subscription for cleanup
relayPool.currentSubscription = subscription;
// Mark as subscribed to prevent duplicate attempts
// Mark as subscribed
isSubscribed = true;
isSubscribing = false;
console.log('SimplePool subscription established');
console.log('✅ Subscription established successfully');
return true;
} catch (error) {
console.error('Configuration subscription failed:', error.message);
console.error('Configuration subscription failed:', error);
console.error('Error stack:', error.stack);
isSubscribing = false;
return false;
}
}
@@ -1087,13 +1271,14 @@ function initializeEventRateChart() {
eventRateChart = new ASCIIBarChart('event-rate-chart', {
maxHeight: 11, // Chart height in lines
maxDataPoints: 76, // Show last 76 bins (5+ minutes of history)
title: 'Events', // Chart title
title: 'New Events', // Chart title
xAxisLabel: '', // No X-axis label
yAxisLabel: '', // No Y-axis label
autoFitWidth: true, // Enable responsive font sizing
useBinMode: true, // Enable time bin aggregation
binDuration: 4000, // 4-second time bins
xAxisLabelFormat: 'elapsed' // Show elapsed time labels
xAxisLabelFormat: 'elapsed', // Show elapsed time labels
debug: false // Disable debug logging
});
console.log('ASCIIBarChart instance created:', eventRateChart);
@@ -1145,73 +1330,59 @@ function createChartStubElements() {
console.log('Chart stub elements created');
}
// Handle monitoring events (kind 34567)
// Handle monitoring events (kind 24567 - ephemeral)
async function processMonitoringEvent(event) {
try {
console.log('=== PROCESSING MONITORING EVENT ===');
console.log('Monitoring event:', event);
// Verify this is a kind 34567 monitoring event
if (event.kind !== 34567) {
console.log('Ignoring non-monitoring event, kind:', event.kind);
// Verify this is a kind 24567 ephemeral monitoring event
if (event.kind !== 24567) {
return;
}
// Verify the event is from the relay
const expectedRelayPubkey = getRelayPubkey();
if (event.pubkey !== expectedRelayPubkey) {
console.log('Ignoring monitoring event from unknown pubkey:', event.pubkey);
return;
}
// Check the d-tag to determine which type of monitoring event this is
const dTag = event.tags.find(tag => tag[0] === 'd');
if (!dTag) {
console.log('Ignoring monitoring event without d-tag');
return;
}
// Parse the monitoring data (content is JSON, not encrypted for monitoring events)
const monitoringData = JSON.parse(event.content);
console.log('Parsed monitoring data:', monitoringData);
// Don't add to chart here - we'll track actual event rate changes in updateStatsFromMonitoringEvent
console.log('Monitoring event received - will track rate changes in stats update');
// Route to appropriate handler based on d-tag
// Route to appropriate handler based on d-tag (no verbose logging)
switch (dTag[1]) {
case 'event_kinds':
updateStatsFromMonitoringEvent(monitoringData);
log('Real-time event_kinds monitoring data updated', 'INFO');
break;
case 'time_stats':
updateStatsFromTimeMonitoringEvent(monitoringData);
log('Real-time time_stats monitoring data updated', 'INFO');
break;
case 'top_pubkeys':
updateStatsFromTopPubkeysMonitoringEvent(monitoringData);
log('Real-time top_pubkeys monitoring data updated', 'INFO');
break;
case 'active_subscriptions':
updateStatsFromActiveSubscriptionsMonitoringEvent(monitoringData);
log('Real-time active_subscriptions monitoring data updated', 'INFO');
break;
case 'subscription_details':
// Only process subscription details if user is authenticated
if (isLoggedIn) {
updateStatsFromSubscriptionDetailsMonitoringEvent(monitoringData);
log('Real-time subscription_details monitoring data updated', 'INFO');
} else {
console.log('Ignoring subscription_details monitoring event - user not authenticated');
}
break;
case 'cpu_metrics':
updateStatsFromCpuMonitoringEvent(monitoringData);
break;
default:
console.log('Ignoring monitoring event with unknown d-tag:', dTag[1]);
return;
}
@@ -2433,29 +2604,11 @@ async function saveAuthRule(event) {
}
}
// Auto-enable monitoring when admin logs in
// Monitoring is now subscription-based - no auto-enable needed
// Monitoring automatically activates when someone subscribes to kind 24567 events
async function autoEnableMonitoring() {
if (!isLoggedIn || !relayPool) {
log('Cannot auto-enable monitoring: not logged in or no relay connection', 'WARNING');
return;
}
try {
log('Auto-enabling monitoring for admin session...', 'INFO');
// Send enable_monitoring command
const commandArray = ["enable_monitoring"];
const requestEvent = await sendAdminCommand(commandArray);
if (requestEvent) {
log('Monitoring auto-enabled for admin session', 'INFO');
} else {
log('Failed to auto-enable monitoring', 'ERROR');
}
} catch (error) {
log(`Failed to auto-enable monitoring: ${error.message}`, 'ERROR');
}
log('Monitoring system is subscription-based - no manual enable needed', 'INFO');
log('Subscribe to kind 24567 events to receive real-time monitoring data', 'INFO');
}
// Update existing logout and showMainInterface functions to handle auth rules and NIP-17 DMs
@@ -3788,11 +3941,8 @@ function handleStatsQueryResponse(responseData) {
// Update statistics display from real-time monitoring event
function updateStatsFromMonitoringEvent(monitoringData) {
try {
log('Updating stats from monitoring event...', 'INFO');
console.log('Monitoring data:', monitoringData);
if (monitoringData.data_type !== 'event_kinds') {
log('Ignoring monitoring event with different data type', 'WARNING');
return;
}
@@ -3819,8 +3969,6 @@ function updateStatsFromMonitoringEvent(monitoringData) {
populateStatsKindsFromMonitoring(monitoringData.kinds, monitoringData.total_events);
}
log('Real-time statistics updated from monitoring event', 'INFO');
} catch (error) {
log(`Error updating stats from monitoring event: ${error.message}`, 'ERROR');
}
@@ -3829,11 +3977,7 @@ function updateStatsFromMonitoringEvent(monitoringData) {
// Update statistics display from time_stats monitoring event
function updateStatsFromTimeMonitoringEvent(monitoringData) {
try {
log('Updating time stats from monitoring event...', 'INFO');
console.log('Time monitoring data:', monitoringData);
if (monitoringData.data_type !== 'time_stats') {
log('Ignoring time monitoring event with different data type', 'WARNING');
return;
}
@@ -3852,8 +3996,6 @@ function updateStatsFromTimeMonitoringEvent(monitoringData) {
populateStatsTime({ time_stats: timeStats });
}
log('Real-time time statistics updated from monitoring event', 'INFO');
} catch (error) {
log(`Error updating time stats from monitoring event: ${error.message}`, 'ERROR');
}
@@ -3862,11 +4004,7 @@ function updateStatsFromTimeMonitoringEvent(monitoringData) {
// Update statistics display from top_pubkeys monitoring event
function updateStatsFromTopPubkeysMonitoringEvent(monitoringData) {
try {
log('Updating top pubkeys from monitoring event...', 'INFO');
console.log('Top pubkeys monitoring data:', monitoringData);
if (monitoringData.data_type !== 'top_pubkeys') {
log('Ignoring top pubkeys monitoring event with different data type', 'WARNING');
return;
}
@@ -3876,8 +4014,6 @@ function updateStatsFromTopPubkeysMonitoringEvent(monitoringData) {
populateStatsPubkeysFromMonitoring(monitoringData.pubkeys, monitoringData.total_events || 0);
}
log('Real-time top pubkeys statistics updated from monitoring event', 'INFO');
} catch (error) {
log(`Error updating top pubkeys from monitoring event: ${error.message}`, 'ERROR');
}
@@ -3886,11 +4022,7 @@ function updateStatsFromTopPubkeysMonitoringEvent(monitoringData) {
// Update statistics display from active_subscriptions monitoring event
function updateStatsFromActiveSubscriptionsMonitoringEvent(monitoringData) {
try {
log('Updating active subscriptions from monitoring event...', 'INFO');
console.log('Active subscriptions monitoring data:', monitoringData);
if (monitoringData.data_type !== 'active_subscriptions') {
log('Ignoring active subscriptions monitoring event with different data type', 'WARNING');
return;
}
@@ -3900,8 +4032,6 @@ function updateStatsFromActiveSubscriptionsMonitoringEvent(monitoringData) {
updateStatsCell('active-subscriptions', monitoringData.data.total_subscriptions.toString());
}
log('Real-time active subscriptions statistics updated from monitoring event', 'INFO');
} catch (error) {
log(`Error updating active subscriptions from monitoring event: ${error.message}`, 'ERROR');
}
@@ -3910,11 +4040,7 @@ function updateStatsFromActiveSubscriptionsMonitoringEvent(monitoringData) {
// Update statistics display from subscription_details monitoring event
function updateStatsFromSubscriptionDetailsMonitoringEvent(monitoringData) {
try {
log('Updating subscription details from monitoring event...', 'INFO');
console.log('Subscription details monitoring data:', monitoringData);
if (monitoringData.data_type !== 'subscription_details') {
log('Ignoring subscription details monitoring event with different data type', 'WARNING');
return;
}
@@ -3923,13 +4049,43 @@ function updateStatsFromSubscriptionDetailsMonitoringEvent(monitoringData) {
populateSubscriptionDetailsTable(monitoringData.data.subscriptions);
}
log('Real-time subscription details statistics updated from monitoring event', 'INFO');
} catch (error) {
log(`Error updating subscription details from monitoring event: ${error.message}`, 'ERROR');
}
}
// Update statistics display from CPU metrics monitoring event
function updateStatsFromCpuMonitoringEvent(metrics) {
  try {
    // Only handle payloads explicitly tagged as CPU metrics.
    if (metrics.data_type !== 'cpu_metrics') {
      return;
    }
    // Every field is optional, so update each table cell only when its
    // value is present in the payload.
    if (metrics.process_id !== undefined) {
      updateStatsCell('process-id', metrics.process_id.toString());
    }
    if (metrics.memory_usage_mb !== undefined) {
      updateStatsCell('memory-usage', metrics.memory_usage_mb.toFixed(1) + ' MB');
    }
    if (metrics.current_cpu_core !== undefined) {
      updateStatsCell('cpu-core', 'Core ' + metrics.current_cpu_core);
    }
    // CPU usage needs both process and system times. For now only the raw
    // process CPU time is surfaced; a full implementation would compute a
    // percentage from deltas over time.
    if (metrics.process_cpu_time !== undefined && metrics.system_cpu_time !== undefined) {
      updateStatsCell('cpu-usage', metrics.process_cpu_time + ' ticks');
    }
  } catch (err) {
    log(`Error updating CPU metrics from monitoring event: ${err.message}`, 'ERROR');
  }
}
// Populate event kinds table from monitoring data
function populateStatsKindsFromMonitoring(kindsData, totalEvents) {
const tableBody = document.getElementById('stats-kinds-table-body');
@@ -4094,7 +4250,7 @@ function populateSubscriptionDetailsTable(subscriptionsData) {
if (subscriptionsData.length === 0) {
const row = document.createElement('tr');
row.innerHTML = '<td colspan="6" style="text-align: center; font-style: italic;">No active subscriptions</td>';
row.innerHTML = '<td colspan="5" style="text-align: center; font-style: italic;">No active subscriptions</td>';
tableBody.appendChild(row);
return;
}
@@ -4110,8 +4266,8 @@ function populateSubscriptionDetailsTable(subscriptionsData) {
// Format client IP (show full IP for admin view)
const clientIP = subscription.client_ip || 'unknown';
// Format status
const status = subscription.active ? 'Active' : 'Inactive';
// Format wsi_pointer (show full pointer)
const wsiPointer = subscription.wsi_pointer || 'N/A';
// Format filters (show actual filter details)
let filtersDisplay = 'None';
@@ -4179,9 +4335,8 @@ function populateSubscriptionDetailsTable(subscriptionsData) {
row.innerHTML = `
<td style="font-family: 'Courier New', monospace; font-size: 12px;">${subscription.id || 'N/A'}</td>
<td style="font-family: 'Courier New', monospace; font-size: 12px;">${clientIP}</td>
<td style="font-family: 'Courier New', monospace; font-size: 12px;">${wsiPointer}</td>
<td>${durationStr}</td>
<td>${subscription.events_sent || 0}</td>
<td>${status}</td>
<td>${filtersDisplay}</td>
`;
tableBody.appendChild(row);
@@ -4900,106 +5055,33 @@ function getConfigToggleButton(configKey) {
return configToggleButtons.get(configKey);
}
// Initialize toggle button for monitoring config
// Monitoring is now subscription-based - no toggle button needed
// Monitoring automatically activates when someone subscribes to kind 24567 events
function initializeMonitoringToggleButton() {
console.log('=== INITIALIZING MONITORING TOGGLE BUTTON ===');
// Check if button already exists to prevent duplicates
const existingButton = getConfigToggleButton('kind_34567_reporting_enabled');
if (existingButton) {
console.log('Monitoring toggle button already exists, skipping creation');
return existingButton;
}
// Find the DATABASE STATISTICS section header
const sectionHeader = document.querySelector('#databaseStatisticsSection .section-header h2');
console.log('Section header found:', sectionHeader);
if (!sectionHeader) {
log('Could not find DATABASE STATISTICS section header for toggle button', 'WARNING');
return;
}
// Create the toggle button
const button = new ConfigToggleButton('kind_34567_reporting_enabled', sectionHeader.parentElement, {
dataType: 'boolean',
category: 'monitoring'
});
console.log('Monitoring toggle button created:', button);
console.log('Button element:', button.button);
console.log('Button in DOM:', document.contains(button.button));
log('Monitoring toggle button initialized', 'INFO');
return button;
console.log('=== MONITORING IS NOW SUBSCRIPTION-BASED ===');
console.log('No toggle button needed - monitoring activates automatically when subscribing to kind 24567');
log('Monitoring system is subscription-based - no manual toggle required', 'INFO');
return null;
}
// Enhanced config update response handler to update toggle buttons
// Monitoring is subscription-based - no toggle button response handling needed
const originalHandleConfigUpdateResponse = handleConfigUpdateResponse;
handleConfigUpdateResponse = function(responseData) {
console.log('=== ENHANCED CONFIG UPDATE RESPONSE HANDLER ===');
console.log('=== CONFIG UPDATE RESPONSE HANDLER ===');
console.log('Response data:', responseData);
// Call original handler
originalHandleConfigUpdateResponse(responseData);
// Update toggle buttons if this was a config update response
if (responseData.query_type === 'config_update' && responseData.status === 'success' && responseData.processed_configs) {
console.log('Processing config update response for toggle buttons');
responseData.processed_configs.forEach(config => {
console.log('Processing config:', config);
const button = getConfigToggleButton(config.key);
console.log('Button found:', button);
if (button) {
const success = config.status === 'updated';
const value = String(config.value).toLowerCase();
console.log('Calling handleResponse with:', success, value);
button.handleResponse(success, value);
}
});
} else {
console.log('Not a config update response or no processed_configs');
}
// Also handle config query responses to initialize toggle buttons
if ((responseData.query_type === 'config_query' || responseData.query_type === 'config_all') && responseData.status === 'success' && responseData.data) {
console.log('Config query response - initializing toggle buttons');
initializeToggleButtonsFromConfig(responseData);
}
// Monitoring is now subscription-based - no toggle buttons to update
console.log('Monitoring system is subscription-based - no toggle buttons to handle');
};
// Initialize toggle button when config is loaded
// Monitoring is now subscription-based - no toggle buttons needed
function initializeToggleButtonsFromConfig(configData) {
console.log('=== INITIALIZING TOGGLE BUTTONS FROM CONFIG ===');
console.log('Config data:', configData);
if (!configData || !configData.data) {
console.log('No config data available');
return;
}
// Find monitoring enabled config
const monitoringConfig = configData.data.find(c => c.key === 'kind_34567_reporting_enabled');
console.log('Monitoring config found:', monitoringConfig);
if (monitoringConfig) {
const button = getConfigToggleButton('kind_34567_reporting_enabled');
console.log('Button instance:', button);
if (button) {
// Convert config value to string for state setting
const configValue = String(monitoringConfig.value).toLowerCase();
console.log('Setting button state to:', configValue);
// Set initial state from config
button.setState(configValue);
log(`Monitoring toggle button set to: ${configValue}`, 'INFO');
} else {
console.log('Button instance not found in registry - button should have been created on DOM ready');
}
} else {
console.log('Monitoring config not found in config data');
}
console.log('=== MONITORING IS SUBSCRIPTION-BASED ===');
console.log('No toggle buttons needed - monitoring activates automatically when subscribing to kind 24567');
log('Monitoring system initialized - subscription-based activation ready', 'INFO');
}
// Initialize toggle button after DOM is ready

View File

@@ -18,6 +18,7 @@ class ASCIIBarChart {
* @param {boolean} [options.useBinMode=false] - Enable time bin mode for data aggregation
* @param {number} [options.binDuration=10000] - Duration of each time bin in milliseconds (10 seconds default)
* @param {string} [options.xAxisLabelFormat='elapsed'] - X-axis label format: 'elapsed', 'bins', 'timestamps', 'ranges'
* @param {boolean} [options.debug=false] - Enable debug logging
*/
constructor(containerId, options = {}) {
this.container = document.getElementById(containerId);
@@ -29,6 +30,7 @@ class ASCIIBarChart {
this.xAxisLabel = options.xAxisLabel || '';
this.yAxisLabel = options.yAxisLabel || '';
this.autoFitWidth = options.autoFitWidth !== false; // Default to true
this.debug = options.debug || false; // Debug logging option
// Time bin configuration
this.useBinMode = options.useBinMode !== false; // Default to true
@@ -61,21 +63,10 @@ class ASCIIBarChart {
* @param {number} value - The numeric value to add
*/
addValue(value) {
if (this.useBinMode) {
// Time bin mode: increment count in current active bin
// Time bin mode: add value to current active bin count
this.checkBinRotation(); // Ensure we have an active bin
this.bins[this.currentBinIndex].count++;
this.bins[this.currentBinIndex].count += value; // Changed from ++ to += value
this.totalDataPoints++;
} else {
// Legacy mode: add individual values
this.data.push(value);
this.totalDataPoints++;
// Keep only the most recent data points
if (this.data.length > this.maxDataPoints) {
this.data.shift();
}
}
this.render();
this.updateInfo();
@@ -119,7 +110,7 @@ class ASCIIBarChart {
const totalWidth = yAxisPadding + yAxisNumbers + separator + dataWidth + padding;
// Only log when width changes
if (this.lastChartWidth !== totalWidth) {
if (this.debug && this.lastChartWidth !== totalWidth) {
console.log('getChartWidth changed:', { dataLength, totalWidth, previous: this.lastChartWidth });
this.lastChartWidth = totalWidth;
}
@@ -142,7 +133,7 @@ class ASCIIBarChart {
// Calculate optimal font size
// For monospace fonts, character width is approximately 0.6 * font size
// Use a slightly smaller ratio to fit more content
const charWidthRatio = 0.6;
const charWidthRatio = 0.7;
const padding = 30; // Reduce padding to fit more content
const availableWidth = containerWidth - padding;
const optimalFontSize = Math.floor((availableWidth / chartWidth) / charWidthRatio);
@@ -151,7 +142,7 @@ class ASCIIBarChart {
const fontSize = Math.max(4, Math.min(20, optimalFontSize));
// Only log when font size changes
if (this.lastFontSize !== fontSize) {
if (this.debug && this.lastFontSize !== fontSize) {
console.log('fontSize changed:', { containerWidth, chartWidth, fontSize, previous: this.lastFontSize });
this.lastFontSize = fontSize;
}
@@ -190,7 +181,9 @@ class ASCIIBarChart {
}
});
if (this.debug) {
console.log('render() dataToRender:', dataToRender, 'bins length:', this.bins.length);
}
maxValue = Math.max(...dataToRender);
minValue = Math.min(...dataToRender);
valueRange = maxValue - minValue;
@@ -243,8 +236,8 @@ class ASCIIBarChart {
}
}
// Calculate the actual count value this row represents (0 at bottom, increasing upward)
const rowCount = (row - 1) * scaleFactor;
// Calculate the actual count value this row represents (1 at bottom, increasing upward)
const rowCount = (row - 1) * scaleFactor + 1;
// Add Y-axis label (show actual count values)
line += String(rowCount).padStart(3, ' ') + ' |';

1445
debug.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,298 @@
# Libwebsockets Proper Pattern - Message Queue Design
## Problem Analysis
### Current Violation
We're calling `lws_write()` directly from multiple code paths:
1. **Event broadcast** (subscriptions.c:667) - when events arrive
2. **OK responses** (websockets.c:855) - when processing EVENT messages
3. **EOSE responses** (websockets.c:976) - when processing REQ messages
4. **COUNT responses** (websockets.c:1922) - when processing COUNT messages
This violates libwebsockets' design pattern which requires:
- **`lws_write()` ONLY called from `LWS_CALLBACK_SERVER_WRITEABLE`**
- Application queues messages and requests writeable callback
- Libwebsockets handles write timing and socket buffer management
### Consequences of Violation
1. Partial writes when socket buffer is full
2. Multiple concurrent write attempts before callback fires
3. "write already pending" errors with single buffer
4. Frame corruption from interleaved partial writes
5. "Invalid frame header" errors on client side
## Correct Architecture
### Message Queue Pattern
```
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
├─────────────────────────────────────────────────────────────┤
│ │
│ Event Arrives → Queue Message → Request Writeable Callback │
│ REQ Received → Queue EOSE → Request Writeable Callback │
│ EVENT Received→ Queue OK → Request Writeable Callback │
│ COUNT Received→ Queue COUNT → Request Writeable Callback │
│ │
└─────────────────────────────────────────────────────────────┘
lws_callback_on_writable(wsi)
┌─────────────────────────────────────────────────────────────┐
│ LWS_CALLBACK_SERVER_WRITEABLE │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Dequeue next message from queue │
│ 2. Call lws_write() with message data │
│ 3. If queue not empty, request another callback │
│ │
└─────────────────────────────────────────────────────────────┘
libwebsockets handles:
- Socket buffer management
- Partial write handling
- Frame atomicity
```
## Data Structures
### Message Queue Node
```c
typedef struct message_queue_node {
unsigned char* data; // Message data (with LWS_PRE space)
size_t length; // Message length (without LWS_PRE)
enum lws_write_protocol type; // LWS_WRITE_TEXT, etc.
struct message_queue_node* next;
} message_queue_node_t;
```
### Per-Session Data Updates
```c
struct per_session_data {
// ... existing fields ...
// Message queue (replaces single buffer)
message_queue_node_t* message_queue_head;
message_queue_node_t* message_queue_tail;
int message_queue_count;
int writeable_requested; // Flag to prevent duplicate requests
};
```
## Implementation Functions
### 1. Queue Message (Application Layer)
```c
int queue_message(struct lws* wsi, struct per_session_data* pss,
const char* message, size_t length,
enum lws_write_protocol type)
{
// Allocate node
message_queue_node_t* node = malloc(sizeof(message_queue_node_t));
// Allocate buffer with LWS_PRE space
node->data = malloc(LWS_PRE + length);
memcpy(node->data + LWS_PRE, message, length);
node->length = length;
node->type = type;
node->next = NULL;
// Add to queue (FIFO)
pthread_mutex_lock(&pss->session_lock);
if (!pss->message_queue_head) {
pss->message_queue_head = node;
pss->message_queue_tail = node;
} else {
pss->message_queue_tail->next = node;
pss->message_queue_tail = node;
}
pss->message_queue_count++;
pthread_mutex_unlock(&pss->session_lock);
// Request writeable callback (only if not already requested)
if (!pss->writeable_requested) {
pss->writeable_requested = 1;
lws_callback_on_writable(wsi);
}
return 0;
}
```
### 2. Process Queue (Writeable Callback)
```c
int process_message_queue(struct lws* wsi, struct per_session_data* pss)
{
pthread_mutex_lock(&pss->session_lock);
// Get next message from queue
message_queue_node_t* node = pss->message_queue_head;
if (!node) {
pss->writeable_requested = 0;
pthread_mutex_unlock(&pss->session_lock);
return 0; // Queue empty
}
// Remove from queue
pss->message_queue_head = node->next;
if (!pss->message_queue_head) {
pss->message_queue_tail = NULL;
}
pss->message_queue_count--;
pthread_mutex_unlock(&pss->session_lock);
// Write message (libwebsockets handles partial writes)
int result = lws_write(wsi, node->data + LWS_PRE, node->length, node->type);
// Free node
free(node->data);
free(node);
// If queue not empty, request another callback
pthread_mutex_lock(&pss->session_lock);
if (pss->message_queue_head) {
lws_callback_on_writable(wsi);
} else {
pss->writeable_requested = 0;
}
pthread_mutex_unlock(&pss->session_lock);
return (result < 0) ? -1 : 0;
}
```
## Refactoring Changes
### Before (WRONG - Direct Write)
```c
// websockets.c:855 - OK response
int write_result = lws_write(wsi, buf + LWS_PRE, response_len, LWS_WRITE_TEXT);
if (write_result < 0) {
DEBUG_ERROR("Write failed");
} else if ((size_t)write_result != response_len) {
// Partial write - queue remaining data
queue_websocket_write(wsi, pss, ...);
}
```
### After (CORRECT - Queue Message)
```c
// websockets.c:855 - OK response
queue_message(wsi, pss, response_str, response_len, LWS_WRITE_TEXT);
// That's it! Writeable callback will handle the actual write
```
### Before (WRONG - Direct Write in Broadcast)
```c
// subscriptions.c:667 - EVENT broadcast
int write_result = lws_write(current_temp->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
if (write_result < 0) {
DEBUG_ERROR("Write failed");
} else if ((size_t)write_result != msg_len) {
queue_websocket_write(...);
}
```
### After (CORRECT - Queue Message)
```c
// subscriptions.c:667 - EVENT broadcast
struct per_session_data* pss = lws_wsi_user(current_temp->wsi);
queue_message(current_temp->wsi, pss, msg_str, msg_len, LWS_WRITE_TEXT);
// Writeable callback will handle the actual write
```
## Benefits of Correct Pattern
1. **No Partial Write Handling Needed**
- Libwebsockets handles partial writes internally
- We just queue complete messages
2. **No "Write Already Pending" Errors**
- Queue can hold unlimited messages
- Each processed sequentially from callback
3. **Thread Safety**
- Queue operations protected by session lock
- Write only from single callback thread
4. **Frame Atomicity**
- Libwebsockets ensures complete frame transmission
- No interleaved partial writes
5. **Simpler Code**
- No complex partial write state machine
- Just queue and forget
6. **Better Performance**
- Libwebsockets optimizes write timing
- Batches writes when socket ready
## Migration Steps
1. ✅ Identify all `lws_write()` call sites
2. ✅ Confirm violation of libwebsockets pattern
3. ⏳ Design message queue structure
4. ⏳ Implement `queue_message()` function
5. ⏳ Implement `process_message_queue()` function
6. ⏳ Update `per_session_data` structure
7. ⏳ Refactor OK response to use queue
8. ⏳ Refactor EOSE response to use queue
9. ⏳ Refactor COUNT response to use queue
10. ⏳ Refactor EVENT broadcast to use queue
11. ⏳ Update `LWS_CALLBACK_SERVER_WRITEABLE` handler
12. ⏳ Add queue cleanup in `LWS_CALLBACK_CLOSED`
13. ⏳ Remove old partial write code
14. ⏳ Test with rapid multiple events
15. ⏳ Test with large events (>4KB)
16. ⏳ Test under load
17. ⏳ Verify no frame errors
## Testing Strategy
### Test 1: Multiple Rapid Events
```bash
# Send 10 events rapidly to same client
for i in {1..10}; do
echo '["EVENT",{"kind":1,"content":"test'$i'","created_at":'$(date +%s)',...}]' | \
websocat ws://localhost:8888 &
done
```
**Expected**: All events queued and sent sequentially, no errors
### Test 2: Large Events
```bash
# Send event >4KB (forces multiple socket writes)
nak event --content "$(head -c 5000 /dev/urandom | base64)" | \
websocat ws://localhost:8888
```
**Expected**: Event queued, libwebsockets handles partial writes internally
### Test 3: Concurrent Connections
```bash
# 100 concurrent connections, each sending events
for i in {1..100}; do
(echo '["REQ","sub'$i'",{}]'; sleep 1) | websocat ws://localhost:8888 &
done
```
**Expected**: All subscriptions work, events broadcast correctly
## Success Criteria
- ✅ No `lws_write()` calls outside `LWS_CALLBACK_SERVER_WRITEABLE`
- ✅ No "write already pending" errors in logs
- ✅ No "Invalid frame header" errors on client side
- ✅ All messages delivered in correct order
- ✅ Large events (>4KB) handled correctly
- ✅ Multiple rapid events to same client work
- ✅ Concurrent connections stable under load
## References
- [libwebsockets documentation](https://libwebsockets.org/lws-api-doc-main/html/index.html)
- [LWS_CALLBACK_SERVER_WRITEABLE](https://libwebsockets.org/lws-api-doc-main/html/group__callback-when-writeable.html)
- [lws_callback_on_writable()](https://libwebsockets.org/lws-api-doc-main/html/group__callback-when-writeable.html#ga96f3ad8e1e2c3e0c8e0b0e5e5e5e5e5e)

View File

@@ -0,0 +1,200 @@
# WebSocket Write Queue Design
## Problem Statement
The current partial write handling implementation uses a single buffer per session, which fails when multiple events need to be sent to the same client in rapid succession. This causes:
1. First event gets partial write → queued successfully
2. Second event tries to write → **FAILS** with "write already pending"
3. Subsequent events fail similarly, causing data loss
### Server Log Evidence
```
[WARN] WS_FRAME_PARTIAL: EVENT partial write, sub=1 sent=3210 expected=5333
[TRACE] Queued partial write: len=2123
[WARN] WS_FRAME_PARTIAL: EVENT partial write, sub=1 sent=3210 expected=5333
[WARN] queue_websocket_write: write already pending, cannot queue new write
[ERROR] Failed to queue partial EVENT write for sub=1
```
## Root Cause
WebSocket frames must be sent **atomically** - you cannot interleave multiple frames. The current single-buffer approach correctly enforces this, but it rejects new writes instead of queuing them.
## Solution: Write Queue Architecture
### Design Principles
1. **Frame Atomicity**: Complete one WebSocket frame before starting the next
2. **Sequential Processing**: Process queued writes in FIFO order
3. **Memory Safety**: Proper cleanup on connection close or errors
4. **Thread Safety**: Protect queue operations with existing session lock
### Data Structures
#### Write Queue Node
```c
struct write_queue_node {
unsigned char* buffer; // Buffer with LWS_PRE space
size_t total_len; // Total length of data to write
size_t offset; // How much has been written so far
int write_type; // LWS_WRITE_TEXT, etc.
struct write_queue_node* next; // Next node in queue
};
```
#### Per-Session Write Queue
```c
struct per_session_data {
// ... existing fields ...
// Write queue for handling multiple pending writes
struct write_queue_node* write_queue_head; // First item to write
struct write_queue_node* write_queue_tail; // Last item in queue
int write_queue_length; // Number of items in queue
int write_in_progress; // Flag: 1 if currently writing
};
```
### Algorithm Flow
#### 1. Enqueue Write (`queue_websocket_write`)
```
IF write_queue is empty AND no write in progress:
- Attempt immediate write with lws_write()
- IF complete:
- Return success
- ELSE (partial write):
- Create queue node with remaining data
- Add to queue
- Set write_in_progress flag
- Request LWS_CALLBACK_SERVER_WRITEABLE
ELSE:
- Create queue node with full data
- Append to queue tail
- IF no write in progress:
- Request LWS_CALLBACK_SERVER_WRITEABLE
```
#### 2. Process Queue (`process_pending_write`)
```
WHILE write_queue is not empty:
- Get head node
- Calculate remaining data (total_len - offset)
- Attempt write with lws_write()
IF write fails (< 0):
- Log error
- Remove and free head node
- Continue to next node
ELSE IF partial write (< remaining):
- Update offset
- Request LWS_CALLBACK_SERVER_WRITEABLE
- Break (wait for next callback)
ELSE (complete write):
- Remove and free head node
- Continue to next node
IF queue is empty:
- Clear write_in_progress flag
```
#### 3. Cleanup (`LWS_CALLBACK_CLOSED`)
```
WHILE write_queue is not empty:
- Get head node
- Free buffer
- Free node
- Move to next
Clear queue pointers
```
### Memory Management
1. **Allocation**: Each queue node allocates buffer with `LWS_PRE + data_len`
2. **Ownership**: Queue owns all buffers until write completes or connection closes
3. **Deallocation**: Free buffer and node when:
- Write completes successfully
- Write fails with error
- Connection closes
### Thread Safety
- Use existing `pss->session_lock` to protect queue operations
- Lock during:
- Enqueue operations
- Dequeue operations
- Queue traversal for cleanup
### Performance Considerations
1. **Queue Length Limit**: Implement max queue length (e.g., 100 items) to prevent memory exhaustion
2. **Memory Pressure**: Monitor total queued bytes per session
3. **Backpressure**: If queue exceeds limit, close connection with NOTICE
### Error Handling
1. **Allocation Failure**: Return error, log, send NOTICE to client
2. **Write Failure**: Remove failed frame, continue with next
3. **Queue Overflow**: Close connection with appropriate NOTICE
## Implementation Plan
### Phase 1: Data Structure Changes
1. Add `write_queue_node` structure to `websockets.h`
2. Update `per_session_data` with queue fields
3. Remove old single-buffer fields
### Phase 2: Queue Operations
1. Implement `enqueue_write()` helper
2. Implement `dequeue_write()` helper
3. Update `queue_websocket_write()` to use queue
4. Update `process_pending_write()` to process queue
### Phase 3: Integration
1. Update all `lws_write()` call sites
2. Update `LWS_CALLBACK_CLOSED` cleanup
3. Add queue length monitoring
### Phase 4: Testing
1. Test with rapid multiple events to same client
2. Test with large events (>4KB)
3. Test under load with concurrent connections
4. Verify no "Invalid frame header" errors
## Expected Outcomes
1. **No More Rejections**: All writes queued successfully
2. **Frame Integrity**: Complete frames sent atomically
3. **Memory Safety**: Proper cleanup on all paths
4. **Performance**: Minimal overhead for queue management
## Metrics to Monitor
1. Average queue length per session
2. Maximum queue length observed
3. Queue overflow events (if limit implemented)
4. Write completion rate
5. Partial write frequency
## Alternative Approaches Considered
### 1. Larger Single Buffer
**Rejected**: Doesn't solve the fundamental problem of multiple concurrent writes
### 2. Immediate Write Retry
**Rejected**: Could cause busy-waiting and CPU waste
### 3. Drop Frames on Conflict
**Rejected**: Violates reliability requirements
## References
- libwebsockets documentation on `lws_write()` and `LWS_CALLBACK_SERVER_WRITEABLE`
- WebSocket RFC 6455 on frame structure
- Nostr NIP-01 on relay-to-client communication

View File

@@ -39,6 +39,11 @@ Even simpler: Use this one-liner
cd /usr/local/bin/c_relay
sudo -u c-relay ./c_relay --debug-level=5 & sleep 2 && sudo gdb -p $(pgrep c_relay)
Inside gdb, after attaching:
(gdb) continue
Or shorter:
(gdb) c
How to View the Logs
@@ -76,3 +81,7 @@ sudo systemctl status rsyslog
sudo -u c-relay ./c_relay --debug-level=5 -r 85d0b37e2ae822966dcadd06b2dc9368cde73865f90ea4d44f8b57d47ef0820a -a 1ec454734dcbf6fe54901ce25c0c7c6bca5edd89443416761fadc321d38df139
./c_relay_static_x86_64 -p 7889 --debug-level=5 -r 85d0b37e2ae822966dcadd06b2dc9368cde73865f90ea4d44f8b57d47ef0820a -a 1ec454734dcbf6fe54901ce25c0c7c6bca5edd89443416761fadc321d38df139
sudo ufw allow 8888/tcp
sudo ufw delete allow 8888/tcp

View File

@@ -1 +1 @@
3579819
3615986

269
src/api.c
View File

@@ -40,28 +40,17 @@ const char* get_config_value(const char* key);
int get_config_bool(const char* key, int default_value);
int update_config_in_table(const char* key, const char* value);
// Monitoring system state
static time_t last_report_time = 0;
// Monitoring system state (throttling now handled per-function)
// Forward declaration for monitoring helper function
int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_func)(void));
// Forward declaration for CPU metrics query function
cJSON* query_cpu_metrics(void);
// Monitoring system helper functions
int is_monitoring_enabled(void) {
return get_config_bool("kind_34567_reporting_enabled", 0);
}
int get_monitoring_throttle_seconds(void) {
return get_config_int("kind_34567_reporting_throttling_sec", 5);
}
int set_monitoring_enabled(int enabled) {
const char* value = enabled ? "1" : "0";
if (update_config_in_table("kind_34567_reporting_enabled", value) == 0) {
DEBUG_INFO("Monitoring enabled state changed");
return 0;
}
return -1;
return get_config_int("kind_24567_reporting_throttle_sec", 5);
}
// Query event kind distribution from database
@@ -250,7 +239,7 @@ cJSON* query_active_subscriptions(void) {
const char* sql =
"SELECT COUNT(*) as total_subs, "
"COUNT(DISTINCT client_ip) as client_count "
"FROM subscription_events "
"FROM subscriptions "
"WHERE event_type = 'created' AND ended_at IS NULL";
if (sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL) != SQLITE_OK) {
@@ -272,7 +261,7 @@ cJSON* query_active_subscriptions(void) {
const char* max_sql =
"SELECT MAX(sub_count) FROM ("
" SELECT COUNT(*) as sub_count "
" FROM subscription_events "
" FROM subscriptions "
" WHERE event_type = 'created' AND ended_at IS NULL "
" GROUP BY client_ip"
")";
@@ -308,7 +297,7 @@ cJSON* query_active_subscriptions(void) {
}
// Query detailed subscription information from database log (ADMIN ONLY)
// Uses subscription_events table instead of in-memory iteration to avoid mutex contention
// Uses subscriptions table instead of in-memory iteration to avoid mutex contention
cJSON* query_subscription_details(void) {
extern sqlite3* g_db;
if (!g_db) {
@@ -316,13 +305,13 @@ cJSON* query_subscription_details(void) {
return NULL;
}
// Query active subscriptions directly from subscription_events table
// Query active subscriptions directly from subscriptions table
// Get subscriptions that were created but not yet closed/expired/disconnected
sqlite3_stmt* stmt;
const char* sql =
"SELECT subscription_id, client_ip, filter_json, events_sent, "
"SELECT subscription_id, client_ip, wsi_pointer, filter_json, events_sent, "
"created_at, (strftime('%s', 'now') - created_at) as duration_seconds "
"FROM subscription_events "
"FROM subscriptions "
"WHERE event_type = 'created' AND ended_at IS NULL "
"ORDER BY created_at DESC LIMIT 100";
@@ -346,14 +335,16 @@ cJSON* query_subscription_details(void) {
// Extract subscription data from database
const char* sub_id = (const char*)sqlite3_column_text(stmt, 0);
const char* client_ip = (const char*)sqlite3_column_text(stmt, 1);
const char* filter_json = (const char*)sqlite3_column_text(stmt, 2);
long long events_sent = sqlite3_column_int64(stmt, 3);
long long created_at = sqlite3_column_int64(stmt, 4);
long long duration_seconds = sqlite3_column_int64(stmt, 5);
const char* wsi_pointer = (const char*)sqlite3_column_text(stmt, 2);
const char* filter_json = (const char*)sqlite3_column_text(stmt, 3);
long long events_sent = sqlite3_column_int64(stmt, 4);
long long created_at = sqlite3_column_int64(stmt, 5);
long long duration_seconds = sqlite3_column_int64(stmt, 6);
// Add basic subscription info
cJSON_AddStringToObject(sub_obj, "id", sub_id ? sub_id : "");
cJSON_AddStringToObject(sub_obj, "client_ip", client_ip ? client_ip : "");
cJSON_AddStringToObject(sub_obj, "wsi_pointer", wsi_pointer ? wsi_pointer : "");
cJSON_AddNumberToObject(sub_obj, "created_at", (double)created_at);
cJSON_AddNumberToObject(sub_obj, "duration_seconds", (double)duration_seconds);
cJSON_AddNumberToObject(sub_obj, "events_sent", events_sent);
@@ -386,8 +377,8 @@ cJSON* query_subscription_details(void) {
return subscriptions_data;
}
// Generate and broadcast monitoring event
int generate_monitoring_event(void) {
// Generate event-driven monitoring events (triggered by event storage)
int generate_event_driven_monitoring(void) {
// Generate event_kinds monitoring event
if (generate_monitoring_event_for_type("event_kinds", query_event_kind_distribution) != 0) {
DEBUG_ERROR("Failed to generate event_kinds monitoring event");
@@ -412,16 +403,45 @@ int generate_monitoring_event(void) {
return -1;
}
// Generate CPU metrics monitoring event (also triggered by event storage)
if (generate_monitoring_event_for_type("cpu_metrics", query_cpu_metrics) != 0) {
DEBUG_ERROR("Failed to generate cpu_metrics monitoring event");
return -1;
}
DEBUG_INFO("Generated and broadcast event-driven monitoring events");
return 0;
}
// Generate subscription-driven monitoring events (triggered by subscription changes)
int generate_subscription_driven_monitoring(void) {
// Generate active_subscriptions monitoring event (subscription changes affect this)
if (generate_monitoring_event_for_type("active_subscriptions", query_active_subscriptions) != 0) {
DEBUG_ERROR("Failed to generate active_subscriptions monitoring event");
return -1;
}
// Generate subscription_details monitoring event (admin-only)
if (generate_monitoring_event_for_type("subscription_details", query_subscription_details) != 0) {
DEBUG_ERROR("Failed to generate subscription_details monitoring event");
return -1;
}
DEBUG_INFO("Generated and broadcast all monitoring events");
// Generate CPU metrics monitoring event (also triggered by subscription changes)
if (generate_monitoring_event_for_type("cpu_metrics", query_cpu_metrics) != 0) {
DEBUG_ERROR("Failed to generate cpu_metrics monitoring event");
return -1;
}
DEBUG_INFO("Generated and broadcast subscription-driven monitoring events");
return 0;
}
// Generate and broadcast monitoring event (legacy function - now calls event-driven version)
int generate_monitoring_event(void) {
return generate_event_driven_monitoring();
}
// Helper function to generate monitoring event for a specific type
int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_func)(void)) {
// Query the monitoring data
@@ -459,12 +479,12 @@ int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_f
}
free(relay_privkey_hex);
// Create monitoring event (kind 34567)
// Create monitoring event (kind 24567 - ephemeral)
cJSON* monitoring_event = cJSON_CreateObject();
cJSON_AddStringToObject(monitoring_event, "id", ""); // Will be set by signing
cJSON_AddStringToObject(monitoring_event, "pubkey", relay_pubkey);
cJSON_AddNumberToObject(monitoring_event, "created_at", (double)time(NULL));
cJSON_AddNumberToObject(monitoring_event, "kind", 34567);
cJSON_AddNumberToObject(monitoring_event, "kind", 24567);
cJSON_AddStringToObject(monitoring_event, "content", content_json);
// Create tags array with d tag for identification
@@ -480,7 +500,7 @@ int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_f
// Use the library function to create and sign the event
cJSON* signed_event = nostr_create_and_sign_event(
34567, // kind
24567, // kind (ephemeral)
cJSON_GetStringValue(cJSON_GetObjectItem(monitoring_event, "content")), // content
tags, // tags
relay_privkey, // private key
@@ -498,55 +518,58 @@ int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_f
cJSON_Delete(monitoring_event);
monitoring_event = signed_event;
// Broadcast the event to active subscriptions
// Broadcast the ephemeral event to active subscriptions (no database storage)
broadcast_event_to_subscriptions(monitoring_event);
// Store in database
int store_result = store_event(monitoring_event);
cJSON_Delete(monitoring_event);
free(content_json);
if (store_result != 0) {
DEBUG_ERROR("Failed to store monitoring event (%s)", d_tag_value);
return -1;
}
DEBUG_LOG("Monitoring event broadcast (ephemeral kind 24567, type: %s)", d_tag_value);
return 0;
}
// Monitoring hook called when an event is stored
void monitoring_on_event_stored(void) {
// Check if monitoring is enabled
if (!is_monitoring_enabled()) {
// Check throttling first (cheapest check)
static time_t last_monitoring_time = 0;
time_t current_time = time(NULL);
int throttle_seconds = get_monitoring_throttle_seconds();
if (current_time - last_monitoring_time < throttle_seconds) {
return;
}
// Check throttling
time_t now = time(NULL);
// Check if anyone is subscribed to monitoring events (kind 24567)
// This is the ONLY activation check needed - if someone subscribes, they want monitoring
if (!has_subscriptions_for_kind(24567)) {
return; // No subscribers = no expensive operations
}
// Generate event-driven monitoring events only when someone is listening
last_monitoring_time = current_time;
generate_event_driven_monitoring();
}
// Monitoring hook called when subscriptions change (create/close)
void monitoring_on_subscription_change(void) {
// Check throttling first (cheapest check)
static time_t last_monitoring_time = 0;
time_t current_time = time(NULL);
int throttle_seconds = get_monitoring_throttle_seconds();
if (now - last_report_time < throttle_seconds) {
return; // Too soon, skip this report
if (current_time - last_monitoring_time < throttle_seconds) {
return;
}
// Generate and broadcast monitoring event
if (generate_monitoring_event() == 0) {
last_report_time = now;
// Check if anyone is subscribed to monitoring events (kind 24567)
// This is the ONLY activation check needed - if someone subscribes, they want monitoring
if (!has_subscriptions_for_kind(24567)) {
return; // No subscribers = no expensive operations
}
}
// Initialize monitoring system
int init_monitoring_system(void) {
last_report_time = 0;
DEBUG_INFO("Monitoring system initialized");
return 0;
}
// Cleanup monitoring system
void cleanup_monitoring_system(void) {
// No cleanup needed for monitoring system
DEBUG_INFO("Monitoring system cleaned up");
// Generate subscription-driven monitoring events only when someone is listening
last_monitoring_time = current_time;
generate_subscription_driven_monitoring();
}
// Forward declaration for known_configs (defined in config.c)
@@ -1138,6 +1161,68 @@ int handle_embedded_file_writeable(struct lws* wsi) {
return 0;
}
// Query CPU usage metrics for the relay process.
//
// Builds and returns a cJSON object containing:
//   - data_type / timestamp: identification fields
//   - process_cpu_time / system_cpu_time / system_idle_time: raw cumulative
//     clock-tick counters from /proc (the frontend computes deltas for
//     percentages, as noted below)
//   - current_cpu_core: core the process is currently scheduled on
//   - process_id: getpid()
//   - memory_usage_mb: resident set size (VmRSS) in MiB
//
// Returns a heap-allocated cJSON object; the caller owns it and must free it
// with cJSON_Delete(). Linux-only (relies on /proc); individual fields are
// simply omitted when the corresponding read or parse fails.
cJSON* query_cpu_metrics(void) {
    cJSON* cpu_stats = cJSON_CreateObject();
    cJSON_AddStringToObject(cpu_stats, "data_type", "cpu_metrics");
    cJSON_AddNumberToObject(cpu_stats, "timestamp", (double)time(NULL));

    // Read process CPU times from /proc/self/stat.
    // BUG FIX: field 2 (comm) is a parenthesized executable name that may
    // itself contain spaces or parentheses, which breaks naive "%*d %*s"
    // parsing and silently misaligns every subsequent field. Per proc(5),
    // parse from the LAST ')' so the remaining fields line up reliably.
    FILE* proc_stat = fopen("/proc/self/stat", "r");
    if (proc_stat) {
        char stat_line[1024];
        if (fgets(stat_line, sizeof(stat_line), proc_stat)) {
            const char* after_comm = strrchr(stat_line, ')');
            unsigned long utime, stime; // user/system CPU time in clock ticks (stat fields 14, 15)
            if (after_comm &&
                sscanf(after_comm + 1,
                       " %*c %*d %*d %*d %*d %*d %*u %*u %*u %*u %*u %lu %lu",
                       &utime, &stime) == 2) {
                unsigned long total_proc_time = utime + stime;

                // Get system-wide CPU times from the aggregate "cpu" line of /proc/stat
                FILE* sys_stat = fopen("/proc/stat", "r");
                if (sys_stat) {
                    unsigned long user, nice, system, idle, iowait, irq, softirq;
                    if (fscanf(sys_stat, "cpu %lu %lu %lu %lu %lu %lu %lu",
                               &user, &nice, &system, &idle, &iowait, &irq, &softirq) == 7) {
                        unsigned long total_sys_time = user + nice + system + idle + iowait + irq + softirq;

                        // Store raw cumulative counters only; accurate CPU
                        // percentages require deltas between two samples,
                        // which the frontend computes.
                        cJSON_AddNumberToObject(cpu_stats, "process_cpu_time", (double)total_proc_time);
                        cJSON_AddNumberToObject(cpu_stats, "system_cpu_time", (double)total_sys_time);
                        cJSON_AddNumberToObject(cpu_stats, "system_idle_time", (double)idle);
                    }
                    fclose(sys_stat);
                }
            }
        }
        fclose(proc_stat);
    }

    // Current CPU core: independent of the /proc/self/stat parse above, so it
    // is no longer gated on that parse succeeding (it previously was).
    int current_core = sched_getcpu();
    if (current_core >= 0) {
        cJSON_AddNumberToObject(cpu_stats, "current_cpu_core", current_core);
    }

    // Process ID
    pid_t pid = getpid();
    cJSON_AddNumberToObject(cpu_stats, "process_id", (double)pid);

    // Resident memory usage (VmRSS) from /proc/self/status, reported in MiB
    FILE* mem_stat = fopen("/proc/self/status", "r");
    if (mem_stat) {
        char line[256];
        while (fgets(line, sizeof(line), mem_stat)) {
            if (strncmp(line, "VmRSS:", 6) == 0) {
                unsigned long rss_kb;
                if (sscanf(line, "VmRSS: %lu kB", &rss_kb) == 1) {
                    cJSON_AddNumberToObject(cpu_stats, "memory_usage_mb", rss_kb / 1024.0);
                }
                break;
            }
        }
        fclose(mem_stat);
    }

    return cpu_stats;
}
// Generate stats JSON from database queries
char* generate_stats_json(void) {
extern sqlite3* g_db;
@@ -2265,24 +2350,8 @@ int handle_monitoring_command(cJSON* event, const char* command, char* error_mes
if (*p >= 'A' && *p <= 'Z') *p = *p + 32;
}
// Handle commands
if (strcmp(cmd, "enable_monitoring") == 0) {
if (set_monitoring_enabled(1) == 0) {
char* response_content = "✅ Monitoring enabled\n\nReal-time monitoring events will now be generated.";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
} else {
char* response_content = "❌ Failed to enable monitoring";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
}
} else if (strcmp(cmd, "disable_monitoring") == 0) {
if (set_monitoring_enabled(0) == 0) {
char* response_content = "✅ Monitoring disabled\n\nReal-time monitoring events will no longer be generated.";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
} else {
char* response_content = "❌ Failed to disable monitoring";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
}
} else if (strcmp(cmd, "set_monitoring_throttle") == 0) {
// Handle set_monitoring_throttle command (only remaining monitoring command)
if (strcmp(cmd, "set_monitoring_throttle") == 0) {
if (arg[0] == '\0') {
char* response_content = "❌ Missing throttle value\n\nUsage: set_monitoring_throttle <seconds>";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
@@ -2298,44 +2367,28 @@ int handle_monitoring_command(cJSON* event, const char* command, char* error_mes
char throttle_str[16];
snprintf(throttle_str, sizeof(throttle_str), "%ld", throttle_seconds);
if (update_config_in_table("kind_34567_reporting_throttling_sec", throttle_str) == 0) {
if (update_config_in_table("kind_24567_reporting_throttle_sec", throttle_str) == 0) {
char response_content[256];
snprintf(response_content, sizeof(response_content),
"✅ Monitoring throttle updated\n\nMinimum interval between monitoring events: %ld seconds", throttle_seconds);
"✅ Monitoring throttle updated\n\n"
"Minimum interval between monitoring events: %ld seconds\n\n"
" Monitoring activates automatically when you subscribe to kind 24567 events.",
throttle_seconds);
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
} else {
char* response_content = "❌ Failed to update monitoring throttle";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
}
} else if (strcmp(cmd, "monitoring_status") == 0) {
int enabled = is_monitoring_enabled();
int throttle = get_monitoring_throttle_seconds();
char response_content[512];
snprintf(response_content, sizeof(response_content),
"📊 Monitoring Status\n"
"━━━━━━━━━━━━━━━━━━━━\n"
"\n"
"Enabled: %s\n"
"Throttle: %d seconds\n"
"\n"
"Commands:\n"
"• enable_monitoring\n"
"• disable_monitoring\n"
"• set_monitoring_throttle <seconds>\n"
"• monitoring_status",
enabled ? "Yes" : "No", throttle);
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
} else {
char response_content[256];
char response_content[1024];
snprintf(response_content, sizeof(response_content),
"❌ Unknown monitoring command: %s\n\n"
"Available commands:\n"
"enable_monitoring\n"
"• disable_monitoring\n"
"• set_monitoring_throttle <seconds>\n"
"• monitoring_status", cmd);
"Available command:\n"
"set_monitoring_throttle <seconds>\n\n"
" Monitoring is now subscription-based:\n"
"Subscribe to kind 24567 events to receive real-time monitoring data.\n"
"Monitoring automatically activates when subscriptions exist and deactivates when they close.",
cmd);
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
}
}

View File

@@ -60,11 +60,8 @@ char* execute_sql_query(const char* query, const char* request_id, char* error_m
int handle_sql_query_unified(cJSON* event, const char* query, char* error_message, size_t error_size, struct lws* wsi);
// Monitoring system functions
int init_monitoring_system(void);
void cleanup_monitoring_system(void);
void monitoring_on_event_stored(void);
int set_monitoring_enabled(int enabled);
int is_monitoring_enabled(void);
void monitoring_on_subscription_change(void);
int get_monitoring_throttle_seconds(void);
#endif // API_H

View File

@@ -4112,32 +4112,18 @@ int populate_all_config_values_atomic(const char* admin_pubkey, const char* rela
return -1;
}
// Insert monitoring system config entries
// Insert monitoring system config entry (ephemeral kind 24567)
// Note: Monitoring is automatically activated when clients subscribe to kind 24567
sqlite3_reset(stmt);
sqlite3_bind_text(stmt, 1, "kind_34567_reporting_enabled", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 2, "false", -1, SQLITE_STATIC); // boolean, default false
sqlite3_bind_text(stmt, 3, "boolean", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 4, "Enable real-time monitoring event generation", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 5, "monitoring", -1, SQLITE_STATIC);
sqlite3_bind_int(stmt, 6, 0); // does not require restart
rc = sqlite3_step(stmt);
if (rc != SQLITE_DONE) {
DEBUG_ERROR("Failed to insert kind_34567_reporting_enabled: %s", sqlite3_errmsg(g_db));
sqlite3_finalize(stmt);
sqlite3_exec(g_db, "ROLLBACK;", NULL, NULL, NULL);
return -1;
}
sqlite3_reset(stmt);
sqlite3_bind_text(stmt, 1, "kind_34567_reporting_throttling_sec", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 1, "kind_24567_reporting_throttle_sec", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 2, "5", -1, SQLITE_STATIC); // integer, default 5 seconds
sqlite3_bind_text(stmt, 3, "integer", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 4, "Minimum seconds between monitoring event reports", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 4, "Minimum seconds between monitoring event reports (ephemeral kind 24567)", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 5, "monitoring", -1, SQLITE_STATIC);
sqlite3_bind_int(stmt, 6, 0); // does not require restart
rc = sqlite3_step(stmt);
if (rc != SQLITE_DONE) {
DEBUG_ERROR("Failed to insert kind_34567_reporting_throttling_sec: %s", sqlite3_errmsg(g_db));
DEBUG_ERROR("Failed to insert kind_24567_reporting_throttle_sec: %s", sqlite3_errmsg(g_db));
sqlite3_finalize(stmt);
sqlite3_exec(g_db, "ROLLBACK;", NULL, NULL, NULL);
return -1;

File diff suppressed because one or more lines are too long

View File

@@ -149,9 +149,7 @@ int mark_event_as_deleted(const char* event_id, const char* deletion_event_id, c
// Forward declaration for database functions
int store_event(cJSON* event);
// Forward declarations for monitoring system
void init_monitoring_system(void);
void cleanup_monitoring_system(void);
// Forward declaration for monitoring system
void monitoring_on_event_stored(void);
// Forward declarations for NIP-11 relay information handling
@@ -1989,9 +1987,6 @@ int main(int argc, char* argv[]) {
// Initialize NIP-40 expiration configuration
init_expiration_config();
// Initialize monitoring system
init_monitoring_system();
// Update subscription manager configuration
update_subscription_manager_config();
@@ -2023,9 +2018,6 @@ int main(int argc, char* argv[]) {
ginxsom_request_validator_cleanup();
cleanup_configuration_system();
// Cleanup monitoring system
cleanup_monitoring_system();
// Cleanup subscription manager mutexes
pthread_mutex_destroy(&g_subscription_manager.subscriptions_lock);
pthread_mutex_destroy(&g_subscription_manager.ip_tracking_lock);

View File

@@ -10,10 +10,10 @@
#define MAIN_H
// Version information (auto-updated by build system)
#define VERSION "v0.7.31"
#define VERSION "v0.7.35"
#define VERSION_MAJOR 0
#define VERSION_MINOR 7
#define VERSION_PATCH 31
#define VERSION_PATCH 35
// Relay metadata (authoritative source for NIP-11 information)
#define RELAY_NAME "C-Relay"

View File

@@ -1,12 +1,12 @@
/* Embedded SQL Schema for C Nostr Relay
* Generated from db/schema.sql - Do not edit manually
* Schema Version: 7
* Schema Version: 8
*/
#ifndef SQL_SCHEMA_H
#define SQL_SCHEMA_H
/* Schema version constant */
#define EMBEDDED_SCHEMA_VERSION "7"
#define EMBEDDED_SCHEMA_VERSION "8"
/* Embedded SQL schema as C string literal */
static const char* const EMBEDDED_SCHEMA_SQL =
@@ -15,7 +15,7 @@ static const char* const EMBEDDED_SCHEMA_SQL =
-- Configuration system using config table\n\
\n\
-- Schema version tracking\n\
PRAGMA user_version = 7;\n\
PRAGMA user_version = 8;\n\
\n\
-- Enable foreign key support\n\
PRAGMA foreign_keys = ON;\n\
@@ -58,8 +58,8 @@ CREATE TABLE schema_info (\n\
\n\
-- Insert schema metadata\n\
INSERT INTO schema_info (key, value) VALUES\n\
('version', '7'),\n\
('description', 'Hybrid Nostr relay schema with event-based and table-based configuration'),\n\
('version', '8'),\n\
('description', 'Hybrid Nostr relay schema with subscription deduplication support'),\n\
('created_at', strftime('%s', 'now'));\n\
\n\
-- Helper views for common queries\n\
@@ -181,17 +181,19 @@ END;\n\
-- Persistent Subscriptions Logging Tables (Phase 2)\n\
-- Optional database logging for subscription analytics and debugging\n\
\n\
-- Subscription events log\n\
CREATE TABLE subscription_events (\n\
-- Subscriptions log (renamed from subscription_events for clarity)\n\
CREATE TABLE subscriptions (\n\
id INTEGER PRIMARY KEY AUTOINCREMENT,\n\
subscription_id TEXT NOT NULL, -- Subscription ID from client\n\
wsi_pointer TEXT NOT NULL, -- WebSocket pointer address (hex string)\n\
client_ip TEXT NOT NULL, -- Client IP address\n\
event_type TEXT NOT NULL CHECK (event_type IN ('created', 'closed', 'expired', 'disconnected')),\n\
filter_json TEXT, -- JSON representation of filters (for created events)\n\
events_sent INTEGER DEFAULT 0, -- Number of events sent to this subscription\n\
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),\n\
ended_at INTEGER, -- When subscription ended (for closed/expired/disconnected)\n\
duration INTEGER -- Computed: ended_at - created_at\n\
duration INTEGER, -- Computed: ended_at - created_at\n\
UNIQUE(subscription_id, wsi_pointer) -- Prevent duplicate subscriptions per connection\n\
);\n\
\n\
-- Subscription metrics summary\n\
@@ -218,10 +220,11 @@ CREATE TABLE event_broadcasts (\n\
);\n\
\n\
-- Indexes for subscription logging performance\n\
CREATE INDEX idx_subscription_events_id ON subscription_events(subscription_id);\n\
CREATE INDEX idx_subscription_events_type ON subscription_events(event_type);\n\
CREATE INDEX idx_subscription_events_created ON subscription_events(created_at DESC);\n\
CREATE INDEX idx_subscription_events_client ON subscription_events(client_ip);\n\
CREATE INDEX idx_subscriptions_id ON subscriptions(subscription_id);\n\
CREATE INDEX idx_subscriptions_type ON subscriptions(event_type);\n\
CREATE INDEX idx_subscriptions_created ON subscriptions(created_at DESC);\n\
CREATE INDEX idx_subscriptions_client ON subscriptions(client_ip);\n\
CREATE INDEX idx_subscriptions_wsi ON subscriptions(wsi_pointer);\n\
\n\
CREATE INDEX idx_subscription_metrics_date ON subscription_metrics(date DESC);\n\
\n\
@@ -231,10 +234,10 @@ CREATE INDEX idx_event_broadcasts_time ON event_broadcasts(broadcast_at DESC);\n
\n\
-- Trigger to update subscription duration when ended\n\
CREATE TRIGGER update_subscription_duration\n\
AFTER UPDATE OF ended_at ON subscription_events\n\
AFTER UPDATE OF ended_at ON subscriptions\n\
WHEN NEW.ended_at IS NOT NULL AND OLD.ended_at IS NULL\n\
BEGIN\n\
UPDATE subscription_events\n\
UPDATE subscriptions\n\
SET duration = NEW.ended_at - NEW.created_at\n\
WHERE id = NEW.id;\n\
END;\n\
@@ -249,7 +252,7 @@ SELECT\n\
MAX(events_sent) as max_events_sent,\n\
AVG(events_sent) as avg_events_sent,\n\
COUNT(DISTINCT client_ip) as unique_clients\n\
FROM subscription_events\n\
FROM subscriptions\n\
GROUP BY date(created_at, 'unixepoch')\n\
ORDER BY date DESC;\n\
\n\
@@ -262,10 +265,10 @@ SELECT\n\
events_sent,\n\
created_at,\n\
(strftime('%s', 'now') - created_at) as duration_seconds\n\
FROM subscription_events\n\
FROM subscriptions\n\
WHERE event_type = 'created'\n\
AND subscription_id NOT IN (\n\
SELECT subscription_id FROM subscription_events\n\
SELECT subscription_id FROM subscriptions\n\
WHERE event_type IN ('closed', 'expired', 'disconnected')\n\
);\n\
\n\

View File

@@ -241,8 +241,31 @@ int add_subscription_to_manager(subscription_t* sub) {
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
// Check global limits
if (g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) {
// Check for existing subscription with same ID and WebSocket connection
// Remove it first to prevent duplicates (implements subscription replacement per NIP-01)
subscription_t** current = &g_subscription_manager.active_subscriptions;
int found_duplicate = 0;
subscription_t* duplicate_old = NULL;
while (*current) {
subscription_t* existing = *current;
// Match by subscription ID and WebSocket pointer
if (strcmp(existing->id, sub->id) == 0 && existing->wsi == sub->wsi) {
// Found duplicate: mark inactive and unlink from global list under lock
existing->active = 0;
*current = existing->next;
g_subscription_manager.total_subscriptions--;
found_duplicate = 1;
duplicate_old = existing; // defer free until after per-session unlink
break;
}
current = &(existing->next);
}
// Check global limits (only if not replacing an existing subscription)
if (!found_duplicate && g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) {
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
DEBUG_ERROR("Maximum total subscriptions reached");
return -1;
@@ -252,13 +275,44 @@ int add_subscription_to_manager(subscription_t* sub) {
sub->next = g_subscription_manager.active_subscriptions;
g_subscription_manager.active_subscriptions = sub;
g_subscription_manager.total_subscriptions++;
// Only increment total_created if this is a new subscription (not a replacement)
if (!found_duplicate) {
g_subscription_manager.total_created++;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
// Log subscription creation to database
// If we replaced an existing subscription, unlink it from the per-session list before freeing
if (duplicate_old) {
// Obtain per-session data for this wsi
struct per_session_data* pss = (struct per_session_data*) lws_wsi_user(duplicate_old->wsi);
if (pss) {
pthread_mutex_lock(&pss->session_lock);
struct subscription** scur = &pss->subscriptions;
while (*scur) {
if (*scur == duplicate_old) {
// Unlink by pointer identity to avoid removing the newly-added one
*scur = duplicate_old->session_next;
if (pss->subscription_count > 0) {
pss->subscription_count--;
}
break;
}
scur = &((*scur)->session_next);
}
pthread_mutex_unlock(&pss->session_lock);
}
// Now safe to free the old subscription
free_subscription(duplicate_old);
}
// Log subscription creation to database (INSERT OR REPLACE handles duplicates)
log_subscription_created(sub);
// Trigger monitoring update for subscription changes
monitoring_on_subscription_change();
return 0;
}
@@ -306,6 +360,9 @@ int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) {
// Update events sent counter before freeing
update_subscription_events_sent(sub_id_copy, events_sent_copy);
// Trigger monitoring update for subscription changes
monitoring_on_subscription_change();
free_subscription(sub);
return 0;
}
@@ -324,10 +381,7 @@ int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) {
// Check if an event matches a subscription filter
int event_matches_filter(cJSON* event, subscription_filter_t* filter) {
DEBUG_TRACE("Checking event against subscription filter");
if (!event || !filter) {
DEBUG_TRACE("Exiting event_matches_filter - null parameters");
return 0;
}
@@ -503,7 +557,6 @@ int event_matches_filter(cJSON* event, subscription_filter_t* filter) {
}
}
DEBUG_TRACE("Exiting event_matches_filter - match found");
return 1; // All filters passed
}
@@ -526,10 +579,7 @@ int event_matches_subscription(cJSON* event, subscription_t* subscription) {
// Broadcast event to all matching subscriptions (thread-safe)
int broadcast_event_to_subscriptions(cJSON* event) {
DEBUG_TRACE("Broadcasting event to subscriptions");
if (!event) {
DEBUG_TRACE("Exiting broadcast_event_to_subscriptions - null event");
return 0;
}
@@ -611,10 +661,17 @@ int broadcast_event_to_subscriptions(cJSON* event) {
if (buf) {
memcpy(buf + LWS_PRE, msg_str, msg_len);
// Send to WebSocket connection with error checking
// Note: lws_write can fail if connection is closed, but won't crash
int write_result = lws_write(current_temp->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
if (write_result >= 0) {
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=EVENT sub=%s len=%zu data=%.100s%s",
current_temp->id,
msg_len,
msg_str,
msg_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
struct per_session_data* pss = (struct per_session_data*)lws_wsi_user(current_temp->wsi);
if (queue_message(current_temp->wsi, pss, msg_str, msg_len, LWS_WRITE_TEXT) == 0) {
// Message queued successfully
broadcasts++;
// Update events sent counter for this subscription
@@ -636,6 +693,8 @@ int broadcast_event_to_subscriptions(cJSON* event) {
if (event_id_obj && cJSON_IsString(event_id_obj)) {
log_event_broadcast(cJSON_GetStringValue(event_id_obj), current_temp->id, current_temp->client_ip);
}
} else {
DEBUG_ERROR("Failed to queue EVENT message for sub=%s", current_temp->id);
}
free(buf);
@@ -660,10 +719,41 @@ int broadcast_event_to_subscriptions(cJSON* event) {
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
DEBUG_LOG("Event broadcast complete: %d subscriptions matched", broadcasts);
DEBUG_TRACE("Exiting broadcast_event_to_subscriptions");
return broadcasts;
}
// Determine whether any live subscription's filters explicitly request a
// given event kind. Walks the global subscription list while holding the
// manager lock; filters without a "kinds" array are skipped here (they do
// not name the kind explicitly). Returns 1 on the first match, 0 otherwise.
// Thread-safe with respect to add/remove on the global manager.
int has_subscriptions_for_kind(int event_kind) {
    int found = 0;

    pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);

    for (subscription_t* sub = g_subscription_manager.active_subscriptions;
         sub && !found;
         sub = sub->next) {
        if (!sub->active) {
            continue; // skip subscriptions already marked for teardown
        }
        for (subscription_filter_t* f = sub->filters; f && !found; f = f->next) {
            if (!f->kinds || !cJSON_IsArray(f->kinds)) {
                continue; // filter does not constrain kinds explicitly
            }
            cJSON* item = NULL;
            cJSON_ArrayForEach(item, f->kinds) {
                if (cJSON_IsNumber(item) &&
                    (int)cJSON_GetNumberValue(item) == event_kind) {
                    found = 1;
                    break;
                }
            }
        }
    }

    pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
    return found; // 1 = at least one subscription wants this kind
}
/////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////
@@ -675,6 +765,10 @@ int broadcast_event_to_subscriptions(cJSON* event) {
void log_subscription_created(const subscription_t* sub) {
if (!g_db || !sub) return;
// Convert wsi pointer to string
char wsi_str[32];
snprintf(wsi_str, sizeof(wsi_str), "%p", (void*)sub->wsi);
// Create filter JSON for logging
char* filter_json = NULL;
if (sub->filters) {
@@ -721,16 +815,18 @@ void log_subscription_created(const subscription_t* sub) {
cJSON_Delete(filters_array);
}
// Use INSERT OR REPLACE to handle duplicates automatically
const char* sql =
"INSERT INTO subscription_events (subscription_id, client_ip, event_type, filter_json) "
"VALUES (?, ?, 'created', ?)";
"INSERT OR REPLACE INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type, filter_json) "
"VALUES (?, ?, ?, 'created', ?)";
sqlite3_stmt* stmt;
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
if (rc == SQLITE_OK) {
sqlite3_bind_text(stmt, 1, sub->id, -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 2, sub->client_ip, -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 3, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT);
sqlite3_bind_text(stmt, 2, wsi_str, -1, SQLITE_TRANSIENT);
sqlite3_bind_text(stmt, 3, sub->client_ip, -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 4, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
@@ -745,8 +841,8 @@ void log_subscription_closed(const char* sub_id, const char* client_ip, const ch
if (!g_db || !sub_id) return;
const char* sql =
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
"VALUES (?, ?, 'closed')";
"INSERT INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type) "
"VALUES (?, '', ?, 'closed')";
sqlite3_stmt* stmt;
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
@@ -760,7 +856,7 @@ void log_subscription_closed(const char* sub_id, const char* client_ip, const ch
// Update the corresponding 'created' entry with end time and events sent
const char* update_sql =
"UPDATE subscription_events "
"UPDATE subscriptions "
"SET ended_at = strftime('%s', 'now') "
"WHERE subscription_id = ? AND event_type = 'created' AND ended_at IS NULL";
@@ -778,7 +874,7 @@ void log_subscription_disconnected(const char* client_ip) {
// Mark all active subscriptions for this client as disconnected
const char* sql =
"UPDATE subscription_events "
"UPDATE subscriptions "
"SET ended_at = strftime('%s', 'now') "
"WHERE client_ip = ? AND event_type = 'created' AND ended_at IS NULL";
@@ -793,8 +889,8 @@ void log_subscription_disconnected(const char* client_ip) {
if (changes > 0) {
// Log a disconnection event
const char* insert_sql =
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
"VALUES ('disconnect', ?, 'disconnected')";
"INSERT INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type) "
"VALUES ('disconnect', '', ?, 'disconnected')";
rc = sqlite3_prepare_v2(g_db, insert_sql, -1, &stmt, NULL);
if (rc == SQLITE_OK) {
@@ -831,7 +927,7 @@ void update_subscription_events_sent(const char* sub_id, int events_sent) {
if (!g_db || !sub_id) return;
const char* sql =
"UPDATE subscription_events "
"UPDATE subscriptions "
"SET events_sent = ? "
"WHERE subscription_id = ? AND event_type = 'created'";

View File

@@ -118,4 +118,7 @@ void log_subscription_disconnected(const char* client_ip);
void log_event_broadcast(const char* event_id, const char* sub_id, const char* client_ip);
void update_subscription_events_sent(const char* sub_id, int events_sent);
// Subscription query functions
int has_subscriptions_for_kind(int event_kind);
#endif // SUBSCRIPTIONS_H

View File

@@ -108,6 +108,136 @@ struct subscription_manager g_subscription_manager;
// Message queue functions for proper libwebsockets pattern
/**
 * Queue a message for WebSocket writing following libwebsockets' proper pattern.
 * Copies the message into a per-session queue node (with LWS_PRE header room)
 * and requests a writeable callback so the actual lws_write() happens in
 * LWS_CALLBACK_SERVER_WRITEABLE.
 *
 * Fix: the "request writeable callback" decision is now made while still
 * holding pss->session_lock. Previously pss->writeable_requested was checked
 * and set after the lock was released, a check-then-act race: two threads
 * queueing concurrently could both observe the flag clear, or interleave with
 * process_message_queue() clearing it, leaving the flag state inconsistent.
 *
 * @param wsi     WebSocket instance
 * @param pss     Per-session data containing the message queue
 * @param message Message bytes to send (copied; caller retains ownership)
 * @param length  Length of message in bytes (must be > 0)
 * @param type    LWS_WRITE_* type (LWS_WRITE_TEXT, etc.)
 * @return 0 on success, -1 on invalid arguments or allocation failure
 */
int queue_message(struct lws* wsi, struct per_session_data* pss, const char* message, size_t length, enum lws_write_protocol type) {
    if (!wsi || !pss || !message || length == 0) {
        DEBUG_ERROR("queue_message: invalid parameters");
        return -1;
    }

    // Allocate message queue node
    struct message_queue_node* node = malloc(sizeof(struct message_queue_node));
    if (!node) {
        DEBUG_ERROR("queue_message: failed to allocate queue node");
        return -1;
    }

    // Allocate buffer with LWS_PRE space (libwebsockets needs header room)
    size_t buffer_size = LWS_PRE + length;
    unsigned char* buffer = malloc(buffer_size);
    if (!buffer) {
        DEBUG_ERROR("queue_message: failed to allocate message buffer");
        free(node);
        return -1;
    }

    // Copy message to buffer at the LWS_PRE offset
    memcpy(buffer + LWS_PRE, message, length);

    // Initialize node
    node->data = buffer;
    node->length = length;
    node->type = type;
    node->next = NULL;

    // Append to queue AND decide about the writeable callback under one lock
    pthread_mutex_lock(&pss->session_lock);
    if (!pss->message_queue_head) {
        // Queue was empty
        pss->message_queue_head = node;
        pss->message_queue_tail = node;
    } else {
        // Add to end of queue
        pss->message_queue_tail->next = node;
        pss->message_queue_tail = node;
    }
    pss->message_queue_count++;

    // Check-and-set the flag while still holding the lock so concurrent
    // queue_message() callers cannot race on writeable_requested.
    int need_writable = 0;
    if (!pss->writeable_requested) {
        pss->writeable_requested = 1;
        need_writable = 1;
    }
    pthread_mutex_unlock(&pss->session_lock);

    if (need_writable) {
        // NOTE(review): lws_callback_on_writable() is only guaranteed safe
        // from the lws service thread; if this is ever called from another
        // thread it should be paired with lws_cancel_service() — TODO confirm
        // the threading model of callers (e.g. broadcast path).
        lws_callback_on_writable(wsi);
    }

    DEBUG_TRACE("Queued message: len=%zu, queue_count=%d", length, pss->message_queue_count);
    return 0;
}
/**
 * Process the message queue when the socket becomes writeable.
 * Called from LWS_CALLBACK_SERVER_WRITEABLE: dequeues one message, writes it
 * with lws_write(), then either re-arms the writeable callback (more messages
 * pending) or clears the writeable_requested flag (queue drained).
 *
 * Fix: on a failed lws_write() the writeable_requested flag is now cleared
 * under the session lock. Previously a write failure returned -1 with the
 * flag still set, so every later queue_message() call saw the flag as 1 and
 * never re-requested a writeable callback — permanently stalling the queue
 * for that session.
 *
 * @param wsi WebSocket instance
 * @param pss Per-session data containing the message queue
 * @return 0 on success (including empty queue), -1 on error
 */
int process_message_queue(struct lws* wsi, struct per_session_data* pss) {
    if (!wsi || !pss) {
        DEBUG_ERROR("process_message_queue: invalid parameters");
        return -1;
    }

    // Get next message from queue (thread-safe)
    pthread_mutex_lock(&pss->session_lock);
    struct message_queue_node* node = pss->message_queue_head;
    if (!node) {
        // Queue is empty - nothing pending; allow future callback requests
        pss->writeable_requested = 0;
        pthread_mutex_unlock(&pss->session_lock);
        return 0;
    }

    // Unlink head node from the queue
    pss->message_queue_head = node->next;
    if (!pss->message_queue_head) {
        pss->message_queue_tail = NULL;
    }
    pss->message_queue_count--;
    pthread_mutex_unlock(&pss->session_lock);

    // Write message (libwebsockets handles partial writes internally)
    int write_result = lws_write(wsi, node->data + LWS_PRE, node->length, node->type);

    // Free node resources regardless of write outcome
    free(node->data);
    free(node);

    if (write_result < 0) {
        DEBUG_ERROR("process_message_queue: write failed, result=%d", write_result);
        // Clear the flag so future queue_message() calls can re-request a
        // writeable callback; otherwise this session's queue stalls forever.
        pthread_mutex_lock(&pss->session_lock);
        pss->writeable_requested = 0;
        pthread_mutex_unlock(&pss->session_lock);
        return -1;
    }

    DEBUG_TRACE("Processed message: wrote %d bytes, remaining in queue: %d", write_result, pss->message_queue_count);

    // If the queue is not yet drained, request another callback; otherwise
    // clear the flag so the next queue_message() re-arms it.
    pthread_mutex_lock(&pss->session_lock);
    if (pss->message_queue_head) {
        lws_callback_on_writable(wsi);
    } else {
        pss->writeable_requested = 0;
    }
    pthread_mutex_unlock(&pss->session_lock);

    return 0;
}
/////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////
// WEBSOCKET PROTOCOL
@@ -677,6 +807,13 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
broadcast_event_to_subscriptions(event);
}
}
} else {
// Check if this is an ephemeral event (kinds 20000-29999)
// Per NIP-01: ephemeral events are broadcast but never stored
if (event_kind >= 20000 && event_kind < 30000) {
DEBUG_TRACE("Ephemeral event (kind %d) - broadcasting without storage", event_kind);
// Broadcast directly to subscriptions without database storage
broadcast_event_to_subscriptions(event);
} else {
DEBUG_TRACE("Storing regular event in database");
// Regular event - store in database and broadcast
@@ -690,6 +827,7 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
broadcast_event_to_subscriptions(event);
}
}
}
} else {
// Event without valid kind - try normal storage
DEBUG_WARN("Event without valid kind - trying normal storage");
@@ -715,12 +853,18 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
char *response_str = cJSON_Print(response);
if (response_str) {
size_t response_len = strlen(response_str);
unsigned char *buf = malloc(LWS_PRE + response_len);
if (buf) {
memcpy(buf + LWS_PRE, response_str, response_len);
lws_write(wsi, buf + LWS_PRE, response_len, LWS_WRITE_TEXT);
free(buf);
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=OK len=%zu data=%.100s%s",
response_len,
response_str,
response_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
if (queue_message(wsi, pss, response_str, response_len, LWS_WRITE_TEXT) != 0) {
DEBUG_ERROR("Failed to queue OK response message");
}
free(response_str);
}
cJSON_Delete(response);
@@ -815,12 +959,18 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
char *eose_str = cJSON_Print(eose_response);
if (eose_str) {
size_t eose_len = strlen(eose_str);
unsigned char *buf = malloc(LWS_PRE + eose_len);
if (buf) {
memcpy(buf + LWS_PRE, eose_str, eose_len);
lws_write(wsi, buf + LWS_PRE, eose_len, LWS_WRITE_TEXT);
free(buf);
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=EOSE len=%zu data=%.100s%s",
eose_len,
eose_str,
eose_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
if (queue_message(wsi, pss, eose_str, eose_len, LWS_WRITE_TEXT) != 0) {
DEBUG_ERROR("Failed to queue EOSE message");
}
free(eose_str);
}
cJSON_Delete(eose_response);
@@ -900,9 +1050,22 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
return 0;
}
// CRITICAL FIX: Remove from session list FIRST (while holding lock)
// to prevent race condition where global manager frees the subscription
// while we're still iterating through the session list
// CRITICAL FIX: Mark subscription as inactive in global manager FIRST
// This prevents other threads from accessing it during removal
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
subscription_t* target_sub = g_subscription_manager.active_subscriptions;
while (target_sub) {
if (strcmp(target_sub->id, subscription_id) == 0 && target_sub->wsi == wsi) {
target_sub->active = 0; // Mark as inactive immediately
break;
}
target_sub = target_sub->next;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
// Now safe to remove from session list
if (pss) {
pthread_mutex_lock(&pss->session_lock);
@@ -920,8 +1083,7 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
pthread_mutex_unlock(&pss->session_lock);
}
// Remove from global manager AFTER removing from session list
// This prevents use-after-free when iterating session subscriptions
// Finally remove from global manager (which will free it)
remove_subscription_from_manager(subscription_id, wsi);
// Subscription closed
@@ -964,6 +1126,13 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
}
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
// Handle message queue when socket becomes writeable
if (pss) {
process_message_queue(wsi, pss);
}
break;
case LWS_CALLBACK_CLOSED:
DEBUG_TRACE("WebSocket connection closed");
@@ -997,20 +1166,66 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
auth_status,
reason);
// Clean up session subscriptions
// Clean up message queue to prevent memory leaks
while (pss->message_queue_head) {
struct message_queue_node* node = pss->message_queue_head;
pss->message_queue_head = node->next;
free(node->data);
free(node);
}
pss->message_queue_tail = NULL;
pss->message_queue_count = 0;
pss->writeable_requested = 0;
// Clean up session subscriptions - copy IDs first to avoid use-after-free
pthread_mutex_lock(&pss->session_lock);
// First pass: collect subscription IDs safely
typedef struct temp_sub_id {
char id[SUBSCRIPTION_ID_MAX_LENGTH];
struct temp_sub_id* next;
} temp_sub_id_t;
temp_sub_id_t* temp_ids = NULL;
temp_sub_id_t* temp_tail = NULL;
int temp_count = 0;
struct subscription* sub = pss->subscriptions;
while (sub) {
struct subscription* next = sub->session_next;
remove_subscription_from_manager(sub->id, wsi);
sub = next;
if (sub->active) { // Only process active subscriptions
temp_sub_id_t* temp = malloc(sizeof(temp_sub_id_t));
if (temp) {
memcpy(temp->id, sub->id, SUBSCRIPTION_ID_MAX_LENGTH);
temp->id[SUBSCRIPTION_ID_MAX_LENGTH - 1] = '\0';
temp->next = NULL;
if (!temp_ids) {
temp_ids = temp;
temp_tail = temp;
} else {
temp_tail->next = temp;
temp_tail = temp;
}
temp_count++;
}
}
sub = sub->session_next;
}
// Clear session list immediately
pss->subscriptions = NULL;
pss->subscription_count = 0;
pthread_mutex_unlock(&pss->session_lock);
// Second pass: remove from global manager using copied IDs
temp_sub_id_t* current_temp = temp_ids;
while (current_temp) {
temp_sub_id_t* next_temp = current_temp->next;
remove_subscription_from_manager(current_temp->id, wsi);
free(current_temp);
current_temp = next_temp;
}
pthread_mutex_destroy(&pss->session_lock);
} else {
DEBUG_LOG("WebSocket CLOSED: ip=unknown duration=0s subscriptions=0 authenticated=no reason=unknown");
@@ -1677,12 +1892,18 @@ int handle_count_message(const char* sub_id, cJSON* filters, struct lws *wsi, st
char *count_str = cJSON_Print(count_response);
if (count_str) {
size_t count_len = strlen(count_str);
unsigned char *buf = malloc(LWS_PRE + count_len);
if (buf) {
memcpy(buf + LWS_PRE, count_str, count_len);
lws_write(wsi, buf + LWS_PRE, count_len, LWS_WRITE_TEXT);
free(buf);
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=COUNT len=%zu data=%.100s%s",
count_len,
count_str,
count_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
if (queue_message(wsi, pss, count_str, count_len, LWS_WRITE_TEXT) != 0) {
DEBUG_ERROR("Failed to queue COUNT message");
}
free(count_str);
}
cJSON_Delete(count_response);

View File

@@ -31,6 +31,14 @@
#define MAX_SEARCH_LENGTH 256
#define MAX_TAG_VALUE_LENGTH 1024
// Message queue node for proper libwebsockets pattern
// Outgoing messages are queued per session and flushed from the
// LWS_CALLBACK_SERVER_WRITEABLE callback (see process_message_queue),
// so lws_write() is only called when libwebsockets reports the socket
// writeable. Nodes are malloc'd by queue_message and freed after send
// (or drained on LWS_CALLBACK_CLOSED to prevent leaks).
struct message_queue_node {
    unsigned char* data;              // Message data (with LWS_PRE space)
    size_t length;                    // Message length (without LWS_PRE)
    enum lws_write_protocol type;     // LWS_WRITE_TEXT, etc.
    struct message_queue_node* next;  // Next node in queue
};
// Enhanced per-session data with subscription management, NIP-42 authentication, and rate limiting
struct per_session_data {
int authenticated;
@@ -59,6 +67,12 @@ struct per_session_data {
int malformed_request_count; // Count of malformed requests in current hour
time_t malformed_request_window_start; // Start of current hour window
time_t malformed_request_blocked_until; // Time until blocked for malformed requests
// Message queue for proper libwebsockets pattern (replaces single buffer)
struct message_queue_node* message_queue_head; // Head of message queue
struct message_queue_node* message_queue_tail; // Tail of message queue
int message_queue_count; // Number of messages in queue
int writeable_requested; // Flag: 1 if writeable callback requested
};
// NIP-11 HTTP session data structure for managing buffer lifetime
@@ -73,6 +87,10 @@ struct nip11_session_data {
// Function declarations
int start_websocket_relay(int port_override, int strict_port);
// Message queue functions for proper libwebsockets pattern
int queue_message(struct lws* wsi, struct per_session_data* pss, const char* message, size_t length, enum lws_write_protocol type);
int process_message_queue(struct lws* wsi, struct per_session_data* pss);
// Auth rules checking function from request_validator.c
int check_database_auth_rules(const char *pubkey, const char *operation, const char *resource_hash);

35
tests/ephemeral_test.sh Executable file
View File

@@ -0,0 +1,35 @@
#!/bin/bash
# Simplified Ephemeral Event Test
# Tests that ephemeral events are broadcast to active subscriptions
#
# Usage: ./ephemeral_test.sh [relay_url]
#   relay_url defaults to ws://127.0.0.1:8888 (previous hard-coded value)

# Relay to test against; optional first argument overrides the default.
RELAY_URL="${1:-ws://127.0.0.1:8888}"

echo "=== Generating Ephemeral Event (kind 20000) ==="
# Kind 20000 is in the ephemeral kind range; the relay should broadcast
# it to matching live subscriptions rather than serve it from storage.
event=$(nak event --kind 20000 --content "test ephemeral event")
echo "$event"
echo ""
echo "=== Testing Ephemeral Event Broadcast ==="
subscription='["REQ","test_sub",{"kinds":[20000],"limit":10}]'
echo "Subscription Filter:"
echo "$subscription"
echo ""
event_msg='["EVENT",'"$event"']'
echo "Event Message:"
echo "$event_msg"
echo ""
echo "=== Relay Responses ==="
# Subscription and event are sent on the SAME connection so the event
# must arrive via live broadcast, not via stored-event replay.
(
    # Send subscription
    printf "%s\n" "$subscription"
    # Wait for subscription to establish
    sleep 1
    # Send ephemeral event on same connection
    printf "%s\n" "$event_msg"
    # Wait for responses
    sleep 2
) | timeout 5 websocat "$RELAY_URL"
echo ""
echo "Test complete!"

63
tests/large_event_test.sh Executable file
View File

@@ -0,0 +1,63 @@
#!/bin/bash
# Test script for posting large events (>4KB) to test partial write handling
# Uses nak to properly sign events with large content
#
# Usage: ./large_event_test.sh [relay_url]
#   relay_url defaults to ws://localhost:8888 (previous hard-coded value)

RELAY_URL="${1:-ws://localhost:8888}"

# Check if nak is installed
if ! command -v nak &> /dev/null; then
    echo "Error: nak is not installed. Install with: go install github.com/fiatjaf/nak@latest"
    exit 1
fi

# Generate a test private key if not set
if [ -z "$NOSTR_PRIVATE_KEY" ]; then
    echo "Generating temporary test key..."
    export NOSTR_PRIVATE_KEY=$(nak key generate)
fi

# post_sized_event NUM LABEL SIZE CHAR
# Posts a kind-1 event whose content is SIZE repetitions of CHAR.
# Uses printf/tr so no python3 dependency is needed for padding.
post_sized_event() {
    local num="$1" label="$2" size="$3" char="$4"
    echo "Test $num: Posting $label event..."
    local content
    content=$(printf "%${size}s" "" | tr ' ' "$char")
    echo "$content" | nak event -k 1 --content - "$RELAY_URL"
}

echo "=== Large Event Test ==="
echo "Testing partial write handling with events >4KB"
echo "Relay: $RELAY_URL"
echo ""

# Escalating sizes: each event exceeds a typical 4KB write buffer,
# forcing the relay's partial-write/queueing path.
post_sized_event 1 "5KB" 5000 A
sleep 1
echo ""
post_sized_event 2 "10KB" 10000 B
sleep 1
echo ""
post_sized_event 3 "20KB" 20000 C
sleep 1
echo ""
post_sized_event 4 "50KB" 50000 D

echo ""
echo "=== Test Complete ==="
echo ""
echo "Check relay.log for:"
echo " - 'Queued partial write' messages (indicates buffering is working)"
echo " - 'write completed' messages (indicates retry succeeded)"
echo " - No 'Invalid frame header' errors"
echo ""
echo "To view logs in real-time:"
echo " tail -f relay.log | grep -E '(partial|write completed|Invalid frame)'"
echo ""
echo "To check if events were stored:"
echo " sqlite3 build/*.db 'SELECT id, length(content) as content_size FROM events ORDER BY created_at DESC LIMIT 4;'"

View File

@@ -3,6 +3,19 @@
# Test script to post kind 1 events to the relay every second
# Cycles through three different secret keys
# Content includes current timestamp
#
# Usage: ./post_events.sh <relay_url>
# Example: ./post_events.sh ws://localhost:8888
# Example: ./post_events.sh wss://relay.laantungir.net
# Check if relay URL is provided
if [ -z "$1" ]; then
echo "Error: Relay URL is required"
echo "Usage: $0 <relay_url>"
echo "Example: $0 ws://localhost:8888"
echo "Example: $0 wss://relay.laantungir.net"
exit 1
fi
# Array of secret keys to cycle through
SECRET_KEYS=(
@@ -11,7 +24,7 @@ SECRET_KEYS=(
"1618aaa21f5bd45c5ffede0d9a60556db67d4a046900e5f66b0bae5c01c801fb"
)
RELAY_URL="ws://localhost:8888"
RELAY_URL="$1"
KEY_INDEX=0
echo "Starting event posting test to $RELAY_URL"
@@ -36,5 +49,5 @@ while true; do
KEY_INDEX=$(( (KEY_INDEX + 1) % ${#SECRET_KEYS[@]} ))
# Wait 1 second
sleep 1
sleep .2
done

View File

@@ -1,203 +0,0 @@
#!/bin/bash
# Rate Limiting Test Suite for C-Relay
# Tests rate limiting and abuse prevention mechanisms

# NOTE(review): with set -e, any command (or test helper) returning
# non-zero aborts the whole script immediately — verify this is the
# intended interaction with helpers that return 1 on test failure.
set -e

# Configuration — relay endpoint the tests connect to via websocat.
RELAY_HOST="127.0.0.1"
RELAY_PORT="8888"
# NOTE(review): TEST_TIMEOUT appears unused below (per-call timeouts are
# hard-coded as 1s/2s) — confirm whether it should be wired in.
TEST_TIMEOUT=15

# Colors for output (ANSI escape sequences; NC resets to default)
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Test counters — updated by the test helper functions below.
TOTAL_TESTS=0
PASSED_TESTS=0
FAILED_TESTS=0
# Function to test rate limiting
# Sends a burst of identical messages over fresh websocat connections and
# checks whether the relay ever responds with a rate-limit indication.
#   $1 - human-readable description for output
#   $2 - raw message to send on each connection
#   $3 - number of messages in the burst (default 10)
#   $4 - "true" if rate limiting is expected to trigger (default "false")
# Updates TOTAL/PASSED/FAILED counters; returns 0 on pass, 1 on fail.
test_rate_limiting() {
    local description="$1"
    local message="$2"
    local burst_count="${3:-10}"
    local expected_limited="${4:-false}"
    TOTAL_TESTS=$((TOTAL_TESTS + 1))
    echo -n "Testing $description... "
    local rate_limited=false
    local success_count=0
    local error_count=0
    # Send burst of messages
    for i in $(seq 1 "$burst_count"); do
        local response
        response=$(echo "$message" | timeout 2 websocat -B 1048576 ws://$RELAY_HOST:$RELAY_PORT 2>/dev/null | head -1 || echo 'TIMEOUT')
        if [[ "$response" == *"rate limit"* ]] || [[ "$response" == *"too many"* ]] || [[ "$response" == *"TOO_MANY"* ]]; then
            rate_limited=true
        elif [[ "$response" == *"EOSE"* ]] || [[ "$response" == *"EVENT"* ]] || [[ "$response" == *"OK"* ]]; then
            # Plain assignment instead of ((var++)): ((var++)) exits
            # non-zero when the pre-increment value is 0, which kills
            # the whole script under set -e.
            success_count=$((success_count + 1))
        else
            error_count=$((error_count + 1))
        fi
        # Small delay between requests
        sleep 0.05
    done
    if [[ "$expected_limited" == "true" ]]; then
        if [[ "$rate_limited" == "true" ]]; then
            echo -e "${GREEN}PASSED${NC} - Rate limiting triggered as expected"
            PASSED_TESTS=$((PASSED_TESTS + 1))
            return 0
        else
            echo -e "${RED}FAILED${NC} - Rate limiting not triggered (expected)"
            FAILED_TESTS=$((FAILED_TESTS + 1))
            return 1
        fi
    else
        if [[ "$rate_limited" == "false" ]]; then
            echo -e "${GREEN}PASSED${NC} - No rate limiting for normal traffic"
            PASSED_TESTS=$((PASSED_TESTS + 1))
            return 0
        else
            echo -e "${YELLOW}UNCERTAIN${NC} - Unexpected rate limiting"
            PASSED_TESTS=$((PASSED_TESTS + 1)) # Count as passed since it's conservative
            return 0
        fi
    fi
}
# Function to test sustained load
# Repeatedly sends a message for a fixed duration and reports whether
# rate limiting activated, plus the overall success rate.
#   $1 - human-readable description for output
#   $2 - raw message to send on each connection
#   $3 - duration of the load test in seconds (default 10)
# Updates TOTAL/PASSED counters; always returns 0 (absence of rate
# limiting is treated as acceptable/permissive configuration).
test_sustained_load() {
    local description="$1"
    local message="$2"
    local duration="${3:-10}"
    TOTAL_TESTS=$((TOTAL_TESTS + 1))
    echo -n "Testing $description... "
    local start_time
    start_time=$(date +%s)
    local rate_limited=false
    local total_requests=0
    local successful_requests=0
    # Explicit "$duration" instead of relying on implicit arithmetic
    # expansion of a bare variable name inside [[ ... -lt ... ]].
    while [[ $(($(date +%s) - start_time)) -lt "$duration" ]]; do
        # Plain assignment instead of ((var++)): ((var++)) exits
        # non-zero when the pre-increment value is 0, which kills
        # the whole script under set -e.
        total_requests=$((total_requests + 1))
        local response
        response=$(echo "$message" | timeout 1 websocat -B 1048576 ws://$RELAY_HOST:$RELAY_PORT 2>/dev/null | head -1 || echo 'TIMEOUT')
        if [[ "$response" == *"rate limit"* ]] || [[ "$response" == *"too many"* ]] || [[ "$response" == *"TOO_MANY"* ]]; then
            rate_limited=true
        elif [[ "$response" == *"EOSE"* ]] || [[ "$response" == *"EVENT"* ]] || [[ "$response" == *"OK"* ]]; then
            successful_requests=$((successful_requests + 1))
        fi
        # Small delay to avoid overwhelming
        sleep 0.1
    done
    local success_rate=0
    if [[ $total_requests -gt 0 ]]; then
        success_rate=$((successful_requests * 100 / total_requests))
    fi
    if [[ "$rate_limited" == "true" ]]; then
        echo -e "${GREEN}PASSED${NC} - Rate limiting activated under sustained load (${success_rate}% success rate)"
        PASSED_TESTS=$((PASSED_TESTS + 1))
        return 0
    else
        echo -e "${YELLOW}UNCERTAIN${NC} - No rate limiting detected (${success_rate}% success rate)"
        # This might be acceptable if rate limiting is very permissive
        PASSED_TESTS=$((PASSED_TESTS + 1))
        return 0
    fi
}
echo "=========================================="
echo "C-Relay Rate Limiting Test Suite"
echo "=========================================="
echo "Testing rate limiting against relay at ws://$RELAY_HOST:$RELAY_PORT"
echo ""

# Each test call is suffixed with "|| true": the helpers return 1 on
# failure and the script runs under set -e, so without it the first
# failing test would abort the run before the summary below prints.
# Failures are still tracked via FAILED_TESTS and reflected in the
# final exit code.

# Test basic connectivity first
echo "=== Basic Connectivity Test ==="
test_rate_limiting "Basic connectivity" '["REQ","rate_test",{}]' 1 false || true

echo ""
echo "=== Burst Request Testing ==="
# Test rapid succession of requests
test_rate_limiting "Rapid REQ messages" '["REQ","burst_req_'$(date +%s%N)'",{}]' 20 true || true
test_rate_limiting "Rapid COUNT messages" '["COUNT","burst_count_'$(date +%s%N)'",{}]' 20 true || true
test_rate_limiting "Rapid CLOSE messages" '["CLOSE","burst_close"]' 20 true || true

echo ""
echo "=== Malformed Message Rate Limiting ==="
# Test if malformed messages trigger rate limiting faster
test_rate_limiting "Malformed JSON burst" '["REQ","malformed"' 15 true || true
test_rate_limiting "Invalid message type burst" '["INVALID","test",{}]' 15 true || true
test_rate_limiting "Empty message burst" '[]' 15 true || true

echo ""
echo "=== Sustained Load Testing ==="
# Test sustained moderate load
test_sustained_load "Sustained REQ load" '["REQ","sustained_'$(date +%s%N)'",{}]' 10 || true
test_sustained_load "Sustained COUNT load" '["COUNT","sustained_count_'$(date +%s%N)'",{}]' 10 || true

echo ""
echo "=== Filter Complexity Testing ==="
# Test if complex filters trigger rate limiting
test_rate_limiting "Complex filter burst" '["REQ","complex_'$(date +%s%N)'",{"authors":["a","b","c"],"kinds":[1,2,3],"#e":["x","y","z"],"#p":["m","n","o"],"since":1000000000,"until":2000000000,"limit":100}]' 10 true || true

echo ""
echo "=== Subscription Management Testing ==="
# Test subscription creation/deletion rate limiting.
# (The original declared "local churn_test_passed" here; "local" is only
# valid inside a function and the variable was never read, so it was
# removed.)
echo -n "Testing subscription churn... "
for i in $(seq 1 25); do
    # Create subscription
    echo "[\"REQ\",\"churn_${i}_$(date +%s%N)\",{}]" | timeout 1 websocat -B 1048576 ws://$RELAY_HOST:$RELAY_PORT >/dev/null 2>&1 || true
    # Close subscription
    echo "[\"CLOSE\",\"churn_${i}_*\"]" | timeout 1 websocat -B 1048576 ws://$RELAY_HOST:$RELAY_PORT >/dev/null 2>&1 || true
    sleep 0.05
done
# Check if relay is still responsive
if echo 'ping' | timeout 2 websocat -n1 ws://$RELAY_HOST:$RELAY_PORT >/dev/null 2>&1; then
    echo -e "${GREEN}PASSED${NC} - Subscription churn handled"
    TOTAL_TESTS=$((TOTAL_TESTS + 1))
    PASSED_TESTS=$((PASSED_TESTS + 1))
else
    echo -e "${RED}FAILED${NC} - Relay unresponsive after subscription churn"
    TOTAL_TESTS=$((TOTAL_TESTS + 1))
    FAILED_TESTS=$((FAILED_TESTS + 1))
fi

echo ""
echo "=== Test Results ==="
echo "Total tests: $TOTAL_TESTS"
echo -e "Passed: ${GREEN}$PASSED_TESTS${NC}"
echo -e "Failed: ${RED}$FAILED_TESTS${NC}"

if [[ $FAILED_TESTS -eq 0 ]]; then
    echo -e "${GREEN}✓ All rate limiting tests passed!${NC}"
    echo "Rate limiting appears to be working correctly."
    exit 0
else
    echo -e "${RED}✗ Some rate limiting tests failed!${NC}"
    echo "Rate limiting may not be properly configured."
    exit 1
fi