Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
78d484cfe0 | ||
|
|
182e12817d | ||
|
|
9179d57cc9 |
@@ -5,6 +5,9 @@ ARG DEBUG_BUILD=false
|
||||
|
||||
FROM alpine:3.19 AS builder
|
||||
|
||||
# Re-declare build argument in this stage
|
||||
ARG DEBUG_BUILD=false
|
||||
|
||||
# Install build dependencies
|
||||
RUN apk add --no-cache \
|
||||
build-base \
|
||||
|
||||
@@ -1109,3 +1109,91 @@ body.dark-mode .sql-results-table tbody tr:nth-child(even) {
|
||||
border-radius: var(--border-radius);
|
||||
box-sizing: border-box;
|
||||
}
|
||||
|
||||
/* ================================
|
||||
SIDE NAVIGATION MENU
|
||||
================================ */
|
||||
|
||||
.side-nav {
|
||||
position: fixed;
|
||||
top: 0;
|
||||
left: -300px;
|
||||
width: 280px;
|
||||
height: 100vh;
|
||||
background: var(--secondary-color);
|
||||
border-right: var(--border-width) solid var(--border-color);
|
||||
z-index: 1000;
|
||||
transition: left 0.3s ease;
|
||||
overflow-y: auto;
|
||||
padding-top: 80px;
|
||||
}
|
||||
|
||||
.side-nav.open {
|
||||
left: 0;
|
||||
}
|
||||
|
||||
.side-nav-overlay {
|
||||
position: fixed;
|
||||
top: 0;
|
||||
left: 0;
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
background: rgba(0, 0, 0, 0.5);
|
||||
z-index: 999;
|
||||
display: none;
|
||||
}
|
||||
|
||||
.side-nav-overlay.show {
|
||||
display: block;
|
||||
}
|
||||
|
||||
.nav-menu {
|
||||
list-style: none;
|
||||
padding: 0;
|
||||
margin: 0;
|
||||
}
|
||||
|
||||
.nav-menu li {
|
||||
border-bottom: var(--border-width) solid var(--muted-color);
|
||||
}
|
||||
|
||||
.nav-menu li:last-child {
|
||||
border-bottom: none;
|
||||
}
|
||||
|
||||
.nav-item {
|
||||
display: block;
|
||||
padding: 15px 20px;
|
||||
color: var(--primary-color);
|
||||
text-decoration: none;
|
||||
font-family: var(--font-family);
|
||||
font-size: 16px;
|
||||
font-weight: bold;
|
||||
transition: all 0.2s ease;
|
||||
cursor: pointer;
|
||||
border: none;
|
||||
background: none;
|
||||
width: 100%;
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
.nav-item:hover {
|
||||
background: rgba(0, 0, 0, 0.05);
|
||||
border-left: 4px solid var(--accent-color);
|
||||
padding-left: 16px;
|
||||
}
|
||||
|
||||
.nav-item.active {
|
||||
background: rgba(255, 0, 0, 0.1);
|
||||
border-left: 4px solid var(--accent-color);
|
||||
padding-left: 16px;
|
||||
}
|
||||
|
||||
.header-title.clickable {
|
||||
cursor: pointer;
|
||||
transition: all 0.2s ease;
|
||||
}
|
||||
|
||||
.header-title.clickable:hover {
|
||||
opacity: 0.8;
|
||||
}
|
||||
|
||||
@@ -9,11 +9,26 @@
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<!-- Side Navigation Menu -->
|
||||
<nav class="side-nav" id="side-nav">
|
||||
<ul class="nav-menu">
|
||||
<li><button class="nav-item" data-page="statistics">Statistics</button></li>
|
||||
<li><button class="nav-item" data-page="subscriptions">Subscriptions</button></li>
|
||||
<li><button class="nav-item" data-page="configuration">Configuration</button></li>
|
||||
<li><button class="nav-item" data-page="authorization">Authorization</button></li>
|
||||
<li><button class="nav-item" data-page="dm">DM</button></li>
|
||||
<li><button class="nav-item" data-page="database">Database Query</button></li>
|
||||
</ul>
|
||||
</nav>
|
||||
|
||||
<!-- Side Navigation Overlay -->
|
||||
<div class="side-nav-overlay" id="side-nav-overlay"></div>
|
||||
|
||||
<!-- Header with title and profile display -->
|
||||
<div class="section">
|
||||
|
||||
<div class="header-content">
|
||||
<div class="header-title">
|
||||
<div class="header-title clickable" id="header-title">
|
||||
<span class="relay-letter" data-letter="R">R</span>
|
||||
<span class="relay-letter" data-letter="E">E</span>
|
||||
<span class="relay-letter" data-letter="L">L</span>
|
||||
@@ -80,10 +95,26 @@
|
||||
<td>Total Events</td>
|
||||
<td id="total-events">-</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Process ID</td>
|
||||
<td id="process-id">-</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Active Subscriptions</td>
|
||||
<td id="active-subscriptions">-</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Memory Usage</td>
|
||||
<td id="memory-usage">-</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>CPU Core</td>
|
||||
<td id="cpu-core">-</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>CPU Usage</td>
|
||||
<td id="cpu-usage">-</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Oldest Event</td>
|
||||
<td id="oldest-event">-</td>
|
||||
@@ -184,15 +215,14 @@
|
||||
<tr>
|
||||
<th>Subscription ID</th>
|
||||
<th>Client IP</th>
|
||||
<th>WSI Pointer</th>
|
||||
<th>Duration</th>
|
||||
<th>Events Sent</th>
|
||||
<th>Status</th>
|
||||
<th>Filters</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody id="subscription-details-table-body">
|
||||
<tr>
|
||||
<td colspan="6" style="text-align: center; font-style: italic;">No subscriptions active</td>
|
||||
<td colspan="5" style="text-align: center; font-style: italic;">No subscriptions active</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
580
api/index.js
580
api/index.js
@@ -23,6 +23,7 @@ let currentConfig = null;
|
||||
let relayPool = null;
|
||||
let subscriptionId = null;
|
||||
let isSubscribed = false; // Flag to prevent multiple simultaneous subscriptions
|
||||
let isSubscribing = false; // Flag to prevent re-entry during subscription setup
|
||||
// Relay connection state
|
||||
let relayInfo = null;
|
||||
let isRelayConnected = false;
|
||||
@@ -34,6 +35,10 @@ let statsAutoRefreshInterval = null;
|
||||
let countdownInterval = null;
|
||||
let countdownSeconds = 10;
|
||||
|
||||
// Side navigation state
|
||||
let currentPage = 'statistics'; // Default page
|
||||
let sideNavOpen = false;
|
||||
|
||||
// SQL Query state
|
||||
let pendingSqlQueries = new Map();
|
||||
|
||||
@@ -307,6 +312,13 @@ async function restoreAuthenticationState(pubkey) {
|
||||
|
||||
// Automatically set up relay connection based on current page URL
|
||||
async function setupAutomaticRelayConnection(showSections = false) {
|
||||
console.log('=== SETUP AUTOMATIC RELAY CONNECTION CALLED ===');
|
||||
console.log('Call stack:', new Error().stack);
|
||||
console.log('showSections:', showSections);
|
||||
console.log('Current isRelayConnected:', isRelayConnected);
|
||||
console.log('Current relayPool:', relayPool ? 'EXISTS' : 'NULL');
|
||||
console.log('Current isSubscribed:', isSubscribed);
|
||||
|
||||
try {
|
||||
// Get the current page URL and convert to WebSocket URL
|
||||
const currentUrl = window.location.href;
|
||||
@@ -322,8 +334,9 @@ async function setupAutomaticRelayConnection(showSections = false) {
|
||||
}
|
||||
|
||||
// Remove any path components to get just the base URL
|
||||
// CRITICAL: Always add trailing slash for consistent URL format
|
||||
const url = new URL(relayUrl);
|
||||
relayUrl = `${url.protocol}//${url.host}`;
|
||||
relayUrl = `${url.protocol}//${url.host}/`;
|
||||
|
||||
// Set the relay URL
|
||||
relayConnectionUrl.value = relayUrl;
|
||||
@@ -348,13 +361,8 @@ async function setupAutomaticRelayConnection(showSections = false) {
|
||||
relayPubkey = '4f355bdcb7cc0af728ef3cceb9615d90684bb5b2ca5f859ab0f0b704075871aa';
|
||||
}
|
||||
|
||||
// Initialize relay pool for admin API communication
|
||||
if (!relayPool) {
|
||||
relayPool = new window.NostrTools.SimplePool();
|
||||
console.log('🔌 Initialized SimplePool for admin API communication');
|
||||
}
|
||||
|
||||
// Set up subscription to receive admin API responses
|
||||
// Note: subscribeToConfiguration() will create the SimplePool internally
|
||||
await subscribeToConfiguration();
|
||||
console.log('📡 Subscription established for admin API responses');
|
||||
|
||||
@@ -480,36 +488,33 @@ function handleLogoutEvent() {
|
||||
|
||||
// Update visibility of admin sections based on login and relay connection status
|
||||
function updateAdminSectionsVisibility() {
|
||||
const divConfig = document.getElementById('div_config');
|
||||
const authRulesSection = document.getElementById('authRulesSection');
|
||||
const databaseStatisticsSection = document.getElementById('databaseStatisticsSection');
|
||||
const subscriptionDetailsSection = document.getElementById('subscriptionDetailsSection');
|
||||
const nip17DMSection = document.getElementById('nip17DMSection');
|
||||
const sqlQuerySection = document.getElementById('sqlQuerySection');
|
||||
const shouldShow = isLoggedIn && isRelayConnected;
|
||||
|
||||
if (divConfig) divConfig.style.display = shouldShow ? 'block' : 'none';
|
||||
if (authRulesSection) authRulesSection.style.display = shouldShow ? 'block' : 'none';
|
||||
if (databaseStatisticsSection) databaseStatisticsSection.style.display = shouldShow ? 'block' : 'none';
|
||||
if (subscriptionDetailsSection) subscriptionDetailsSection.style.display = shouldShow ? 'block' : 'none';
|
||||
if (nip17DMSection) nip17DMSection.style.display = shouldShow ? 'block' : 'none';
|
||||
if (sqlQuerySection) sqlQuerySection.style.display = shouldShow ? 'block' : 'none';
|
||||
// If logged in and connected, show the current page, otherwise hide all sections
|
||||
if (shouldShow) {
|
||||
// Show the current page
|
||||
switchPage(currentPage);
|
||||
|
||||
// Start/stop auto-refresh based on visibility
|
||||
if (shouldShow && databaseStatisticsSection && databaseStatisticsSection.style.display === 'block') {
|
||||
// Load statistics immediately (no auto-refresh - using real-time monitoring events)
|
||||
sendStatsQuery().catch(error => {
|
||||
console.log('Auto-fetch statistics failed: ' + error.message);
|
||||
});
|
||||
// startStatsAutoRefresh(); // DISABLED - using real-time monitoring events instead
|
||||
// Also load configuration and auth rules automatically when sections become visible
|
||||
fetchConfiguration().catch(error => {
|
||||
console.log('Auto-fetch configuration failed: ' + error.message);
|
||||
});
|
||||
loadAuthRules().catch(error => {
|
||||
console.log('Auto-load auth rules failed: ' + error.message);
|
||||
});
|
||||
// Load data for the current page
|
||||
loadCurrentPageData();
|
||||
} else {
|
||||
// Hide all sections when not logged in or not connected
|
||||
const sections = [
|
||||
'databaseStatisticsSection',
|
||||
'subscriptionDetailsSection',
|
||||
'div_config',
|
||||
'authRulesSection',
|
||||
'nip17DMSection',
|
||||
'sqlQuerySection'
|
||||
];
|
||||
|
||||
sections.forEach(sectionId => {
|
||||
const section = document.getElementById(sectionId);
|
||||
if (section) {
|
||||
section.style.display = 'none';
|
||||
}
|
||||
});
|
||||
|
||||
stopStatsAutoRefresh();
|
||||
}
|
||||
|
||||
@@ -517,6 +522,31 @@ function updateAdminSectionsVisibility() {
|
||||
updateCountdownDisplay();
|
||||
}
|
||||
|
||||
// Load data for the current page
|
||||
function loadCurrentPageData() {
|
||||
switch (currentPage) {
|
||||
case 'statistics':
|
||||
// Load statistics immediately (no auto-refresh - using real-time monitoring events)
|
||||
sendStatsQuery().catch(error => {
|
||||
console.log('Auto-fetch statistics failed: ' + error.message);
|
||||
});
|
||||
break;
|
||||
case 'configuration':
|
||||
// Load configuration
|
||||
fetchConfiguration().catch(error => {
|
||||
console.log('Auto-fetch configuration failed: ' + error.message);
|
||||
});
|
||||
break;
|
||||
case 'authorization':
|
||||
// Load auth rules
|
||||
loadAuthRules().catch(error => {
|
||||
console.log('Auto-load auth rules failed: ' + error.message);
|
||||
});
|
||||
break;
|
||||
// Other pages don't need initial data loading
|
||||
}
|
||||
}
|
||||
|
||||
// Show login modal
|
||||
function showLoginModal() {
|
||||
if (loginModal && loginModalContainer) {
|
||||
@@ -615,16 +645,22 @@ async function loadUserProfile() {
|
||||
'wss://relay.nostr.band',
|
||||
'wss://nos.lol',
|
||||
'wss://relay.primal.net',
|
||||
'wss://relay.snort.social',
|
||||
'wss://relay.laantungir.net'];
|
||||
'wss://relay.snort.social'
|
||||
];
|
||||
|
||||
// Get profile event (kind 0) for the user
|
||||
const events = await profilePool.querySync(relays, {
|
||||
// Get profile event (kind 0) for the user with timeout
|
||||
const timeoutPromise = new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Profile query timeout')), 5000)
|
||||
);
|
||||
|
||||
const queryPromise = profilePool.querySync(relays, {
|
||||
kinds: [0],
|
||||
authors: [userPubkey],
|
||||
limit: 1
|
||||
});
|
||||
|
||||
const events = await Promise.race([queryPromise, timeoutPromise]);
|
||||
|
||||
if (events.length > 0) {
|
||||
console.log('Profile event found:', events[0]);
|
||||
const profile = JSON.parse(events[0].content);
|
||||
@@ -648,8 +684,14 @@ async function loadUserProfile() {
|
||||
// Keep the npub display
|
||||
}
|
||||
|
||||
// Close the profile pool
|
||||
profilePool.close(relays);
|
||||
// Properly close the profile pool with error handling
|
||||
try {
|
||||
await profilePool.close(relays);
|
||||
// Give time for cleanup
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
} catch (closeError) {
|
||||
console.log('Profile pool close error (non-critical):', closeError.message);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
console.log('Profile loading failed: ' + error.message);
|
||||
@@ -755,20 +797,25 @@ async function logout() {
|
||||
// Stop auto-refresh before disconnecting
|
||||
stopStatsAutoRefresh();
|
||||
|
||||
|
||||
// Clean up configuration pool
|
||||
// Clean up relay pool
|
||||
if (relayPool) {
|
||||
log('Closing configuration pool...', 'INFO');
|
||||
log('Closing relay pool...', 'INFO');
|
||||
const url = relayConnectionUrl.value.trim();
|
||||
if (url) {
|
||||
relayPool.close([url]);
|
||||
try {
|
||||
await relayPool.close([url]);
|
||||
} catch (e) {
|
||||
console.log('Pool close error (non-critical):', e.message);
|
||||
}
|
||||
}
|
||||
relayPool = null;
|
||||
subscriptionId = null;
|
||||
// Reset subscription flag
|
||||
isSubscribed = false;
|
||||
}
|
||||
|
||||
// Reset subscription flags
|
||||
isSubscribed = false;
|
||||
isSubscribing = false;
|
||||
|
||||
await nlLite.logout();
|
||||
|
||||
userPubkey = null;
|
||||
@@ -778,12 +825,9 @@ async function logout() {
|
||||
// Reset relay connection state
|
||||
isRelayConnected = false;
|
||||
relayPubkey = null;
|
||||
// Reset subscription flag
|
||||
isSubscribed = false;
|
||||
|
||||
// Reset UI - hide profile and show login modal
|
||||
hideProfileFromHeader();
|
||||
// showLoginModal() removed - handled by handleLogoutEvent()
|
||||
|
||||
updateConfigStatus(false);
|
||||
updateAdminSectionsVisibility();
|
||||
@@ -815,42 +859,204 @@ function generateSubId() {
|
||||
return result;
|
||||
}
|
||||
|
||||
// WebSocket monitoring function to attach to SimplePool connections
|
||||
function attachWebSocketMonitoring(relayPool, url) {
|
||||
console.log('🔍 Attaching WebSocket monitoring to SimplePool...');
|
||||
|
||||
// SimplePool stores connections in _conn object
|
||||
if (relayPool && relayPool._conn) {
|
||||
// Monitor when connections are created
|
||||
const originalGetConnection = relayPool._conn[url];
|
||||
if (originalGetConnection) {
|
||||
console.log('📡 Found existing connection for URL:', url);
|
||||
|
||||
// Try to access the WebSocket if it's available
|
||||
const conn = relayPool._conn[url];
|
||||
if (conn && conn.ws) {
|
||||
attachWebSocketEventListeners(conn.ws, url);
|
||||
}
|
||||
}
|
||||
|
||||
// Override the connection getter to monitor new connections
|
||||
const originalConn = relayPool._conn;
|
||||
relayPool._conn = new Proxy(originalConn, {
|
||||
get(target, prop) {
|
||||
const conn = target[prop];
|
||||
if (conn && conn.ws && !conn.ws._monitored) {
|
||||
console.log('🔗 New WebSocket connection detected for:', prop);
|
||||
attachWebSocketEventListeners(conn.ws, prop);
|
||||
conn.ws._monitored = true;
|
||||
}
|
||||
return conn;
|
||||
},
|
||||
set(target, prop, value) {
|
||||
if (value && value.ws && !value.ws._monitored) {
|
||||
console.log('🔗 WebSocket connection being set for:', prop);
|
||||
attachWebSocketEventListeners(value.ws, prop);
|
||||
value.ws._monitored = true;
|
||||
}
|
||||
target[prop] = value;
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
console.log('✅ WebSocket monitoring attached');
|
||||
}
|
||||
|
||||
function attachWebSocketEventListeners(ws, url) {
|
||||
console.log(`🎯 Attaching event listeners to WebSocket for ${url}`);
|
||||
|
||||
// Log connection open
|
||||
ws.addEventListener('open', (event) => {
|
||||
console.log(`🔓 WebSocket OPEN for ${url}:`, {
|
||||
readyState: ws.readyState,
|
||||
url: ws.url,
|
||||
protocol: ws.protocol,
|
||||
extensions: ws.extensions
|
||||
});
|
||||
});
|
||||
|
||||
// Log incoming messages with full details
|
||||
ws.addEventListener('message', (event) => {
|
||||
try {
|
||||
const data = event.data;
|
||||
console.log(`📨 WebSocket MESSAGE from ${url}:`, {
|
||||
type: event.type,
|
||||
data: data,
|
||||
dataLength: data.length,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
// Try to parse as JSON for Nostr messages
|
||||
try {
|
||||
const parsed = JSON.parse(data);
|
||||
if (Array.isArray(parsed)) {
|
||||
const [type, ...args] = parsed;
|
||||
console.log(`📨 Parsed Nostr message [${type}]:`, args);
|
||||
} else {
|
||||
console.log(`📨 Parsed JSON:`, parsed);
|
||||
}
|
||||
} catch (parseError) {
|
||||
console.log(`📨 Raw message (not JSON):`, data);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`❌ Error processing WebSocket message from ${url}:`, error);
|
||||
}
|
||||
});
|
||||
|
||||
// Log connection close with details
|
||||
ws.addEventListener('close', (event) => {
|
||||
console.log(`🔒 WebSocket CLOSE for ${url}:`, {
|
||||
code: event.code,
|
||||
reason: event.reason,
|
||||
wasClean: event.wasClean,
|
||||
readyState: ws.readyState,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
});
|
||||
|
||||
// Log errors with full details
|
||||
ws.addEventListener('error', (event) => {
|
||||
console.error(`❌ WebSocket ERROR for ${url}:`, {
|
||||
type: event.type,
|
||||
target: event.target,
|
||||
readyState: ws.readyState,
|
||||
url: ws.url,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
// Log additional WebSocket state
|
||||
console.error(`❌ WebSocket state details:`, {
|
||||
readyState: ws.readyState,
|
||||
bufferedAmount: ws.bufferedAmount,
|
||||
protocol: ws.protocol,
|
||||
extensions: ws.extensions,
|
||||
binaryType: ws.binaryType
|
||||
});
|
||||
});
|
||||
|
||||
// Override send method to log outgoing messages
|
||||
const originalSend = ws.send;
|
||||
ws.send = function(data) {
|
||||
console.log(`📤 WebSocket SEND to ${url}:`, {
|
||||
data: data,
|
||||
dataLength: data.length,
|
||||
readyState: ws.readyState,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
// Try to parse outgoing Nostr messages
|
||||
try {
|
||||
const parsed = JSON.parse(data);
|
||||
if (Array.isArray(parsed)) {
|
||||
const [type, ...args] = parsed;
|
||||
console.log(`📤 Outgoing Nostr message [${type}]:`, args);
|
||||
} else {
|
||||
console.log(`📤 Outgoing JSON:`, parsed);
|
||||
}
|
||||
} catch (parseError) {
|
||||
console.log(`📤 Outgoing raw message (not JSON):`, data);
|
||||
}
|
||||
|
||||
return originalSend.call(this, data);
|
||||
};
|
||||
|
||||
console.log(`✅ Event listeners attached to WebSocket for ${url}`);
|
||||
}
|
||||
|
||||
// Configuration subscription using nostr-tools SimplePool
|
||||
async function subscribeToConfiguration() {
|
||||
try {
|
||||
console.log('=== STARTING SIMPLEPOOL CONFIGURATION SUBSCRIPTION ===');
|
||||
console.log('=== SUBSCRIBE TO CONFIGURATION ===');
|
||||
console.log('Call stack:', new Error().stack);
|
||||
|
||||
// Prevent multiple simultaneous subscription attempts
|
||||
if (isSubscribed) {
|
||||
console.log('Subscription already established, skipping duplicate subscription attempt');
|
||||
// If pool already exists and subscribed, we're done
|
||||
if (relayPool && isSubscribed) {
|
||||
console.log('✅ Already subscribed, reusing existing pool');
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!isLoggedIn) {
|
||||
console.log('WARNING: Not logged in, but proceeding with subscription test');
|
||||
}
|
||||
|
||||
const url = relayConnectionUrl.value.trim();
|
||||
if (!url) {
|
||||
console.error('Please enter a relay URL');
|
||||
// Prevent concurrent subscription attempts
|
||||
if (isSubscribing) {
|
||||
console.log('⚠️ Subscription already in progress');
|
||||
return false;
|
||||
}
|
||||
|
||||
console.log(`Connecting to relay via SimplePool: ${url}`);
|
||||
isSubscribing = true;
|
||||
|
||||
// Reuse existing pool if available, otherwise create new one
|
||||
const url = relayConnectionUrl.value.trim();
|
||||
if (!url) {
|
||||
console.error('No relay URL configured');
|
||||
isSubscribing = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
console.log(`🔌 Connecting to relay: ${url}`);
|
||||
|
||||
// Create pool ONLY if it doesn't exist
|
||||
if (!relayPool) {
|
||||
console.log('Creating new SimplePool instance');
|
||||
console.log('✨ Creating NEW SimplePool for admin operations');
|
||||
relayPool = new window.NostrTools.SimplePool();
|
||||
|
||||
// Attach WebSocket monitoring to the new pool
|
||||
attachWebSocketMonitoring(relayPool, url);
|
||||
} else {
|
||||
console.log('Reusing existing SimplePool instance');
|
||||
console.log('♻️ Reusing existing SimplePool');
|
||||
}
|
||||
|
||||
subscriptionId = generateSubId();
|
||||
|
||||
console.log(`Generated subscription ID: ${subscriptionId}`);
|
||||
console.log(`User pubkey ${userPubkey}`)
|
||||
console.log(`📝 Generated subscription ID: ${subscriptionId}`);
|
||||
console.log(`👤 User pubkey: ${userPubkey}`);
|
||||
console.log(`🎯 About to call relayPool.subscribeMany with URL: ${url}`);
|
||||
console.log(`📊 relayPool._conn before subscribeMany:`, Object.keys(relayPool._conn || {}));
|
||||
|
||||
// Mark as subscribed BEFORE calling subscribeMany to prevent race conditions
|
||||
isSubscribed = true;
|
||||
|
||||
// Subscribe to kind 23457 events (admin response events), kind 4 (NIP-04 DMs), kind 1059 (NIP-17 GiftWrap), and kind 24567 (ephemeral monitoring events)
|
||||
console.log('🔔 Calling relayPool.subscribeMany...');
|
||||
const subscription = relayPool.subscribeMany([url], [{
|
||||
since: Math.floor(Date.now() / 1000) - 5, // Look back 5 seconds to avoid race condition
|
||||
kinds: [23457],
|
||||
@@ -872,20 +1078,21 @@ async function subscribeToConfiguration() {
|
||||
since: Math.floor(Date.now() / 1000), // Start from current time
|
||||
kinds: [24567], // Real-time ephemeral monitoring events
|
||||
authors: [getRelayPubkey()], // Only listen to monitoring events from the relay
|
||||
"#d": isLoggedIn ? ["event_kinds", "time_stats", "top_pubkeys", "active_subscriptions", "subscription_details"] : ["event_kinds", "time_stats", "top_pubkeys", "active_subscriptions"], // Include subscription_details only when authenticated
|
||||
"#d": isLoggedIn ? ["event_kinds", "time_stats", "top_pubkeys", "active_subscriptions", "subscription_details", "cpu_metrics"] : ["event_kinds", "time_stats", "top_pubkeys", "active_subscriptions", "cpu_metrics"], // Include subscription_details only when authenticated, cpu_metrics available to all
|
||||
limit: 50
|
||||
}], {
|
||||
async onevent(event) {
|
||||
console.log('=== EVENT RECEIVED VIA SIMPLEPOOL ===');
|
||||
console.log('Event data:', event);
|
||||
console.log('Event kind:', event.kind);
|
||||
console.log('Event tags:', event.tags);
|
||||
console.log('Event pubkey:', event.pubkey);
|
||||
console.log('=== END EVENT ===');
|
||||
// Simplified logging - one line per event
|
||||
if (event.kind === 24567) {
|
||||
const dTag = event.tags.find(tag => tag[0] === 'd');
|
||||
const dataType = dTag ? dTag[1] : 'unknown';
|
||||
console.log(`📊 Monitoring event: ${dataType}`);
|
||||
} else {
|
||||
console.log(`📨 Event received: kind ${event.kind}`);
|
||||
}
|
||||
|
||||
// Handle NIP-04 DMs
|
||||
if (event.kind === 4) {
|
||||
console.log('=== NIP-04 DM RECEIVED ===');
|
||||
try {
|
||||
// Decrypt the DM content
|
||||
const decryptedContent = await window.nostr.nip04.decrypt(event.pubkey, event.content);
|
||||
@@ -910,7 +1117,6 @@ async function subscribeToConfiguration() {
|
||||
|
||||
// Handle NIP-17 GiftWrap DMs
|
||||
if (event.kind === 1059) {
|
||||
console.log('=== NIP-17 GIFTWRAP RECEIVED ===');
|
||||
try {
|
||||
// Step 1: Unwrap gift wrap to get seal
|
||||
const sealJson = await window.nostr.nip44.decrypt(event.pubkey, event.content);
|
||||
@@ -958,10 +1164,7 @@ async function subscribeToConfiguration() {
|
||||
|
||||
// Handle monitoring events (kind 24567 - ephemeral)
|
||||
if (event.kind === 24567) {
|
||||
console.log('=== MONITORING EVENT RECEIVED ===');
|
||||
console.log('Monitoring event:', event);
|
||||
|
||||
// Process monitoring event
|
||||
// Process monitoring event (logging done above)
|
||||
processMonitoringEvent(event);
|
||||
}
|
||||
},
|
||||
@@ -975,23 +1178,30 @@ async function subscribeToConfiguration() {
|
||||
},
|
||||
onclose(reason) {
|
||||
console.log('Subscription closed:', reason);
|
||||
// Reset subscription state to allow re-subscription
|
||||
isSubscribed = false;
|
||||
isSubscribing = false;
|
||||
isRelayConnected = false;
|
||||
updateConfigStatus(false);
|
||||
log('WebSocket connection closed - subscription state reset', 'WARNING');
|
||||
}
|
||||
});
|
||||
|
||||
// Store subscription for cleanup
|
||||
relayPool.currentSubscription = subscription;
|
||||
|
||||
// Mark as subscribed to prevent duplicate attempts
|
||||
// Mark as subscribed
|
||||
isSubscribed = true;
|
||||
isSubscribing = false;
|
||||
|
||||
console.log('SimplePool subscription established');
|
||||
console.log('✅ Subscription established successfully');
|
||||
return true;
|
||||
|
||||
} catch (error) {
|
||||
console.error('Configuration subscription failed:', error.message);
|
||||
console.error('Configuration subscription failed:', error);
|
||||
console.error('Error stack:', error.stack);
|
||||
isSubscribing = false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -1087,13 +1297,14 @@ function initializeEventRateChart() {
|
||||
eventRateChart = new ASCIIBarChart('event-rate-chart', {
|
||||
maxHeight: 11, // Chart height in lines
|
||||
maxDataPoints: 76, // Show last 76 bins (5+ minutes of history)
|
||||
title: 'Events', // Chart title
|
||||
title: 'New Events', // Chart title
|
||||
xAxisLabel: '', // No X-axis label
|
||||
yAxisLabel: '', // No Y-axis label
|
||||
autoFitWidth: true, // Enable responsive font sizing
|
||||
useBinMode: true, // Enable time bin aggregation
|
||||
binDuration: 4000, // 4-second time bins
|
||||
xAxisLabelFormat: 'elapsed' // Show elapsed time labels
|
||||
xAxisLabelFormat: 'elapsed', // Show elapsed time labels
|
||||
debug: false // Disable debug logging
|
||||
});
|
||||
|
||||
console.log('ASCIIBarChart instance created:', eventRateChart);
|
||||
@@ -1148,70 +1359,56 @@ function createChartStubElements() {
|
||||
// Handle monitoring events (kind 24567 - ephemeral)
|
||||
async function processMonitoringEvent(event) {
|
||||
try {
|
||||
console.log('=== PROCESSING MONITORING EVENT ===');
|
||||
console.log('Monitoring event:', event);
|
||||
|
||||
// Verify this is a kind 24567 ephemeral monitoring event
|
||||
if (event.kind !== 24567) {
|
||||
console.log('Ignoring non-monitoring event, kind:', event.kind);
|
||||
return;
|
||||
}
|
||||
|
||||
// Verify the event is from the relay
|
||||
const expectedRelayPubkey = getRelayPubkey();
|
||||
if (event.pubkey !== expectedRelayPubkey) {
|
||||
console.log('Ignoring monitoring event from unknown pubkey:', event.pubkey);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check the d-tag to determine which type of monitoring event this is
|
||||
const dTag = event.tags.find(tag => tag[0] === 'd');
|
||||
if (!dTag) {
|
||||
console.log('Ignoring monitoring event without d-tag');
|
||||
return;
|
||||
}
|
||||
|
||||
// Parse the monitoring data (content is JSON, not encrypted for monitoring events)
|
||||
const monitoringData = JSON.parse(event.content);
|
||||
console.log('Parsed monitoring data:', monitoringData);
|
||||
|
||||
// Don't add to chart here - we'll track actual event rate changes in updateStatsFromMonitoringEvent
|
||||
console.log('Monitoring event received - will track rate changes in stats update');
|
||||
|
||||
// Route to appropriate handler based on d-tag
|
||||
// Route to appropriate handler based on d-tag (no verbose logging)
|
||||
switch (dTag[1]) {
|
||||
case 'event_kinds':
|
||||
updateStatsFromMonitoringEvent(monitoringData);
|
||||
log('Real-time event_kinds monitoring data updated', 'INFO');
|
||||
break;
|
||||
|
||||
case 'time_stats':
|
||||
updateStatsFromTimeMonitoringEvent(monitoringData);
|
||||
log('Real-time time_stats monitoring data updated', 'INFO');
|
||||
break;
|
||||
|
||||
case 'top_pubkeys':
|
||||
updateStatsFromTopPubkeysMonitoringEvent(monitoringData);
|
||||
log('Real-time top_pubkeys monitoring data updated', 'INFO');
|
||||
break;
|
||||
|
||||
case 'active_subscriptions':
|
||||
updateStatsFromActiveSubscriptionsMonitoringEvent(monitoringData);
|
||||
log('Real-time active_subscriptions monitoring data updated', 'INFO');
|
||||
break;
|
||||
|
||||
case 'subscription_details':
|
||||
// Only process subscription details if user is authenticated
|
||||
if (isLoggedIn) {
|
||||
updateStatsFromSubscriptionDetailsMonitoringEvent(monitoringData);
|
||||
log('Real-time subscription_details monitoring data updated', 'INFO');
|
||||
} else {
|
||||
console.log('Ignoring subscription_details monitoring event - user not authenticated');
|
||||
}
|
||||
break;
|
||||
|
||||
case 'cpu_metrics':
|
||||
updateStatsFromCpuMonitoringEvent(monitoringData);
|
||||
break;
|
||||
|
||||
default:
|
||||
console.log('Ignoring monitoring event with unknown d-tag:', dTag[1]);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3770,11 +3967,8 @@ function handleStatsQueryResponse(responseData) {
|
||||
// Update statistics display from real-time monitoring event
|
||||
function updateStatsFromMonitoringEvent(monitoringData) {
|
||||
try {
|
||||
log('Updating stats from monitoring event...', 'INFO');
|
||||
console.log('Monitoring data:', monitoringData);
|
||||
|
||||
if (monitoringData.data_type !== 'event_kinds') {
|
||||
log('Ignoring monitoring event with different data type', 'WARNING');
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3801,8 +3995,6 @@ function updateStatsFromMonitoringEvent(monitoringData) {
|
||||
populateStatsKindsFromMonitoring(monitoringData.kinds, monitoringData.total_events);
|
||||
}
|
||||
|
||||
log('Real-time statistics updated from monitoring event', 'INFO');
|
||||
|
||||
} catch (error) {
|
||||
log(`Error updating stats from monitoring event: ${error.message}`, 'ERROR');
|
||||
}
|
||||
@@ -3811,11 +4003,7 @@ function updateStatsFromMonitoringEvent(monitoringData) {
|
||||
// Update statistics display from time_stats monitoring event
|
||||
function updateStatsFromTimeMonitoringEvent(monitoringData) {
|
||||
try {
|
||||
log('Updating time stats from monitoring event...', 'INFO');
|
||||
console.log('Time monitoring data:', monitoringData);
|
||||
|
||||
if (monitoringData.data_type !== 'time_stats') {
|
||||
log('Ignoring time monitoring event with different data type', 'WARNING');
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3834,8 +4022,6 @@ function updateStatsFromTimeMonitoringEvent(monitoringData) {
|
||||
populateStatsTime({ time_stats: timeStats });
|
||||
}
|
||||
|
||||
log('Real-time time statistics updated from monitoring event', 'INFO');
|
||||
|
||||
} catch (error) {
|
||||
log(`Error updating time stats from monitoring event: ${error.message}`, 'ERROR');
|
||||
}
|
||||
@@ -3844,11 +4030,7 @@ function updateStatsFromTimeMonitoringEvent(monitoringData) {
|
||||
// Update statistics display from top_pubkeys monitoring event
|
||||
function updateStatsFromTopPubkeysMonitoringEvent(monitoringData) {
|
||||
try {
|
||||
log('Updating top pubkeys from monitoring event...', 'INFO');
|
||||
console.log('Top pubkeys monitoring data:', monitoringData);
|
||||
|
||||
if (monitoringData.data_type !== 'top_pubkeys') {
|
||||
log('Ignoring top pubkeys monitoring event with different data type', 'WARNING');
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3858,8 +4040,6 @@ function updateStatsFromTopPubkeysMonitoringEvent(monitoringData) {
|
||||
populateStatsPubkeysFromMonitoring(monitoringData.pubkeys, monitoringData.total_events || 0);
|
||||
}
|
||||
|
||||
log('Real-time top pubkeys statistics updated from monitoring event', 'INFO');
|
||||
|
||||
} catch (error) {
|
||||
log(`Error updating top pubkeys from monitoring event: ${error.message}`, 'ERROR');
|
||||
}
|
||||
@@ -3868,11 +4048,7 @@ function updateStatsFromTopPubkeysMonitoringEvent(monitoringData) {
|
||||
// Update statistics display from active_subscriptions monitoring event
|
||||
function updateStatsFromActiveSubscriptionsMonitoringEvent(monitoringData) {
|
||||
try {
|
||||
log('Updating active subscriptions from monitoring event...', 'INFO');
|
||||
console.log('Active subscriptions monitoring data:', monitoringData);
|
||||
|
||||
if (monitoringData.data_type !== 'active_subscriptions') {
|
||||
log('Ignoring active subscriptions monitoring event with different data type', 'WARNING');
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3882,8 +4058,6 @@ function updateStatsFromActiveSubscriptionsMonitoringEvent(monitoringData) {
|
||||
updateStatsCell('active-subscriptions', monitoringData.data.total_subscriptions.toString());
|
||||
}
|
||||
|
||||
log('Real-time active subscriptions statistics updated from monitoring event', 'INFO');
|
||||
|
||||
} catch (error) {
|
||||
log(`Error updating active subscriptions from monitoring event: ${error.message}`, 'ERROR');
|
||||
}
|
||||
@@ -3892,11 +4066,7 @@ function updateStatsFromActiveSubscriptionsMonitoringEvent(monitoringData) {
|
||||
// Update statistics display from subscription_details monitoring event
|
||||
function updateStatsFromSubscriptionDetailsMonitoringEvent(monitoringData) {
|
||||
try {
|
||||
log('Updating subscription details from monitoring event...', 'INFO');
|
||||
console.log('Subscription details monitoring data:', monitoringData);
|
||||
|
||||
if (monitoringData.data_type !== 'subscription_details') {
|
||||
log('Ignoring subscription details monitoring event with different data type', 'WARNING');
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3905,13 +4075,43 @@ function updateStatsFromSubscriptionDetailsMonitoringEvent(monitoringData) {
|
||||
populateSubscriptionDetailsTable(monitoringData.data.subscriptions);
|
||||
}
|
||||
|
||||
log('Real-time subscription details statistics updated from monitoring event', 'INFO');
|
||||
|
||||
} catch (error) {
|
||||
log(`Error updating subscription details from monitoring event: ${error.message}`, 'ERROR');
|
||||
}
|
||||
}
|
||||
|
||||
// Update statistics display from CPU metrics monitoring event
|
||||
function updateStatsFromCpuMonitoringEvent(monitoringData) {
|
||||
try {
|
||||
if (monitoringData.data_type !== 'cpu_metrics') {
|
||||
return;
|
||||
}
|
||||
|
||||
// Update CPU metrics in the database statistics table
|
||||
if (monitoringData.process_id !== undefined) {
|
||||
updateStatsCell('process-id', monitoringData.process_id.toString());
|
||||
}
|
||||
|
||||
if (monitoringData.memory_usage_mb !== undefined) {
|
||||
updateStatsCell('memory-usage', monitoringData.memory_usage_mb.toFixed(1) + ' MB');
|
||||
}
|
||||
|
||||
if (monitoringData.current_cpu_core !== undefined) {
|
||||
updateStatsCell('cpu-core', 'Core ' + monitoringData.current_cpu_core);
|
||||
}
|
||||
|
||||
// Calculate CPU usage percentage if we have the data
|
||||
if (monitoringData.process_cpu_time !== undefined && monitoringData.system_cpu_time !== undefined) {
|
||||
// For now, just show the raw process CPU time (simplified)
|
||||
// In a real implementation, you'd calculate deltas over time
|
||||
updateStatsCell('cpu-usage', monitoringData.process_cpu_time + ' ticks');
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
log(`Error updating CPU metrics from monitoring event: ${error.message}`, 'ERROR');
|
||||
}
|
||||
}
|
||||
|
||||
// Populate event kinds table from monitoring data
|
||||
function populateStatsKindsFromMonitoring(kindsData, totalEvents) {
|
||||
const tableBody = document.getElementById('stats-kinds-table-body');
|
||||
@@ -4076,7 +4276,7 @@ function populateSubscriptionDetailsTable(subscriptionsData) {
|
||||
|
||||
if (subscriptionsData.length === 0) {
|
||||
const row = document.createElement('tr');
|
||||
row.innerHTML = '<td colspan="6" style="text-align: center; font-style: italic;">No active subscriptions</td>';
|
||||
row.innerHTML = '<td colspan="5" style="text-align: center; font-style: italic;">No active subscriptions</td>';
|
||||
tableBody.appendChild(row);
|
||||
return;
|
||||
}
|
||||
@@ -4092,8 +4292,8 @@ function populateSubscriptionDetailsTable(subscriptionsData) {
|
||||
// Format client IP (show full IP for admin view)
|
||||
const clientIP = subscription.client_ip || 'unknown';
|
||||
|
||||
// Format status
|
||||
const status = subscription.active ? 'Active' : 'Inactive';
|
||||
// Format wsi_pointer (show full pointer)
|
||||
const wsiPointer = subscription.wsi_pointer || 'N/A';
|
||||
|
||||
// Format filters (show actual filter details)
|
||||
let filtersDisplay = 'None';
|
||||
@@ -4161,9 +4361,8 @@ function populateSubscriptionDetailsTable(subscriptionsData) {
|
||||
row.innerHTML = `
|
||||
<td style="font-family: 'Courier New', monospace; font-size: 12px;">${subscription.id || 'N/A'}</td>
|
||||
<td style="font-family: 'Courier New', monospace; font-size: 12px;">${clientIP}</td>
|
||||
<td style="font-family: 'Courier New', monospace; font-size: 12px;">${wsiPointer}</td>
|
||||
<td>${durationStr}</td>
|
||||
<td>${subscription.events_sent || 0}</td>
|
||||
<td>${status}</td>
|
||||
<td>${filtersDisplay}</td>
|
||||
`;
|
||||
tableBody.appendChild(row);
|
||||
@@ -4383,6 +4582,85 @@ function initializeDarkMode() {
|
||||
}
|
||||
}
|
||||
|
||||
// Side navigation functions
|
||||
function toggleSideNav() {
|
||||
const sideNav = document.getElementById('side-nav');
|
||||
const overlay = document.getElementById('side-nav-overlay');
|
||||
|
||||
if (sideNavOpen) {
|
||||
sideNav.classList.remove('open');
|
||||
overlay.classList.remove('show');
|
||||
sideNavOpen = false;
|
||||
} else {
|
||||
sideNav.classList.add('open');
|
||||
overlay.classList.add('show');
|
||||
sideNavOpen = true;
|
||||
}
|
||||
}
|
||||
|
||||
function closeSideNav() {
|
||||
const sideNav = document.getElementById('side-nav');
|
||||
const overlay = document.getElementById('side-nav-overlay');
|
||||
|
||||
sideNav.classList.remove('open');
|
||||
overlay.classList.remove('show');
|
||||
sideNavOpen = false;
|
||||
}
|
||||
|
||||
function switchPage(pageName) {
|
||||
// Update current page
|
||||
currentPage = pageName;
|
||||
|
||||
// Update navigation active state
|
||||
const navItems = document.querySelectorAll('.nav-item');
|
||||
navItems.forEach(item => {
|
||||
item.classList.remove('active');
|
||||
if (item.getAttribute('data-page') === pageName) {
|
||||
item.classList.add('active');
|
||||
}
|
||||
});
|
||||
|
||||
// Hide all sections
|
||||
const sections = [
|
||||
'databaseStatisticsSection',
|
||||
'subscriptionDetailsSection',
|
||||
'div_config',
|
||||
'authRulesSection',
|
||||
'nip17DMSection',
|
||||
'sqlQuerySection'
|
||||
];
|
||||
|
||||
sections.forEach(sectionId => {
|
||||
const section = document.getElementById(sectionId);
|
||||
if (section) {
|
||||
section.style.display = 'none';
|
||||
}
|
||||
});
|
||||
|
||||
// Show selected section
|
||||
const pageMap = {
|
||||
'statistics': 'databaseStatisticsSection',
|
||||
'subscriptions': 'subscriptionDetailsSection',
|
||||
'configuration': 'div_config',
|
||||
'authorization': 'authRulesSection',
|
||||
'dm': 'nip17DMSection',
|
||||
'database': 'sqlQuerySection'
|
||||
};
|
||||
|
||||
const targetSectionId = pageMap[pageName];
|
||||
if (targetSectionId) {
|
||||
const targetSection = document.getElementById(targetSectionId);
|
||||
if (targetSection) {
|
||||
targetSection.style.display = 'block';
|
||||
}
|
||||
}
|
||||
|
||||
// Close side navigation
|
||||
closeSideNav();
|
||||
|
||||
log(`Switched to page: ${pageName}`, 'INFO');
|
||||
}
|
||||
|
||||
// Initialize the app
|
||||
document.addEventListener('DOMContentLoaded', () => {
|
||||
console.log('C-Relay Admin API interface loaded');
|
||||
@@ -4398,6 +4676,9 @@ document.addEventListener('DOMContentLoaded', () => {
|
||||
initializeEventRateChart();
|
||||
}, 1000); // Delay to ensure text_graph.js is loaded
|
||||
|
||||
// Initialize side navigation
|
||||
initializeSideNavigation();
|
||||
|
||||
// Ensure admin sections are hidden by default on page load
|
||||
updateAdminSectionsVisibility();
|
||||
|
||||
@@ -4408,6 +4689,35 @@ document.addEventListener('DOMContentLoaded', () => {
|
||||
}, 100);
|
||||
});
|
||||
|
||||
// Initialize side navigation event handlers
|
||||
function initializeSideNavigation() {
|
||||
// Header title click handler
|
||||
const headerTitle = document.getElementById('header-title');
|
||||
if (headerTitle) {
|
||||
headerTitle.addEventListener('click', toggleSideNav);
|
||||
}
|
||||
|
||||
// Overlay click handler
|
||||
const overlay = document.getElementById('side-nav-overlay');
|
||||
if (overlay) {
|
||||
overlay.addEventListener('click', closeSideNav);
|
||||
}
|
||||
|
||||
// Navigation item click handlers
|
||||
const navItems = document.querySelectorAll('.nav-item');
|
||||
navItems.forEach(item => {
|
||||
item.addEventListener('click', (e) => {
|
||||
const pageName = e.target.getAttribute('data-page');
|
||||
if (pageName) {
|
||||
switchPage(pageName);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Set initial page
|
||||
switchPage(currentPage);
|
||||
}
|
||||
|
||||
// ================================
|
||||
// SQL QUERY FUNCTIONS
|
||||
// ================================
|
||||
|
||||
@@ -18,6 +18,7 @@ class ASCIIBarChart {
|
||||
* @param {boolean} [options.useBinMode=false] - Enable time bin mode for data aggregation
|
||||
* @param {number} [options.binDuration=10000] - Duration of each time bin in milliseconds (10 seconds default)
|
||||
* @param {string} [options.xAxisLabelFormat='elapsed'] - X-axis label format: 'elapsed', 'bins', 'timestamps', 'ranges'
|
||||
* @param {boolean} [options.debug=false] - Enable debug logging
|
||||
*/
|
||||
constructor(containerId, options = {}) {
|
||||
this.container = document.getElementById(containerId);
|
||||
@@ -29,6 +30,7 @@ class ASCIIBarChart {
|
||||
this.xAxisLabel = options.xAxisLabel || '';
|
||||
this.yAxisLabel = options.yAxisLabel || '';
|
||||
this.autoFitWidth = options.autoFitWidth !== false; // Default to true
|
||||
this.debug = options.debug || false; // Debug logging option
|
||||
|
||||
// Time bin configuration
|
||||
this.useBinMode = options.useBinMode !== false; // Default to true
|
||||
@@ -55,32 +57,21 @@ class ASCIIBarChart {
|
||||
this.initializeBins();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Add a new data point to the chart
|
||||
* @param {number} value - The numeric value to add
|
||||
*/
|
||||
addValue(value) {
|
||||
if (this.useBinMode) {
|
||||
// Time bin mode: increment count in current active bin
|
||||
this.checkBinRotation(); // Ensure we have an active bin
|
||||
this.bins[this.currentBinIndex].count++;
|
||||
this.totalDataPoints++;
|
||||
} else {
|
||||
// Legacy mode: add individual values
|
||||
this.data.push(value);
|
||||
this.totalDataPoints++;
|
||||
|
||||
// Keep only the most recent data points
|
||||
if (this.data.length > this.maxDataPoints) {
|
||||
this.data.shift();
|
||||
}
|
||||
}
|
||||
// Time bin mode: add value to current active bin count
|
||||
this.checkBinRotation(); // Ensure we have an active bin
|
||||
this.bins[this.currentBinIndex].count += value; // Changed from ++ to += value
|
||||
this.totalDataPoints++;
|
||||
|
||||
this.render();
|
||||
this.updateInfo();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Clear all data from the chart
|
||||
*/
|
||||
@@ -98,7 +89,7 @@ class ASCIIBarChart {
|
||||
this.render();
|
||||
this.updateInfo();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Calculate the width of the chart in characters
|
||||
* @returns {number} The chart width in characters
|
||||
@@ -119,14 +110,14 @@ class ASCIIBarChart {
|
||||
const totalWidth = yAxisPadding + yAxisNumbers + separator + dataWidth + padding;
|
||||
|
||||
// Only log when width changes
|
||||
if (this.lastChartWidth !== totalWidth) {
|
||||
if (this.debug && this.lastChartWidth !== totalWidth) {
|
||||
console.log('getChartWidth changed:', { dataLength, totalWidth, previous: this.lastChartWidth });
|
||||
this.lastChartWidth = totalWidth;
|
||||
}
|
||||
|
||||
return totalWidth;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Adjust font size to fit container width
|
||||
* @private
|
||||
@@ -142,7 +133,7 @@ class ASCIIBarChart {
|
||||
// Calculate optimal font size
|
||||
// For monospace fonts, character width is approximately 0.6 * font size
|
||||
// Use a slightly smaller ratio to fit more content
|
||||
const charWidthRatio = 0.6;
|
||||
const charWidthRatio = 0.7;
|
||||
const padding = 30; // Reduce padding to fit more content
|
||||
const availableWidth = containerWidth - padding;
|
||||
const optimalFontSize = Math.floor((availableWidth / chartWidth) / charWidthRatio);
|
||||
@@ -151,7 +142,7 @@ class ASCIIBarChart {
|
||||
const fontSize = Math.max(4, Math.min(20, optimalFontSize));
|
||||
|
||||
// Only log when font size changes
|
||||
if (this.lastFontSize !== fontSize) {
|
||||
if (this.debug && this.lastFontSize !== fontSize) {
|
||||
console.log('fontSize changed:', { containerWidth, chartWidth, fontSize, previous: this.lastFontSize });
|
||||
this.lastFontSize = fontSize;
|
||||
}
|
||||
@@ -159,7 +150,7 @@ class ASCIIBarChart {
|
||||
this.container.style.fontSize = fontSize + 'px';
|
||||
this.container.style.lineHeight = '1.0';
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Render the chart to the container
|
||||
* @private
|
||||
@@ -190,7 +181,9 @@ class ASCIIBarChart {
|
||||
}
|
||||
});
|
||||
|
||||
console.log('render() dataToRender:', dataToRender, 'bins length:', this.bins.length);
|
||||
if (this.debug) {
|
||||
console.log('render() dataToRender:', dataToRender, 'bins length:', this.bins.length);
|
||||
}
|
||||
maxValue = Math.max(...dataToRender);
|
||||
minValue = Math.min(...dataToRender);
|
||||
valueRange = maxValue - minValue;
|
||||
@@ -219,12 +212,12 @@ class ASCIIBarChart {
|
||||
const yAxisPadding = this.yAxisLabel ? ' ' : '';
|
||||
|
||||
// Add title if provided (centered)
|
||||
if (this.title) {
|
||||
// const chartWidth = 4 + this.maxDataPoints * 2; // Y-axis numbers + data columns // TEMP: commented for no-space test
|
||||
const chartWidth = 4 + this.maxDataPoints; // Y-axis numbers + data columns // TEMP: adjusted for no-space columns
|
||||
const titlePadding = Math.floor((chartWidth - this.title.length) / 2);
|
||||
output += yAxisPadding + ' '.repeat(Math.max(0, titlePadding)) + this.title + '\n\n';
|
||||
}
|
||||
if (this.title) {
|
||||
// const chartWidth = 4 + this.maxDataPoints * 2; // Y-axis numbers + data columns // TEMP: commented for no-space test
|
||||
const chartWidth = 4 + this.maxDataPoints; // Y-axis numbers + data columns // TEMP: adjusted for no-space columns
|
||||
const titlePadding = Math.floor((chartWidth - this.title.length) / 2);
|
||||
output += yAxisPadding + ' '.repeat(Math.max(0, titlePadding)) + this.title + '\n\n';
|
||||
}
|
||||
|
||||
// Draw from top to bottom
|
||||
for (let row = scale; row > 0; row--) {
|
||||
@@ -243,8 +236,8 @@ class ASCIIBarChart {
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate the actual count value this row represents (0 at bottom, increasing upward)
|
||||
const rowCount = (row - 1) * scaleFactor;
|
||||
// Calculate the actual count value this row represents (1 at bottom, increasing upward)
|
||||
const rowCount = (row - 1) * scaleFactor + 1;
|
||||
|
||||
// Add Y-axis label (show actual count values)
|
||||
line += String(rowCount).padStart(3, ' ') + ' |';
|
||||
@@ -267,75 +260,75 @@ class ASCIIBarChart {
|
||||
}
|
||||
|
||||
// Draw X-axis
|
||||
// output += yAxisPadding + ' +' + '-'.repeat(this.maxDataPoints * 2) + '\n'; // TEMP: commented out for no-space test
|
||||
output += yAxisPadding + ' +' + '-'.repeat(this.maxDataPoints) + '\n'; // TEMP: back to original length
|
||||
// output += yAxisPadding + ' +' + '-'.repeat(this.maxDataPoints * 2) + '\n'; // TEMP: commented out for no-space test
|
||||
output += yAxisPadding + ' +' + '-'.repeat(this.maxDataPoints) + '\n'; // TEMP: back to original length
|
||||
|
||||
// Draw X-axis labels based on mode and format
|
||||
let xAxisLabels = yAxisPadding + ' '; // Initial padding to align with X-axis
|
||||
let xAxisLabels = yAxisPadding + ' '; // Initial padding to align with X-axis
|
||||
|
||||
// Determine label interval (every 5 columns)
|
||||
const labelInterval = 5;
|
||||
// Determine label interval (every 5 columns)
|
||||
const labelInterval = 5;
|
||||
|
||||
// Generate all labels first and store in array
|
||||
let labels = [];
|
||||
for (let i = 0; i < this.maxDataPoints; i++) {
|
||||
if (i % labelInterval === 0) {
|
||||
let label = '';
|
||||
if (this.useBinMode) {
|
||||
// For bin mode, show labels for all possible positions
|
||||
// i=0 is leftmost (most recent), i=maxDataPoints-1 is rightmost (oldest)
|
||||
const elapsedSec = (i * this.binDuration) / 1000;
|
||||
// Format with appropriate precision for sub-second bins
|
||||
if (this.binDuration < 1000) {
|
||||
// Show decimal seconds for sub-second bins
|
||||
label = elapsedSec.toFixed(1) + 's';
|
||||
} else {
|
||||
// Show whole seconds for 1+ second bins
|
||||
label = String(Math.round(elapsedSec)) + 's';
|
||||
}
|
||||
} else {
|
||||
// For legacy mode, show data point numbers
|
||||
const startIndex = Math.max(1, this.totalDataPoints - this.maxDataPoints + 1);
|
||||
label = String(startIndex + i);
|
||||
}
|
||||
labels.push(label);
|
||||
}
|
||||
}
|
||||
// Generate all labels first and store in array
|
||||
let labels = [];
|
||||
for (let i = 0; i < this.maxDataPoints; i++) {
|
||||
if (i % labelInterval === 0) {
|
||||
let label = '';
|
||||
if (this.useBinMode) {
|
||||
// For bin mode, show labels for all possible positions
|
||||
// i=0 is leftmost (most recent), i=maxDataPoints-1 is rightmost (oldest)
|
||||
const elapsedSec = (i * this.binDuration) / 1000;
|
||||
// Format with appropriate precision for sub-second bins
|
||||
if (this.binDuration < 1000) {
|
||||
// Show decimal seconds for sub-second bins
|
||||
label = elapsedSec.toFixed(1) + 's';
|
||||
} else {
|
||||
// Show whole seconds for 1+ second bins
|
||||
label = String(Math.round(elapsedSec)) + 's';
|
||||
}
|
||||
} else {
|
||||
// For legacy mode, show data point numbers
|
||||
const startIndex = Math.max(1, this.totalDataPoints - this.maxDataPoints + 1);
|
||||
label = String(startIndex + i);
|
||||
}
|
||||
labels.push(label);
|
||||
}
|
||||
}
|
||||
|
||||
// Build the label string with calculated spacing
|
||||
for (let i = 0; i < labels.length; i++) {
|
||||
const label = labels[i];
|
||||
xAxisLabels += label;
|
||||
// Build the label string with calculated spacing
|
||||
for (let i = 0; i < labels.length; i++) {
|
||||
const label = labels[i];
|
||||
xAxisLabels += label;
|
||||
|
||||
// Add spacing: labelInterval - label.length (except for last label)
|
||||
if (i < labels.length - 1) {
|
||||
const spacing = labelInterval - label.length;
|
||||
xAxisLabels += ' '.repeat(spacing);
|
||||
}
|
||||
}
|
||||
// Add spacing: labelInterval - label.length (except for last label)
|
||||
if (i < labels.length - 1) {
|
||||
const spacing = labelInterval - label.length;
|
||||
xAxisLabels += ' '.repeat(spacing);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the label line extends to match the X-axis dash line length
|
||||
// The dash line is this.maxDataPoints characters long, starting after " +"
|
||||
const dashLineLength = this.maxDataPoints;
|
||||
const minLabelLineLength = yAxisPadding.length + 4 + dashLineLength; // 4 for " "
|
||||
if (xAxisLabels.length < minLabelLineLength) {
|
||||
xAxisLabels += ' '.repeat(minLabelLineLength - xAxisLabels.length);
|
||||
}
|
||||
// Ensure the label line extends to match the X-axis dash line length
|
||||
// The dash line is this.maxDataPoints characters long, starting after " +"
|
||||
const dashLineLength = this.maxDataPoints;
|
||||
const minLabelLineLength = yAxisPadding.length + 4 + dashLineLength; // 4 for " "
|
||||
if (xAxisLabels.length < minLabelLineLength) {
|
||||
xAxisLabels += ' '.repeat(minLabelLineLength - xAxisLabels.length);
|
||||
}
|
||||
output += xAxisLabels + '\n';
|
||||
|
||||
// Add X-axis label if provided
|
||||
if (this.xAxisLabel) {
|
||||
// const labelPadding = Math.floor((this.maxDataPoints * 2 - this.xAxisLabel.length) / 2); // TEMP: commented for no-space test
|
||||
const labelPadding = Math.floor((this.maxDataPoints - this.xAxisLabel.length) / 2); // TEMP: adjusted for no-space columns
|
||||
output += '\n' + yAxisPadding + ' ' + ' '.repeat(Math.max(0, labelPadding)) + this.xAxisLabel + '\n';
|
||||
}
|
||||
if (this.xAxisLabel) {
|
||||
// const labelPadding = Math.floor((this.maxDataPoints * 2 - this.xAxisLabel.length) / 2); // TEMP: commented for no-space test
|
||||
const labelPadding = Math.floor((this.maxDataPoints - this.xAxisLabel.length) / 2); // TEMP: adjusted for no-space columns
|
||||
output += '\n' + yAxisPadding + ' ' + ' '.repeat(Math.max(0, labelPadding)) + this.xAxisLabel + '\n';
|
||||
}
|
||||
|
||||
this.container.textContent = output;
|
||||
|
||||
// Adjust font size to fit width (only once at initialization)
|
||||
if (this.autoFitWidth) {
|
||||
this.adjustFontSize();
|
||||
}
|
||||
if (this.autoFitWidth) {
|
||||
this.adjustFontSize();
|
||||
}
|
||||
|
||||
// Update the external info display
|
||||
if (this.useBinMode) {
|
||||
@@ -350,7 +343,7 @@ class ASCIIBarChart {
|
||||
document.getElementById('scale').textContent = `Min: ${minValue}, Max: ${maxValue}, Height: ${scale}`;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Update the info display
|
||||
* @private
|
||||
|
||||
298
docs/libwebsockets_proper_pattern.md
Normal file
298
docs/libwebsockets_proper_pattern.md
Normal file
@@ -0,0 +1,298 @@
|
||||
# Libwebsockets Proper Pattern - Message Queue Design
|
||||
|
||||
## Problem Analysis
|
||||
|
||||
### Current Violation
|
||||
We're calling `lws_write()` directly from multiple code paths:
|
||||
1. **Event broadcast** (subscriptions.c:667) - when events arrive
|
||||
2. **OK responses** (websockets.c:855) - when processing EVENT messages
|
||||
3. **EOSE responses** (websockets.c:976) - when processing REQ messages
|
||||
4. **COUNT responses** (websockets.c:1922) - when processing COUNT messages
|
||||
|
||||
This violates libwebsockets' design pattern which requires:
|
||||
- **`lws_write()` ONLY called from `LWS_CALLBACK_SERVER_WRITEABLE`**
|
||||
- Application queues messages and requests writeable callback
|
||||
- Libwebsockets handles write timing and socket buffer management
|
||||
|
||||
### Consequences of Violation
|
||||
1. Partial writes when socket buffer is full
|
||||
2. Multiple concurrent write attempts before callback fires
|
||||
3. "write already pending" errors with single buffer
|
||||
4. Frame corruption from interleaved partial writes
|
||||
5. "Invalid frame header" errors on client side
|
||||
|
||||
## Correct Architecture
|
||||
|
||||
### Message Queue Pattern
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ Application Layer │
|
||||
├─────────────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ Event Arrives → Queue Message → Request Writeable Callback │
|
||||
│ REQ Received → Queue EOSE → Request Writeable Callback │
|
||||
│ EVENT Received→ Queue OK → Request Writeable Callback │
|
||||
│ COUNT Received→ Queue COUNT → Request Writeable Callback │
|
||||
│ │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
↓
|
||||
lws_callback_on_writable(wsi)
|
||||
↓
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ LWS_CALLBACK_SERVER_WRITEABLE │
|
||||
├─────────────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ 1. Dequeue next message from queue │
|
||||
│ 2. Call lws_write() with message data │
|
||||
│ 3. If queue not empty, request another callback │
|
||||
│ │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
↓
|
||||
libwebsockets handles:
|
||||
- Socket buffer management
|
||||
- Partial write handling
|
||||
- Frame atomicity
|
||||
```
|
||||
|
||||
## Data Structures
|
||||
|
||||
### Message Queue Node
|
||||
```c
|
||||
typedef struct message_queue_node {
|
||||
unsigned char* data; // Message data (with LWS_PRE space)
|
||||
size_t length; // Message length (without LWS_PRE)
|
||||
enum lws_write_protocol type; // LWS_WRITE_TEXT, etc.
|
||||
struct message_queue_node* next;
|
||||
} message_queue_node_t;
|
||||
```
|
||||
|
||||
### Per-Session Data Updates
|
||||
```c
|
||||
struct per_session_data {
|
||||
// ... existing fields ...
|
||||
|
||||
// Message queue (replaces single buffer)
|
||||
message_queue_node_t* message_queue_head;
|
||||
message_queue_node_t* message_queue_tail;
|
||||
int message_queue_count;
|
||||
int writeable_requested; // Flag to prevent duplicate requests
|
||||
};
|
||||
```
|
||||
|
||||
## Implementation Functions
|
||||
|
||||
### 1. Queue Message (Application Layer)
|
||||
```c
|
||||
int queue_message(struct lws* wsi, struct per_session_data* pss,
|
||||
const char* message, size_t length,
|
||||
enum lws_write_protocol type)
|
||||
{
|
||||
// Allocate node
|
||||
message_queue_node_t* node = malloc(sizeof(message_queue_node_t));
|
||||
|
||||
// Allocate buffer with LWS_PRE space
|
||||
node->data = malloc(LWS_PRE + length);
|
||||
memcpy(node->data + LWS_PRE, message, length);
|
||||
node->length = length;
|
||||
node->type = type;
|
||||
node->next = NULL;
|
||||
|
||||
// Add to queue (FIFO)
|
||||
pthread_mutex_lock(&pss->session_lock);
|
||||
if (!pss->message_queue_head) {
|
||||
pss->message_queue_head = node;
|
||||
pss->message_queue_tail = node;
|
||||
} else {
|
||||
pss->message_queue_tail->next = node;
|
||||
pss->message_queue_tail = node;
|
||||
}
|
||||
pss->message_queue_count++;
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
|
||||
// Request writeable callback (only if not already requested)
|
||||
if (!pss->writeable_requested) {
|
||||
pss->writeable_requested = 1;
|
||||
lws_callback_on_writable(wsi);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
```
|
||||
|
||||
### 2. Process Queue (Writeable Callback)
|
||||
```c
|
||||
int process_message_queue(struct lws* wsi, struct per_session_data* pss)
|
||||
{
|
||||
pthread_mutex_lock(&pss->session_lock);
|
||||
|
||||
// Get next message from queue
|
||||
message_queue_node_t* node = pss->message_queue_head;
|
||||
if (!node) {
|
||||
pss->writeable_requested = 0;
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
return 0; // Queue empty
|
||||
}
|
||||
|
||||
// Remove from queue
|
||||
pss->message_queue_head = node->next;
|
||||
if (!pss->message_queue_head) {
|
||||
pss->message_queue_tail = NULL;
|
||||
}
|
||||
pss->message_queue_count--;
|
||||
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
|
||||
// Write message (libwebsockets handles partial writes)
|
||||
int result = lws_write(wsi, node->data + LWS_PRE, node->length, node->type);
|
||||
|
||||
// Free node
|
||||
free(node->data);
|
||||
free(node);
|
||||
|
||||
// If queue not empty, request another callback
|
||||
pthread_mutex_lock(&pss->session_lock);
|
||||
if (pss->message_queue_head) {
|
||||
lws_callback_on_writable(wsi);
|
||||
} else {
|
||||
pss->writeable_requested = 0;
|
||||
}
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
|
||||
return (result < 0) ? -1 : 0;
|
||||
}
|
||||
```
|
||||
|
||||
## Refactoring Changes
|
||||
|
||||
### Before (WRONG - Direct Write)
|
||||
```c
|
||||
// websockets.c:855 - OK response
|
||||
int write_result = lws_write(wsi, buf + LWS_PRE, response_len, LWS_WRITE_TEXT);
|
||||
if (write_result < 0) {
|
||||
DEBUG_ERROR("Write failed");
|
||||
} else if ((size_t)write_result != response_len) {
|
||||
// Partial write - queue remaining data
|
||||
queue_websocket_write(wsi, pss, ...);
|
||||
}
|
||||
```
|
||||
|
||||
### After (CORRECT - Queue Message)
|
||||
```c
|
||||
// websockets.c:855 - OK response
|
||||
queue_message(wsi, pss, response_str, response_len, LWS_WRITE_TEXT);
|
||||
// That's it! Writeable callback will handle the actual write
|
||||
```
|
||||
|
||||
### Before (WRONG - Direct Write in Broadcast)
|
||||
```c
|
||||
// subscriptions.c:667 - EVENT broadcast
|
||||
int write_result = lws_write(current_temp->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
|
||||
if (write_result < 0) {
|
||||
DEBUG_ERROR("Write failed");
|
||||
} else if ((size_t)write_result != msg_len) {
|
||||
queue_websocket_write(...);
|
||||
}
|
||||
```
|
||||
|
||||
### After (CORRECT - Queue Message)
|
||||
```c
|
||||
// subscriptions.c:667 - EVENT broadcast
|
||||
struct per_session_data* pss = lws_wsi_user(current_temp->wsi);
|
||||
queue_message(current_temp->wsi, pss, msg_str, msg_len, LWS_WRITE_TEXT);
|
||||
// Writeable callback will handle the actual write
|
||||
```
|
||||
|
||||
## Benefits of Correct Pattern
|
||||
|
||||
1. **No Partial Write Handling Needed**
|
||||
- Libwebsockets handles partial writes internally
|
||||
- We just queue complete messages
|
||||
|
||||
2. **No "Write Already Pending" Errors**
|
||||
- Queue can hold unlimited messages
|
||||
- Each processed sequentially from callback
|
||||
|
||||
3. **Thread Safety**
|
||||
- Queue operations protected by session lock
|
||||
- Write only from single callback thread
|
||||
|
||||
4. **Frame Atomicity**
|
||||
- Libwebsockets ensures complete frame transmission
|
||||
- No interleaved partial writes
|
||||
|
||||
5. **Simpler Code**
|
||||
- No complex partial write state machine
|
||||
- Just queue and forget
|
||||
|
||||
6. **Better Performance**
|
||||
- Libwebsockets optimizes write timing
|
||||
- Batches writes when socket ready
|
||||
|
||||
## Migration Steps
|
||||
|
||||
1. ✅ Identify all `lws_write()` call sites
|
||||
2. ✅ Confirm violation of libwebsockets pattern
|
||||
3. ⏳ Design message queue structure
|
||||
4. ⏳ Implement `queue_message()` function
|
||||
5. ⏳ Implement `process_message_queue()` function
|
||||
6. ⏳ Update `per_session_data` structure
|
||||
7. ⏳ Refactor OK response to use queue
|
||||
8. ⏳ Refactor EOSE response to use queue
|
||||
9. ⏳ Refactor COUNT response to use queue
|
||||
10. ⏳ Refactor EVENT broadcast to use queue
|
||||
11. ⏳ Update `LWS_CALLBACK_SERVER_WRITEABLE` handler
|
||||
12. ⏳ Add queue cleanup in `LWS_CALLBACK_CLOSED`
|
||||
13. ⏳ Remove old partial write code
|
||||
14. ⏳ Test with rapid multiple events
|
||||
15. ⏳ Test with large events (>4KB)
|
||||
16. ⏳ Test under load
|
||||
17. ⏳ Verify no frame errors
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
### Test 1: Multiple Rapid Events
|
||||
```bash
|
||||
# Send 10 events rapidly to same client
|
||||
for i in {1..10}; do
|
||||
echo '["EVENT",{"kind":1,"content":"test'$i'","created_at":'$(date +%s)',...}]' | \
|
||||
websocat ws://localhost:8888 &
|
||||
done
|
||||
```
|
||||
|
||||
**Expected**: All events queued and sent sequentially, no errors
|
||||
|
||||
### Test 2: Large Events
|
||||
```bash
|
||||
# Send event >4KB (forces multiple socket writes)
|
||||
nak event --content "$(head -c 5000 /dev/urandom | base64)" | \
|
||||
websocat ws://localhost:8888
|
||||
```
|
||||
|
||||
**Expected**: Event queued, libwebsockets handles partial writes internally
|
||||
|
||||
### Test 3: Concurrent Connections
|
||||
```bash
|
||||
# 100 concurrent connections, each sending events
|
||||
for i in {1..100}; do
|
||||
(echo '["REQ","sub'$i'",{}]'; sleep 1) | websocat ws://localhost:8888 &
|
||||
done
|
||||
```
|
||||
|
||||
**Expected**: All subscriptions work, events broadcast correctly
|
||||
|
||||
## Success Criteria
|
||||
|
||||
- ✅ No `lws_write()` calls outside `LWS_CALLBACK_SERVER_WRITEABLE`
|
||||
- ✅ No "write already pending" errors in logs
|
||||
- ✅ No "Invalid frame header" errors on client side
|
||||
- ✅ All messages delivered in correct order
|
||||
- ✅ Large events (>4KB) handled correctly
|
||||
- ✅ Multiple rapid events to same client work
|
||||
- ✅ Concurrent connections stable under load
|
||||
|
||||
## References
|
||||
|
||||
- [libwebsockets documentation](https://libwebsockets.org/lws-api-doc-main/html/index.html)
|
||||
- [LWS_CALLBACK_SERVER_WRITEABLE](https://libwebsockets.org/lws-api-doc-main/html/group__callback-when-writeable.html)
|
||||
- [lws_callback_on_writable()](https://libwebsockets.org/lws-api-doc-main/html/group__callback-when-writeable.html#ga96f3ad8e1e2c3e0c8e0b0e5e5e5e5e5e)
|
||||
200
docs/websocket_write_queue_design.md
Normal file
200
docs/websocket_write_queue_design.md
Normal file
@@ -0,0 +1,200 @@
|
||||
# WebSocket Write Queue Design
|
||||
|
||||
## Problem Statement
|
||||
|
||||
The current partial write handling implementation uses a single buffer per session, which fails when multiple events need to be sent to the same client in rapid succession. This causes:
|
||||
|
||||
1. First event gets partial write → queued successfully
|
||||
2. Second event tries to write → **FAILS** with "write already pending"
|
||||
3. Subsequent events fail similarly, causing data loss
|
||||
|
||||
### Server Log Evidence
|
||||
```
|
||||
[WARN] WS_FRAME_PARTIAL: EVENT partial write, sub=1 sent=3210 expected=5333
|
||||
[TRACE] Queued partial write: len=2123
|
||||
[WARN] WS_FRAME_PARTIAL: EVENT partial write, sub=1 sent=3210 expected=5333
|
||||
[WARN] queue_websocket_write: write already pending, cannot queue new write
|
||||
[ERROR] Failed to queue partial EVENT write for sub=1
|
||||
```
|
||||
|
||||
## Root Cause
|
||||
|
||||
WebSocket frames must be sent **atomically** - you cannot interleave multiple frames. The current single-buffer approach correctly enforces this, but it rejects new writes instead of queuing them.
|
||||
|
||||
## Solution: Write Queue Architecture
|
||||
|
||||
### Design Principles
|
||||
|
||||
1. **Frame Atomicity**: Complete one WebSocket frame before starting the next
|
||||
2. **Sequential Processing**: Process queued writes in FIFO order
|
||||
3. **Memory Safety**: Proper cleanup on connection close or errors
|
||||
4. **Thread Safety**: Protect queue operations with existing session lock
|
||||
|
||||
### Data Structures
|
||||
|
||||
#### Write Queue Node
|
||||
```c
|
||||
struct write_queue_node {
|
||||
unsigned char* buffer; // Buffer with LWS_PRE space
|
||||
size_t total_len; // Total length of data to write
|
||||
size_t offset; // How much has been written so far
|
||||
int write_type; // LWS_WRITE_TEXT, etc.
|
||||
struct write_queue_node* next; // Next node in queue
|
||||
};
|
||||
```
|
||||
|
||||
#### Per-Session Write Queue
|
||||
```c
|
||||
struct per_session_data {
|
||||
// ... existing fields ...
|
||||
|
||||
// Write queue for handling multiple pending writes
|
||||
struct write_queue_node* write_queue_head; // First item to write
|
||||
struct write_queue_node* write_queue_tail; // Last item in queue
|
||||
int write_queue_length; // Number of items in queue
|
||||
int write_in_progress; // Flag: 1 if currently writing
|
||||
};
|
||||
```
|
||||
|
||||
### Algorithm Flow
|
||||
|
||||
#### 1. Enqueue Write (`queue_websocket_write`)
|
||||
|
||||
```
|
||||
IF write_queue is empty AND no write in progress:
|
||||
- Attempt immediate write with lws_write()
|
||||
- IF complete:
|
||||
- Return success
|
||||
- ELSE (partial write):
|
||||
- Create queue node with remaining data
|
||||
- Add to queue
|
||||
- Set write_in_progress flag
|
||||
- Request LWS_CALLBACK_SERVER_WRITEABLE
|
||||
ELSE:
|
||||
- Create queue node with full data
|
||||
- Append to queue tail
|
||||
- IF no write in progress:
|
||||
- Request LWS_CALLBACK_SERVER_WRITEABLE
|
||||
```
|
||||
|
||||
#### 2. Process Queue (`process_pending_write`)
|
||||
|
||||
```
|
||||
WHILE write_queue is not empty:
|
||||
- Get head node
|
||||
- Calculate remaining data (total_len - offset)
|
||||
- Attempt write with lws_write()
|
||||
|
||||
IF write fails (< 0):
|
||||
- Log error
|
||||
- Remove and free head node
|
||||
- Continue to next node
|
||||
|
||||
ELSE IF partial write (< remaining):
|
||||
- Update offset
|
||||
- Request LWS_CALLBACK_SERVER_WRITEABLE
|
||||
- Break (wait for next callback)
|
||||
|
||||
ELSE (complete write):
|
||||
- Remove and free head node
|
||||
- Continue to next node
|
||||
|
||||
IF queue is empty:
|
||||
- Clear write_in_progress flag
|
||||
```
|
||||
|
||||
#### 3. Cleanup (`LWS_CALLBACK_CLOSED`)
|
||||
|
||||
```
|
||||
WHILE write_queue is not empty:
|
||||
- Get head node
|
||||
- Free buffer
|
||||
- Free node
|
||||
- Move to next
|
||||
Clear queue pointers
|
||||
```
|
||||
|
||||
### Memory Management
|
||||
|
||||
1. **Allocation**: Each queue node allocates buffer with `LWS_PRE + data_len`
|
||||
2. **Ownership**: Queue owns all buffers until write completes or connection closes
|
||||
3. **Deallocation**: Free buffer and node when:
|
||||
- Write completes successfully
|
||||
- Write fails with error
|
||||
- Connection closes
|
||||
|
||||
### Thread Safety
|
||||
|
||||
- Use existing `pss->session_lock` to protect queue operations
|
||||
- Lock during:
|
||||
- Enqueue operations
|
||||
- Dequeue operations
|
||||
- Queue traversal for cleanup
|
||||
|
||||
### Performance Considerations
|
||||
|
||||
1. **Queue Length Limit**: Implement max queue length (e.g., 100 items) to prevent memory exhaustion
|
||||
2. **Memory Pressure**: Monitor total queued bytes per session
|
||||
3. **Backpressure**: If queue exceeds limit, close connection with NOTICE
|
||||
|
||||
### Error Handling
|
||||
|
||||
1. **Allocation Failure**: Return error, log, send NOTICE to client
|
||||
2. **Write Failure**: Remove failed frame, continue with next
|
||||
3. **Queue Overflow**: Close connection with appropriate NOTICE
|
||||
|
||||
## Implementation Plan
|
||||
|
||||
### Phase 1: Data Structure Changes
|
||||
1. Add `write_queue_node` structure to `websockets.h`
|
||||
2. Update `per_session_data` with queue fields
|
||||
3. Remove old single-buffer fields
|
||||
|
||||
### Phase 2: Queue Operations
|
||||
1. Implement `enqueue_write()` helper
|
||||
2. Implement `dequeue_write()` helper
|
||||
3. Update `queue_websocket_write()` to use queue
|
||||
4. Update `process_pending_write()` to process queue
|
||||
|
||||
### Phase 3: Integration
|
||||
1. Update all `lws_write()` call sites
|
||||
2. Update `LWS_CALLBACK_CLOSED` cleanup
|
||||
3. Add queue length monitoring
|
||||
|
||||
### Phase 4: Testing
|
||||
1. Test with rapid multiple events to same client
|
||||
2. Test with large events (>4KB)
|
||||
3. Test under load with concurrent connections
|
||||
4. Verify no "Invalid frame header" errors
|
||||
|
||||
## Expected Outcomes
|
||||
|
||||
1. **No More Rejections**: All writes queued successfully
|
||||
2. **Frame Integrity**: Complete frames sent atomically
|
||||
3. **Memory Safety**: Proper cleanup on all paths
|
||||
4. **Performance**: Minimal overhead for queue management
|
||||
|
||||
## Metrics to Monitor
|
||||
|
||||
1. Average queue length per session
|
||||
2. Maximum queue length observed
|
||||
3. Queue overflow events (if limit implemented)
|
||||
4. Write completion rate
|
||||
5. Partial write frequency
|
||||
|
||||
## Alternative Approaches Considered
|
||||
|
||||
### 1. Larger Single Buffer
|
||||
**Rejected**: Doesn't solve the fundamental problem of multiple concurrent writes
|
||||
|
||||
### 2. Immediate Write Retry
|
||||
**Rejected**: Could cause busy-waiting and CPU waste
|
||||
|
||||
### 3. Drop Frames on Conflict
|
||||
**Rejected**: Violates reliability requirements
|
||||
|
||||
## References
|
||||
|
||||
- libwebsockets documentation on `lws_write()` and `LWS_CALLBACK_SERVER_WRITEABLE`
|
||||
- WebSocket RFC 6455 on frame structure
|
||||
- Nostr NIP-01 on relay-to-client communication
|
||||
11
notes.txt
11
notes.txt
@@ -39,6 +39,11 @@ Even simpler: Use this one-liner
|
||||
cd /usr/local/bin/c_relay
|
||||
sudo -u c-relay ./c_relay --debug-level=5 & sleep 2 && sudo gdb -p $(pgrep c_relay)
|
||||
|
||||
Inside gdb, after attaching:
|
||||
|
||||
(gdb) continue
|
||||
Or shorter:
|
||||
(gdb) c
|
||||
|
||||
|
||||
How to View the Logs
|
||||
@@ -75,4 +80,8 @@ sudo systemctl status rsyslog
|
||||
|
||||
sudo -u c-relay ./c_relay --debug-level=5 -r 85d0b37e2ae822966dcadd06b2dc9368cde73865f90ea4d44f8b57d47ef0820a -a 1ec454734dcbf6fe54901ce25c0c7c6bca5edd89443416761fadc321d38df139
|
||||
|
||||
./c_relay_static_x86_64 -p 7889 --debug-level=5 -r 85d0b37e2ae822966dcadd06b2dc9368cde73865f90ea4d44f8b57d47ef0820a -a 1ec454734dcbf6fe54901ce25c0c7c6bca5edd89443416761fadc321d38df139
|
||||
./c_relay_static_x86_64 -p 7889 --debug-level=5 -r 85d0b37e2ae822966dcadd06b2dc9368cde73865f90ea4d44f8b57d47ef0820a -a 1ec454734dcbf6fe54901ce25c0c7c6bca5edd89443416761fadc321d38df139
|
||||
|
||||
|
||||
sudo ufw allow 8888/tcp
|
||||
sudo ufw delete allow 8888/tcp
|
||||
|
||||
156
src/api.c
156
src/api.c
@@ -45,6 +45,9 @@ int update_config_in_table(const char* key, const char* value);
|
||||
// Forward declaration for monitoring helper function
|
||||
int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_func)(void));
|
||||
|
||||
// Forward declaration for CPU metrics query function
|
||||
cJSON* query_cpu_metrics(void);
|
||||
|
||||
// Monitoring system helper functions
|
||||
int get_monitoring_throttle_seconds(void) {
|
||||
return get_config_int("kind_24567_reporting_throttle_sec", 5);
|
||||
@@ -236,7 +239,7 @@ cJSON* query_active_subscriptions(void) {
|
||||
const char* sql =
|
||||
"SELECT COUNT(*) as total_subs, "
|
||||
"COUNT(DISTINCT client_ip) as client_count "
|
||||
"FROM subscription_events "
|
||||
"FROM subscriptions "
|
||||
"WHERE event_type = 'created' AND ended_at IS NULL";
|
||||
|
||||
if (sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL) != SQLITE_OK) {
|
||||
@@ -258,7 +261,7 @@ cJSON* query_active_subscriptions(void) {
|
||||
const char* max_sql =
|
||||
"SELECT MAX(sub_count) FROM ("
|
||||
" SELECT COUNT(*) as sub_count "
|
||||
" FROM subscription_events "
|
||||
" FROM subscriptions "
|
||||
" WHERE event_type = 'created' AND ended_at IS NULL "
|
||||
" GROUP BY client_ip"
|
||||
")";
|
||||
@@ -294,7 +297,7 @@ cJSON* query_active_subscriptions(void) {
|
||||
}
|
||||
|
||||
// Query detailed subscription information from database log (ADMIN ONLY)
|
||||
// Uses subscription_events table instead of in-memory iteration to avoid mutex contention
|
||||
// Uses subscriptions table instead of in-memory iteration to avoid mutex contention
|
||||
cJSON* query_subscription_details(void) {
|
||||
extern sqlite3* g_db;
|
||||
if (!g_db) {
|
||||
@@ -302,13 +305,13 @@ cJSON* query_subscription_details(void) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Query active subscriptions directly from subscription_events table
|
||||
// Query active subscriptions directly from subscriptions table
|
||||
// Get subscriptions that were created but not yet closed/expired/disconnected
|
||||
sqlite3_stmt* stmt;
|
||||
const char* sql =
|
||||
"SELECT subscription_id, client_ip, filter_json, events_sent, "
|
||||
"SELECT subscription_id, client_ip, wsi_pointer, filter_json, events_sent, "
|
||||
"created_at, (strftime('%s', 'now') - created_at) as duration_seconds "
|
||||
"FROM subscription_events "
|
||||
"FROM subscriptions "
|
||||
"WHERE event_type = 'created' AND ended_at IS NULL "
|
||||
"ORDER BY created_at DESC LIMIT 100";
|
||||
|
||||
@@ -332,14 +335,16 @@ cJSON* query_subscription_details(void) {
|
||||
// Extract subscription data from database
|
||||
const char* sub_id = (const char*)sqlite3_column_text(stmt, 0);
|
||||
const char* client_ip = (const char*)sqlite3_column_text(stmt, 1);
|
||||
const char* filter_json = (const char*)sqlite3_column_text(stmt, 2);
|
||||
long long events_sent = sqlite3_column_int64(stmt, 3);
|
||||
long long created_at = sqlite3_column_int64(stmt, 4);
|
||||
long long duration_seconds = sqlite3_column_int64(stmt, 5);
|
||||
const char* wsi_pointer = (const char*)sqlite3_column_text(stmt, 2);
|
||||
const char* filter_json = (const char*)sqlite3_column_text(stmt, 3);
|
||||
long long events_sent = sqlite3_column_int64(stmt, 4);
|
||||
long long created_at = sqlite3_column_int64(stmt, 5);
|
||||
long long duration_seconds = sqlite3_column_int64(stmt, 6);
|
||||
|
||||
// Add basic subscription info
|
||||
cJSON_AddStringToObject(sub_obj, "id", sub_id ? sub_id : "");
|
||||
cJSON_AddStringToObject(sub_obj, "client_ip", client_ip ? client_ip : "");
|
||||
cJSON_AddStringToObject(sub_obj, "wsi_pointer", wsi_pointer ? wsi_pointer : "");
|
||||
cJSON_AddNumberToObject(sub_obj, "created_at", (double)created_at);
|
||||
cJSON_AddNumberToObject(sub_obj, "duration_seconds", (double)duration_seconds);
|
||||
cJSON_AddNumberToObject(sub_obj, "events_sent", events_sent);
|
||||
@@ -372,8 +377,8 @@ cJSON* query_subscription_details(void) {
|
||||
return subscriptions_data;
|
||||
}
|
||||
|
||||
// Generate and broadcast monitoring event
|
||||
int generate_monitoring_event(void) {
|
||||
// Generate event-driven monitoring events (triggered by event storage)
|
||||
int generate_event_driven_monitoring(void) {
|
||||
// Generate event_kinds monitoring event
|
||||
if (generate_monitoring_event_for_type("event_kinds", query_event_kind_distribution) != 0) {
|
||||
DEBUG_ERROR("Failed to generate event_kinds monitoring event");
|
||||
@@ -398,16 +403,45 @@ int generate_monitoring_event(void) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Generate CPU metrics monitoring event (also triggered by event storage)
|
||||
if (generate_monitoring_event_for_type("cpu_metrics", query_cpu_metrics) != 0) {
|
||||
DEBUG_ERROR("Failed to generate cpu_metrics monitoring event");
|
||||
return -1;
|
||||
}
|
||||
|
||||
DEBUG_INFO("Generated and broadcast event-driven monitoring events");
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Generate subscription-driven monitoring events (triggered by subscription changes)
|
||||
int generate_subscription_driven_monitoring(void) {
|
||||
// Generate active_subscriptions monitoring event (subscription changes affect this)
|
||||
if (generate_monitoring_event_for_type("active_subscriptions", query_active_subscriptions) != 0) {
|
||||
DEBUG_ERROR("Failed to generate active_subscriptions monitoring event");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Generate subscription_details monitoring event (admin-only)
|
||||
if (generate_monitoring_event_for_type("subscription_details", query_subscription_details) != 0) {
|
||||
DEBUG_ERROR("Failed to generate subscription_details monitoring event");
|
||||
return -1;
|
||||
}
|
||||
|
||||
DEBUG_INFO("Generated and broadcast all monitoring events");
|
||||
// Generate CPU metrics monitoring event (also triggered by subscription changes)
|
||||
if (generate_monitoring_event_for_type("cpu_metrics", query_cpu_metrics) != 0) {
|
||||
DEBUG_ERROR("Failed to generate cpu_metrics monitoring event");
|
||||
return -1;
|
||||
}
|
||||
|
||||
DEBUG_INFO("Generated and broadcast subscription-driven monitoring events");
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Generate and broadcast monitoring event (legacy function - now calls event-driven version)
|
||||
int generate_monitoring_event(void) {
|
||||
return generate_event_driven_monitoring();
|
||||
}
|
||||
|
||||
// Helper function to generate monitoring event for a specific type
|
||||
int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_func)(void)) {
|
||||
// Query the monitoring data
|
||||
@@ -500,20 +534,42 @@ void monitoring_on_event_stored(void) {
|
||||
static time_t last_monitoring_time = 0;
|
||||
time_t current_time = time(NULL);
|
||||
int throttle_seconds = get_monitoring_throttle_seconds();
|
||||
|
||||
|
||||
if (current_time - last_monitoring_time < throttle_seconds) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// Check if anyone is subscribed to monitoring events (kind 24567)
|
||||
// This is the ONLY activation check needed - if someone subscribes, they want monitoring
|
||||
if (!has_subscriptions_for_kind(24567)) {
|
||||
return; // No subscribers = no expensive operations
|
||||
}
|
||||
|
||||
// Generate monitoring events only when someone is listening
|
||||
|
||||
// Generate event-driven monitoring events only when someone is listening
|
||||
last_monitoring_time = current_time;
|
||||
generate_monitoring_event();
|
||||
generate_event_driven_monitoring();
|
||||
}
|
||||
|
||||
// Monitoring hook called when subscriptions change (create/close)
|
||||
void monitoring_on_subscription_change(void) {
|
||||
// Check throttling first (cheapest check)
|
||||
static time_t last_monitoring_time = 0;
|
||||
time_t current_time = time(NULL);
|
||||
int throttle_seconds = get_monitoring_throttle_seconds();
|
||||
|
||||
if (current_time - last_monitoring_time < throttle_seconds) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if anyone is subscribed to monitoring events (kind 24567)
|
||||
// This is the ONLY activation check needed - if someone subscribes, they want monitoring
|
||||
if (!has_subscriptions_for_kind(24567)) {
|
||||
return; // No subscribers = no expensive operations
|
||||
}
|
||||
|
||||
// Generate subscription-driven monitoring events only when someone is listening
|
||||
last_monitoring_time = current_time;
|
||||
generate_subscription_driven_monitoring();
|
||||
}
|
||||
|
||||
// Forward declaration for known_configs (defined in config.c)
|
||||
@@ -1105,6 +1161,68 @@ int handle_embedded_file_writeable(struct lws* wsi) {
|
||||
|
||||
return 0;
|
||||
}
|
||||
// Query CPU usage metrics
|
||||
cJSON* query_cpu_metrics(void) {
|
||||
cJSON* cpu_stats = cJSON_CreateObject();
|
||||
cJSON_AddStringToObject(cpu_stats, "data_type", "cpu_metrics");
|
||||
cJSON_AddNumberToObject(cpu_stats, "timestamp", (double)time(NULL));
|
||||
|
||||
// Read process CPU times from /proc/self/stat
|
||||
FILE* proc_stat = fopen("/proc/self/stat", "r");
|
||||
if (proc_stat) {
|
||||
unsigned long utime, stime; // user and system CPU time in clock ticks
|
||||
if (fscanf(proc_stat, "%*d %*s %*c %*d %*d %*d %*d %*d %*u %*u %*u %*u %*u %lu %lu", &utime, &stime) == 2) {
|
||||
unsigned long total_proc_time = utime + stime;
|
||||
|
||||
// Get system CPU times from /proc/stat
|
||||
FILE* sys_stat = fopen("/proc/stat", "r");
|
||||
if (sys_stat) {
|
||||
unsigned long user, nice, system, idle, iowait, irq, softirq;
|
||||
if (fscanf(sys_stat, "cpu %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &system, &idle, &iowait, &irq, &softirq) == 7) {
|
||||
unsigned long total_sys_time = user + nice + system + idle + iowait + irq + softirq;
|
||||
|
||||
// Calculate CPU percentages (simplified - would need deltas for accuracy)
|
||||
// For now, just store the raw values - frontend can calculate deltas
|
||||
cJSON_AddNumberToObject(cpu_stats, "process_cpu_time", (double)total_proc_time);
|
||||
cJSON_AddNumberToObject(cpu_stats, "system_cpu_time", (double)total_sys_time);
|
||||
cJSON_AddNumberToObject(cpu_stats, "system_idle_time", (double)idle);
|
||||
}
|
||||
fclose(sys_stat);
|
||||
}
|
||||
|
||||
// Get current CPU core the process is running on
|
||||
int current_core = sched_getcpu();
|
||||
if (current_core >= 0) {
|
||||
cJSON_AddNumberToObject(cpu_stats, "current_cpu_core", current_core);
|
||||
}
|
||||
}
|
||||
fclose(proc_stat);
|
||||
}
|
||||
|
||||
// Get process ID
|
||||
pid_t pid = getpid();
|
||||
cJSON_AddNumberToObject(cpu_stats, "process_id", (double)pid);
|
||||
|
||||
// Get memory usage from /proc/self/status
|
||||
FILE* mem_stat = fopen("/proc/self/status", "r");
|
||||
if (mem_stat) {
|
||||
char line[256];
|
||||
while (fgets(line, sizeof(line), mem_stat)) {
|
||||
if (strncmp(line, "VmRSS:", 6) == 0) {
|
||||
unsigned long rss_kb;
|
||||
if (sscanf(line, "VmRSS: %lu kB", &rss_kb) == 1) {
|
||||
double rss_mb = rss_kb / 1024.0;
|
||||
cJSON_AddNumberToObject(cpu_stats, "memory_usage_mb", rss_mb);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
fclose(mem_stat);
|
||||
}
|
||||
|
||||
return cpu_stats;
|
||||
}
|
||||
|
||||
// Generate stats JSON from database queries
|
||||
char* generate_stats_json(void) {
|
||||
extern sqlite3* g_db;
|
||||
@@ -2262,7 +2380,7 @@ int handle_monitoring_command(cJSON* event, const char* command, char* error_mes
|
||||
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
|
||||
}
|
||||
} else {
|
||||
char response_content[512];
|
||||
char response_content[1024];
|
||||
snprintf(response_content, sizeof(response_content),
|
||||
"❌ Unknown monitoring command: %s\n\n"
|
||||
"Available command:\n"
|
||||
|
||||
@@ -61,6 +61,7 @@ int handle_sql_query_unified(cJSON* event, const char* query, char* error_messag
|
||||
|
||||
// Monitoring system functions
|
||||
void monitoring_on_event_stored(void);
|
||||
void monitoring_on_subscription_change(void);
|
||||
int get_monitoring_throttle_seconds(void);
|
||||
|
||||
#endif // API_H
|
||||
File diff suppressed because one or more lines are too long
@@ -10,10 +10,10 @@
|
||||
#define MAIN_H
|
||||
|
||||
// Version information (auto-updated by build system)
|
||||
#define VERSION "v0.7.33"
|
||||
#define VERSION "v0.7.36"
|
||||
#define VERSION_MAJOR 0
|
||||
#define VERSION_MINOR 7
|
||||
#define VERSION_PATCH 33
|
||||
#define VERSION_PATCH 36
|
||||
|
||||
// Relay metadata (authoritative source for NIP-11 information)
|
||||
#define RELAY_NAME "C-Relay"
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
/* Embedded SQL Schema for C Nostr Relay
|
||||
* Generated from db/schema.sql - Do not edit manually
|
||||
* Schema Version: 7
|
||||
* Schema Version: 8
|
||||
*/
|
||||
#ifndef SQL_SCHEMA_H
|
||||
#define SQL_SCHEMA_H
|
||||
|
||||
/* Schema version constant */
|
||||
#define EMBEDDED_SCHEMA_VERSION "7"
|
||||
#define EMBEDDED_SCHEMA_VERSION "8"
|
||||
|
||||
/* Embedded SQL schema as C string literal */
|
||||
static const char* const EMBEDDED_SCHEMA_SQL =
|
||||
@@ -15,7 +15,7 @@ static const char* const EMBEDDED_SCHEMA_SQL =
|
||||
-- Configuration system using config table\n\
|
||||
\n\
|
||||
-- Schema version tracking\n\
|
||||
PRAGMA user_version = 7;\n\
|
||||
PRAGMA user_version = 8;\n\
|
||||
\n\
|
||||
-- Enable foreign key support\n\
|
||||
PRAGMA foreign_keys = ON;\n\
|
||||
@@ -58,8 +58,8 @@ CREATE TABLE schema_info (\n\
|
||||
\n\
|
||||
-- Insert schema metadata\n\
|
||||
INSERT INTO schema_info (key, value) VALUES\n\
|
||||
('version', '7'),\n\
|
||||
('description', 'Hybrid Nostr relay schema with event-based and table-based configuration'),\n\
|
||||
('version', '8'),\n\
|
||||
('description', 'Hybrid Nostr relay schema with subscription deduplication support'),\n\
|
||||
('created_at', strftime('%s', 'now'));\n\
|
||||
\n\
|
||||
-- Helper views for common queries\n\
|
||||
@@ -181,17 +181,19 @@ END;\n\
|
||||
-- Persistent Subscriptions Logging Tables (Phase 2)\n\
|
||||
-- Optional database logging for subscription analytics and debugging\n\
|
||||
\n\
|
||||
-- Subscription events log\n\
|
||||
CREATE TABLE subscription_events (\n\
|
||||
-- Subscriptions log (renamed from subscription_events for clarity)\n\
|
||||
CREATE TABLE subscriptions (\n\
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,\n\
|
||||
subscription_id TEXT NOT NULL, -- Subscription ID from client\n\
|
||||
wsi_pointer TEXT NOT NULL, -- WebSocket pointer address (hex string)\n\
|
||||
client_ip TEXT NOT NULL, -- Client IP address\n\
|
||||
event_type TEXT NOT NULL CHECK (event_type IN ('created', 'closed', 'expired', 'disconnected')),\n\
|
||||
filter_json TEXT, -- JSON representation of filters (for created events)\n\
|
||||
events_sent INTEGER DEFAULT 0, -- Number of events sent to this subscription\n\
|
||||
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),\n\
|
||||
ended_at INTEGER, -- When subscription ended (for closed/expired/disconnected)\n\
|
||||
duration INTEGER -- Computed: ended_at - created_at\n\
|
||||
duration INTEGER, -- Computed: ended_at - created_at\n\
|
||||
UNIQUE(subscription_id, wsi_pointer) -- Prevent duplicate subscriptions per connection\n\
|
||||
);\n\
|
||||
\n\
|
||||
-- Subscription metrics summary\n\
|
||||
@@ -218,10 +220,11 @@ CREATE TABLE event_broadcasts (\n\
|
||||
);\n\
|
||||
\n\
|
||||
-- Indexes for subscription logging performance\n\
|
||||
CREATE INDEX idx_subscription_events_id ON subscription_events(subscription_id);\n\
|
||||
CREATE INDEX idx_subscription_events_type ON subscription_events(event_type);\n\
|
||||
CREATE INDEX idx_subscription_events_created ON subscription_events(created_at DESC);\n\
|
||||
CREATE INDEX idx_subscription_events_client ON subscription_events(client_ip);\n\
|
||||
CREATE INDEX idx_subscriptions_id ON subscriptions(subscription_id);\n\
|
||||
CREATE INDEX idx_subscriptions_type ON subscriptions(event_type);\n\
|
||||
CREATE INDEX idx_subscriptions_created ON subscriptions(created_at DESC);\n\
|
||||
CREATE INDEX idx_subscriptions_client ON subscriptions(client_ip);\n\
|
||||
CREATE INDEX idx_subscriptions_wsi ON subscriptions(wsi_pointer);\n\
|
||||
\n\
|
||||
CREATE INDEX idx_subscription_metrics_date ON subscription_metrics(date DESC);\n\
|
||||
\n\
|
||||
@@ -231,10 +234,10 @@ CREATE INDEX idx_event_broadcasts_time ON event_broadcasts(broadcast_at DESC);\n
|
||||
\n\
|
||||
-- Trigger to update subscription duration when ended\n\
|
||||
CREATE TRIGGER update_subscription_duration\n\
|
||||
AFTER UPDATE OF ended_at ON subscription_events\n\
|
||||
AFTER UPDATE OF ended_at ON subscriptions\n\
|
||||
WHEN NEW.ended_at IS NOT NULL AND OLD.ended_at IS NULL\n\
|
||||
BEGIN\n\
|
||||
UPDATE subscription_events\n\
|
||||
UPDATE subscriptions\n\
|
||||
SET duration = NEW.ended_at - NEW.created_at\n\
|
||||
WHERE id = NEW.id;\n\
|
||||
END;\n\
|
||||
@@ -249,7 +252,7 @@ SELECT\n\
|
||||
MAX(events_sent) as max_events_sent,\n\
|
||||
AVG(events_sent) as avg_events_sent,\n\
|
||||
COUNT(DISTINCT client_ip) as unique_clients\n\
|
||||
FROM subscription_events\n\
|
||||
FROM subscriptions\n\
|
||||
GROUP BY date(created_at, 'unixepoch')\n\
|
||||
ORDER BY date DESC;\n\
|
||||
\n\
|
||||
@@ -262,10 +265,10 @@ SELECT\n\
|
||||
events_sent,\n\
|
||||
created_at,\n\
|
||||
(strftime('%s', 'now') - created_at) as duration_seconds\n\
|
||||
FROM subscription_events\n\
|
||||
FROM subscriptions\n\
|
||||
WHERE event_type = 'created'\n\
|
||||
AND subscription_id NOT IN (\n\
|
||||
SELECT subscription_id FROM subscription_events\n\
|
||||
SELECT subscription_id FROM subscriptions\n\
|
||||
WHERE event_type IN ('closed', 'expired', 'disconnected')\n\
|
||||
);\n\
|
||||
\n\
|
||||
|
||||
@@ -238,27 +238,81 @@ void free_subscription(subscription_t* sub) {
|
||||
// Add subscription to global manager (thread-safe)
|
||||
int add_subscription_to_manager(subscription_t* sub) {
|
||||
if (!sub) return -1;
|
||||
|
||||
|
||||
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
// Check global limits
|
||||
if (g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) {
|
||||
|
||||
// Check for existing subscription with same ID and WebSocket connection
|
||||
// Remove it first to prevent duplicates (implements subscription replacement per NIP-01)
|
||||
subscription_t** current = &g_subscription_manager.active_subscriptions;
|
||||
int found_duplicate = 0;
|
||||
subscription_t* duplicate_old = NULL;
|
||||
|
||||
while (*current) {
|
||||
subscription_t* existing = *current;
|
||||
|
||||
// Match by subscription ID and WebSocket pointer
|
||||
if (strcmp(existing->id, sub->id) == 0 && existing->wsi == sub->wsi) {
|
||||
// Found duplicate: mark inactive and unlink from global list under lock
|
||||
existing->active = 0;
|
||||
*current = existing->next;
|
||||
g_subscription_manager.total_subscriptions--;
|
||||
found_duplicate = 1;
|
||||
duplicate_old = existing; // defer free until after per-session unlink
|
||||
break;
|
||||
}
|
||||
|
||||
current = &(existing->next);
|
||||
}
|
||||
|
||||
// Check global limits (only if not replacing an existing subscription)
|
||||
if (!found_duplicate && g_subscription_manager.total_subscriptions >= g_subscription_manager.max_total_subscriptions) {
|
||||
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
||||
DEBUG_ERROR("Maximum total subscriptions reached");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
// Add to global list
|
||||
sub->next = g_subscription_manager.active_subscriptions;
|
||||
g_subscription_manager.active_subscriptions = sub;
|
||||
g_subscription_manager.total_subscriptions++;
|
||||
g_subscription_manager.total_created++;
|
||||
|
||||
|
||||
// Only increment total_created if this is a new subscription (not a replacement)
|
||||
if (!found_duplicate) {
|
||||
g_subscription_manager.total_created++;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
// Log subscription creation to database
|
||||
|
||||
// If we replaced an existing subscription, unlink it from the per-session list before freeing
|
||||
if (duplicate_old) {
|
||||
// Obtain per-session data for this wsi
|
||||
struct per_session_data* pss = (struct per_session_data*) lws_wsi_user(duplicate_old->wsi);
|
||||
if (pss) {
|
||||
pthread_mutex_lock(&pss->session_lock);
|
||||
struct subscription** scur = &pss->subscriptions;
|
||||
while (*scur) {
|
||||
if (*scur == duplicate_old) {
|
||||
// Unlink by pointer identity to avoid removing the newly-added one
|
||||
*scur = duplicate_old->session_next;
|
||||
if (pss->subscription_count > 0) {
|
||||
pss->subscription_count--;
|
||||
}
|
||||
break;
|
||||
}
|
||||
scur = &((*scur)->session_next);
|
||||
}
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
}
|
||||
// Now safe to free the old subscription
|
||||
free_subscription(duplicate_old);
|
||||
}
|
||||
|
||||
// Log subscription creation to database (INSERT OR REPLACE handles duplicates)
|
||||
log_subscription_created(sub);
|
||||
|
||||
// Trigger monitoring update for subscription changes
|
||||
monitoring_on_subscription_change();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -306,6 +360,9 @@ int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) {
|
||||
// Update events sent counter before freeing
|
||||
update_subscription_events_sent(sub_id_copy, events_sent_copy);
|
||||
|
||||
// Trigger monitoring update for subscription changes
|
||||
monitoring_on_subscription_change();
|
||||
|
||||
free_subscription(sub);
|
||||
return 0;
|
||||
}
|
||||
@@ -324,10 +381,7 @@ int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) {
|
||||
|
||||
// Check if an event matches a subscription filter
|
||||
int event_matches_filter(cJSON* event, subscription_filter_t* filter) {
|
||||
DEBUG_TRACE("Checking event against subscription filter");
|
||||
|
||||
if (!event || !filter) {
|
||||
DEBUG_TRACE("Exiting event_matches_filter - null parameters");
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -503,7 +557,6 @@ int event_matches_filter(cJSON* event, subscription_filter_t* filter) {
|
||||
}
|
||||
}
|
||||
|
||||
DEBUG_TRACE("Exiting event_matches_filter - match found");
|
||||
return 1; // All filters passed
|
||||
}
|
||||
|
||||
@@ -526,10 +579,7 @@ int event_matches_subscription(cJSON* event, subscription_t* subscription) {
|
||||
|
||||
// Broadcast event to all matching subscriptions (thread-safe)
|
||||
int broadcast_event_to_subscriptions(cJSON* event) {
|
||||
DEBUG_TRACE("Broadcasting event to subscriptions");
|
||||
|
||||
if (!event) {
|
||||
DEBUG_TRACE("Exiting broadcast_event_to_subscriptions - null event");
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -611,12 +661,19 @@ int broadcast_event_to_subscriptions(cJSON* event) {
|
||||
if (buf) {
|
||||
memcpy(buf + LWS_PRE, msg_str, msg_len);
|
||||
|
||||
// Send to WebSocket connection with error checking
|
||||
// Note: lws_write can fail if connection is closed, but won't crash
|
||||
int write_result = lws_write(current_temp->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
|
||||
if (write_result >= 0) {
|
||||
// DEBUG: Log WebSocket frame details before sending
|
||||
DEBUG_TRACE("WS_FRAME_SEND: type=EVENT sub=%s len=%zu data=%.100s%s",
|
||||
current_temp->id,
|
||||
msg_len,
|
||||
msg_str,
|
||||
msg_len > 100 ? "..." : "");
|
||||
|
||||
// Queue message for proper libwebsockets pattern
|
||||
struct per_session_data* pss = (struct per_session_data*)lws_wsi_user(current_temp->wsi);
|
||||
if (queue_message(current_temp->wsi, pss, msg_str, msg_len, LWS_WRITE_TEXT) == 0) {
|
||||
// Message queued successfully
|
||||
broadcasts++;
|
||||
|
||||
|
||||
// Update events sent counter for this subscription
|
||||
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
||||
subscription_t* update_sub = g_subscription_manager.active_subscriptions;
|
||||
@@ -630,12 +687,14 @@ int broadcast_event_to_subscriptions(cJSON* event) {
|
||||
update_sub = update_sub->next;
|
||||
}
|
||||
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
|
||||
// Log event broadcast to database (optional - can be disabled for performance)
|
||||
cJSON* event_id_obj = cJSON_GetObjectItem(event, "id");
|
||||
if (event_id_obj && cJSON_IsString(event_id_obj)) {
|
||||
log_event_broadcast(cJSON_GetStringValue(event_id_obj), current_temp->id, current_temp->client_ip);
|
||||
}
|
||||
} else {
|
||||
DEBUG_ERROR("Failed to queue EVENT message for sub=%s", current_temp->id);
|
||||
}
|
||||
|
||||
free(buf);
|
||||
@@ -660,7 +719,6 @@ int broadcast_event_to_subscriptions(cJSON* event) {
|
||||
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
DEBUG_LOG("Event broadcast complete: %d subscriptions matched", broadcasts);
|
||||
DEBUG_TRACE("Exiting broadcast_event_to_subscriptions");
|
||||
return broadcasts;
|
||||
}
|
||||
|
||||
@@ -707,6 +765,10 @@ int has_subscriptions_for_kind(int event_kind) {
|
||||
void log_subscription_created(const subscription_t* sub) {
|
||||
if (!g_db || !sub) return;
|
||||
|
||||
// Convert wsi pointer to string
|
||||
char wsi_str[32];
|
||||
snprintf(wsi_str, sizeof(wsi_str), "%p", (void*)sub->wsi);
|
||||
|
||||
// Create filter JSON for logging
|
||||
char* filter_json = NULL;
|
||||
if (sub->filters) {
|
||||
@@ -753,16 +815,18 @@ void log_subscription_created(const subscription_t* sub) {
|
||||
cJSON_Delete(filters_array);
|
||||
}
|
||||
|
||||
// Use INSERT OR REPLACE to handle duplicates automatically
|
||||
const char* sql =
|
||||
"INSERT INTO subscription_events (subscription_id, client_ip, event_type, filter_json) "
|
||||
"VALUES (?, ?, 'created', ?)";
|
||||
"INSERT OR REPLACE INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type, filter_json) "
|
||||
"VALUES (?, ?, ?, 'created', ?)";
|
||||
|
||||
sqlite3_stmt* stmt;
|
||||
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
||||
if (rc == SQLITE_OK) {
|
||||
sqlite3_bind_text(stmt, 1, sub->id, -1, SQLITE_STATIC);
|
||||
sqlite3_bind_text(stmt, 2, sub->client_ip, -1, SQLITE_STATIC);
|
||||
sqlite3_bind_text(stmt, 3, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT);
|
||||
sqlite3_bind_text(stmt, 2, wsi_str, -1, SQLITE_TRANSIENT);
|
||||
sqlite3_bind_text(stmt, 3, sub->client_ip, -1, SQLITE_STATIC);
|
||||
sqlite3_bind_text(stmt, 4, filter_json ? filter_json : "[]", -1, SQLITE_TRANSIENT);
|
||||
|
||||
sqlite3_step(stmt);
|
||||
sqlite3_finalize(stmt);
|
||||
@@ -777,8 +841,8 @@ void log_subscription_closed(const char* sub_id, const char* client_ip, const ch
|
||||
if (!g_db || !sub_id) return;
|
||||
|
||||
const char* sql =
|
||||
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
|
||||
"VALUES (?, ?, 'closed')";
|
||||
"INSERT INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type) "
|
||||
"VALUES (?, '', ?, 'closed')";
|
||||
|
||||
sqlite3_stmt* stmt;
|
||||
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
|
||||
@@ -792,7 +856,7 @@ void log_subscription_closed(const char* sub_id, const char* client_ip, const ch
|
||||
|
||||
// Update the corresponding 'created' entry with end time and events sent
|
||||
const char* update_sql =
|
||||
"UPDATE subscription_events "
|
||||
"UPDATE subscriptions "
|
||||
"SET ended_at = strftime('%s', 'now') "
|
||||
"WHERE subscription_id = ? AND event_type = 'created' AND ended_at IS NULL";
|
||||
|
||||
@@ -810,7 +874,7 @@ void log_subscription_disconnected(const char* client_ip) {
|
||||
|
||||
// Mark all active subscriptions for this client as disconnected
|
||||
const char* sql =
|
||||
"UPDATE subscription_events "
|
||||
"UPDATE subscriptions "
|
||||
"SET ended_at = strftime('%s', 'now') "
|
||||
"WHERE client_ip = ? AND event_type = 'created' AND ended_at IS NULL";
|
||||
|
||||
@@ -825,8 +889,8 @@ void log_subscription_disconnected(const char* client_ip) {
|
||||
if (changes > 0) {
|
||||
// Log a disconnection event
|
||||
const char* insert_sql =
|
||||
"INSERT INTO subscription_events (subscription_id, client_ip, event_type) "
|
||||
"VALUES ('disconnect', ?, 'disconnected')";
|
||||
"INSERT INTO subscriptions (subscription_id, wsi_pointer, client_ip, event_type) "
|
||||
"VALUES ('disconnect', '', ?, 'disconnected')";
|
||||
|
||||
rc = sqlite3_prepare_v2(g_db, insert_sql, -1, &stmt, NULL);
|
||||
if (rc == SQLITE_OK) {
|
||||
@@ -863,7 +927,7 @@ void update_subscription_events_sent(const char* sub_id, int events_sent) {
|
||||
if (!g_db || !sub_id) return;
|
||||
|
||||
const char* sql =
|
||||
"UPDATE subscription_events "
|
||||
"UPDATE subscriptions "
|
||||
"SET events_sent = ? "
|
||||
"WHERE subscription_id = ? AND event_type = 'created'";
|
||||
|
||||
|
||||
263
src/websockets.c
263
src/websockets.c
@@ -108,6 +108,136 @@ struct subscription_manager g_subscription_manager;
|
||||
|
||||
|
||||
|
||||
// Message queue functions for proper libwebsockets pattern
|
||||
|
||||
/**
|
||||
* Queue a message for WebSocket writing following libwebsockets' proper pattern.
|
||||
* This function adds messages to a per-session queue and requests writeable callback.
|
||||
*
|
||||
* @param wsi WebSocket instance
|
||||
* @param pss Per-session data containing message queue
|
||||
* @param message Message string to write
|
||||
* @param length Length of message string
|
||||
* @param type LWS_WRITE_* type (LWS_WRITE_TEXT, etc.)
|
||||
* @return 0 on success, -1 on error
|
||||
*/
|
||||
int queue_message(struct lws* wsi, struct per_session_data* pss, const char* message, size_t length, enum lws_write_protocol type) {
|
||||
if (!wsi || !pss || !message || length == 0) {
|
||||
DEBUG_ERROR("queue_message: invalid parameters");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Allocate message queue node
|
||||
struct message_queue_node* node = malloc(sizeof(struct message_queue_node));
|
||||
if (!node) {
|
||||
DEBUG_ERROR("queue_message: failed to allocate queue node");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Allocate buffer with LWS_PRE space
|
||||
size_t buffer_size = LWS_PRE + length;
|
||||
unsigned char* buffer = malloc(buffer_size);
|
||||
if (!buffer) {
|
||||
DEBUG_ERROR("queue_message: failed to allocate message buffer");
|
||||
free(node);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Copy message to buffer with LWS_PRE offset
|
||||
memcpy(buffer + LWS_PRE, message, length);
|
||||
|
||||
// Initialize node
|
||||
node->data = buffer;
|
||||
node->length = length;
|
||||
node->type = type;
|
||||
node->next = NULL;
|
||||
|
||||
// Add to queue (thread-safe)
|
||||
pthread_mutex_lock(&pss->session_lock);
|
||||
|
||||
if (!pss->message_queue_head) {
|
||||
// Queue was empty
|
||||
pss->message_queue_head = node;
|
||||
pss->message_queue_tail = node;
|
||||
} else {
|
||||
// Add to end of queue
|
||||
pss->message_queue_tail->next = node;
|
||||
pss->message_queue_tail = node;
|
||||
}
|
||||
pss->message_queue_count++;
|
||||
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
|
||||
// Request writeable callback (only if not already requested)
|
||||
if (!pss->writeable_requested) {
|
||||
pss->writeable_requested = 1;
|
||||
lws_callback_on_writable(wsi);
|
||||
}
|
||||
|
||||
DEBUG_TRACE("Queued message: len=%zu, queue_count=%d", length, pss->message_queue_count);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process message queue when the socket becomes writeable.
|
||||
* This function is called from LWS_CALLBACK_SERVER_WRITEABLE.
|
||||
*
|
||||
* @param wsi WebSocket instance
|
||||
* @param pss Per-session data containing message queue
|
||||
* @return 0 on success, -1 on error
|
||||
*/
|
||||
int process_message_queue(struct lws* wsi, struct per_session_data* pss) {
|
||||
if (!wsi || !pss) {
|
||||
DEBUG_ERROR("process_message_queue: invalid parameters");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Get next message from queue (thread-safe)
|
||||
pthread_mutex_lock(&pss->session_lock);
|
||||
|
||||
struct message_queue_node* node = pss->message_queue_head;
|
||||
if (!node) {
|
||||
// Queue is empty
|
||||
pss->writeable_requested = 0;
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Remove from queue
|
||||
pss->message_queue_head = node->next;
|
||||
if (!pss->message_queue_head) {
|
||||
pss->message_queue_tail = NULL;
|
||||
}
|
||||
pss->message_queue_count--;
|
||||
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
|
||||
// Write message (libwebsockets handles partial writes internally)
|
||||
int write_result = lws_write(wsi, node->data + LWS_PRE, node->length, node->type);
|
||||
|
||||
// Free node resources
|
||||
free(node->data);
|
||||
free(node);
|
||||
|
||||
if (write_result < 0) {
|
||||
DEBUG_ERROR("process_message_queue: write failed, result=%d", write_result);
|
||||
return -1;
|
||||
}
|
||||
|
||||
DEBUG_TRACE("Processed message: wrote %d bytes, remaining in queue: %d", write_result, pss->message_queue_count);
|
||||
|
||||
// If queue not empty, request another callback
|
||||
pthread_mutex_lock(&pss->session_lock);
|
||||
if (pss->message_queue_head) {
|
||||
lws_callback_on_writable(wsi);
|
||||
} else {
|
||||
pss->writeable_requested = 0;
|
||||
}
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////
|
||||
/////////////////////////////////////////////////////////////////////////////////////////
|
||||
// WEBSOCKET PROTOCOL
|
||||
@@ -719,16 +849,22 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
|
||||
cJSON_AddItemToArray(response, cJSON_CreateString(cJSON_GetStringValue(event_id)));
|
||||
cJSON_AddItemToArray(response, cJSON_CreateBool(result == 0));
|
||||
cJSON_AddItemToArray(response, cJSON_CreateString(strlen(error_message) > 0 ? error_message : ""));
|
||||
|
||||
|
||||
char *response_str = cJSON_Print(response);
|
||||
if (response_str) {
|
||||
size_t response_len = strlen(response_str);
|
||||
unsigned char *buf = malloc(LWS_PRE + response_len);
|
||||
if (buf) {
|
||||
memcpy(buf + LWS_PRE, response_str, response_len);
|
||||
lws_write(wsi, buf + LWS_PRE, response_len, LWS_WRITE_TEXT);
|
||||
free(buf);
|
||||
|
||||
// DEBUG: Log WebSocket frame details before sending
|
||||
DEBUG_TRACE("WS_FRAME_SEND: type=OK len=%zu data=%.100s%s",
|
||||
response_len,
|
||||
response_str,
|
||||
response_len > 100 ? "..." : "");
|
||||
|
||||
// Queue message for proper libwebsockets pattern
|
||||
if (queue_message(wsi, pss, response_str, response_len, LWS_WRITE_TEXT) != 0) {
|
||||
DEBUG_ERROR("Failed to queue OK response message");
|
||||
}
|
||||
|
||||
free(response_str);
|
||||
}
|
||||
cJSON_Delete(response);
|
||||
@@ -823,12 +959,18 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
|
||||
char *eose_str = cJSON_Print(eose_response);
|
||||
if (eose_str) {
|
||||
size_t eose_len = strlen(eose_str);
|
||||
unsigned char *buf = malloc(LWS_PRE + eose_len);
|
||||
if (buf) {
|
||||
memcpy(buf + LWS_PRE, eose_str, eose_len);
|
||||
lws_write(wsi, buf + LWS_PRE, eose_len, LWS_WRITE_TEXT);
|
||||
free(buf);
|
||||
|
||||
// DEBUG: Log WebSocket frame details before sending
|
||||
DEBUG_TRACE("WS_FRAME_SEND: type=EOSE len=%zu data=%.100s%s",
|
||||
eose_len,
|
||||
eose_str,
|
||||
eose_len > 100 ? "..." : "");
|
||||
|
||||
// Queue message for proper libwebsockets pattern
|
||||
if (queue_message(wsi, pss, eose_str, eose_len, LWS_WRITE_TEXT) != 0) {
|
||||
DEBUG_ERROR("Failed to queue EOSE message");
|
||||
}
|
||||
|
||||
free(eose_str);
|
||||
}
|
||||
cJSON_Delete(eose_response);
|
||||
@@ -908,9 +1050,22 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
|
||||
return 0;
|
||||
}
|
||||
|
||||
// CRITICAL FIX: Remove from session list FIRST (while holding lock)
|
||||
// to prevent race condition where global manager frees the subscription
|
||||
// while we're still iterating through the session list
|
||||
// CRITICAL FIX: Mark subscription as inactive in global manager FIRST
|
||||
// This prevents other threads from accessing it during removal
|
||||
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
subscription_t* target_sub = g_subscription_manager.active_subscriptions;
|
||||
while (target_sub) {
|
||||
if (strcmp(target_sub->id, subscription_id) == 0 && target_sub->wsi == wsi) {
|
||||
target_sub->active = 0; // Mark as inactive immediately
|
||||
break;
|
||||
}
|
||||
target_sub = target_sub->next;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
// Now safe to remove from session list
|
||||
if (pss) {
|
||||
pthread_mutex_lock(&pss->session_lock);
|
||||
|
||||
@@ -928,8 +1083,7 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
}
|
||||
|
||||
// Remove from global manager AFTER removing from session list
|
||||
// This prevents use-after-free when iterating session subscriptions
|
||||
// Finally remove from global manager (which will free it)
|
||||
remove_subscription_from_manager(subscription_id, wsi);
|
||||
|
||||
// Subscription closed
|
||||
@@ -972,6 +1126,13 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
|
||||
}
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE:
|
||||
// Handle message queue when socket becomes writeable
|
||||
if (pss) {
|
||||
process_message_queue(wsi, pss);
|
||||
}
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
DEBUG_TRACE("WebSocket connection closed");
|
||||
|
||||
@@ -1005,20 +1166,66 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
|
||||
auth_status,
|
||||
reason);
|
||||
|
||||
// Clean up session subscriptions
|
||||
// Clean up message queue to prevent memory leaks
|
||||
while (pss->message_queue_head) {
|
||||
struct message_queue_node* node = pss->message_queue_head;
|
||||
pss->message_queue_head = node->next;
|
||||
free(node->data);
|
||||
free(node);
|
||||
}
|
||||
pss->message_queue_tail = NULL;
|
||||
pss->message_queue_count = 0;
|
||||
pss->writeable_requested = 0;
|
||||
|
||||
// Clean up session subscriptions - copy IDs first to avoid use-after-free
|
||||
pthread_mutex_lock(&pss->session_lock);
|
||||
|
||||
// First pass: collect subscription IDs safely
|
||||
typedef struct temp_sub_id {
|
||||
char id[SUBSCRIPTION_ID_MAX_LENGTH];
|
||||
struct temp_sub_id* next;
|
||||
} temp_sub_id_t;
|
||||
|
||||
temp_sub_id_t* temp_ids = NULL;
|
||||
temp_sub_id_t* temp_tail = NULL;
|
||||
int temp_count = 0;
|
||||
|
||||
struct subscription* sub = pss->subscriptions;
|
||||
while (sub) {
|
||||
struct subscription* next = sub->session_next;
|
||||
remove_subscription_from_manager(sub->id, wsi);
|
||||
sub = next;
|
||||
if (sub->active) { // Only process active subscriptions
|
||||
temp_sub_id_t* temp = malloc(sizeof(temp_sub_id_t));
|
||||
if (temp) {
|
||||
memcpy(temp->id, sub->id, SUBSCRIPTION_ID_MAX_LENGTH);
|
||||
temp->id[SUBSCRIPTION_ID_MAX_LENGTH - 1] = '\0';
|
||||
temp->next = NULL;
|
||||
|
||||
if (!temp_ids) {
|
||||
temp_ids = temp;
|
||||
temp_tail = temp;
|
||||
} else {
|
||||
temp_tail->next = temp;
|
||||
temp_tail = temp;
|
||||
}
|
||||
temp_count++;
|
||||
}
|
||||
}
|
||||
sub = sub->session_next;
|
||||
}
|
||||
|
||||
// Clear session list immediately
|
||||
pss->subscriptions = NULL;
|
||||
pss->subscription_count = 0;
|
||||
|
||||
pthread_mutex_unlock(&pss->session_lock);
|
||||
|
||||
// Second pass: remove from global manager using copied IDs
|
||||
temp_sub_id_t* current_temp = temp_ids;
|
||||
while (current_temp) {
|
||||
temp_sub_id_t* next_temp = current_temp->next;
|
||||
remove_subscription_from_manager(current_temp->id, wsi);
|
||||
free(current_temp);
|
||||
current_temp = next_temp;
|
||||
}
|
||||
pthread_mutex_destroy(&pss->session_lock);
|
||||
} else {
|
||||
DEBUG_LOG("WebSocket CLOSED: ip=unknown duration=0s subscriptions=0 authenticated=no reason=unknown");
|
||||
@@ -1685,12 +1892,18 @@ int handle_count_message(const char* sub_id, cJSON* filters, struct lws *wsi, st
|
||||
char *count_str = cJSON_Print(count_response);
|
||||
if (count_str) {
|
||||
size_t count_len = strlen(count_str);
|
||||
unsigned char *buf = malloc(LWS_PRE + count_len);
|
||||
if (buf) {
|
||||
memcpy(buf + LWS_PRE, count_str, count_len);
|
||||
lws_write(wsi, buf + LWS_PRE, count_len, LWS_WRITE_TEXT);
|
||||
free(buf);
|
||||
|
||||
// DEBUG: Log WebSocket frame details before sending
|
||||
DEBUG_TRACE("WS_FRAME_SEND: type=COUNT len=%zu data=%.100s%s",
|
||||
count_len,
|
||||
count_str,
|
||||
count_len > 100 ? "..." : "");
|
||||
|
||||
// Queue message for proper libwebsockets pattern
|
||||
if (queue_message(wsi, pss, count_str, count_len, LWS_WRITE_TEXT) != 0) {
|
||||
DEBUG_ERROR("Failed to queue COUNT message");
|
||||
}
|
||||
|
||||
free(count_str);
|
||||
}
|
||||
cJSON_Delete(count_response);
|
||||
|
||||
@@ -31,6 +31,14 @@
|
||||
#define MAX_SEARCH_LENGTH 256
|
||||
#define MAX_TAG_VALUE_LENGTH 1024
|
||||
|
||||
// Message queue node for proper libwebsockets pattern
|
||||
struct message_queue_node {
|
||||
unsigned char* data; // Message data (with LWS_PRE space)
|
||||
size_t length; // Message length (without LWS_PRE)
|
||||
enum lws_write_protocol type; // LWS_WRITE_TEXT, etc.
|
||||
struct message_queue_node* next; // Next node in queue
|
||||
};
|
||||
|
||||
// Enhanced per-session data with subscription management, NIP-42 authentication, and rate limiting
|
||||
struct per_session_data {
|
||||
int authenticated;
|
||||
@@ -59,6 +67,12 @@ struct per_session_data {
|
||||
int malformed_request_count; // Count of malformed requests in current hour
|
||||
time_t malformed_request_window_start; // Start of current hour window
|
||||
time_t malformed_request_blocked_until; // Time until blocked for malformed requests
|
||||
|
||||
// Message queue for proper libwebsockets pattern (replaces single buffer)
|
||||
struct message_queue_node* message_queue_head; // Head of message queue
|
||||
struct message_queue_node* message_queue_tail; // Tail of message queue
|
||||
int message_queue_count; // Number of messages in queue
|
||||
int writeable_requested; // Flag: 1 if writeable callback requested
|
||||
};
|
||||
|
||||
// NIP-11 HTTP session data structure for managing buffer lifetime
|
||||
@@ -73,6 +87,10 @@ struct nip11_session_data {
|
||||
// Function declarations
|
||||
int start_websocket_relay(int port_override, int strict_port);
|
||||
|
||||
// Message queue functions for proper libwebsockets pattern
|
||||
int queue_message(struct lws* wsi, struct per_session_data* pss, const char* message, size_t length, enum lws_write_protocol type);
|
||||
int process_message_queue(struct lws* wsi, struct per_session_data* pss);
|
||||
|
||||
// Auth rules checking function from request_validator.c
|
||||
int check_database_auth_rules(const char *pubkey, const char *operation, const char *resource_hash);
|
||||
|
||||
|
||||
63
tests/large_event_test.sh
Executable file
63
tests/large_event_test.sh
Executable file
@@ -0,0 +1,63 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Test script for posting large events (>4KB) to test partial write handling
|
||||
# Uses nak to properly sign events with large content
|
||||
|
||||
RELAY_URL="ws://localhost:8888"
|
||||
|
||||
# Check if nak is installed
|
||||
if ! command -v nak &> /dev/null; then
|
||||
echo "Error: nak is not installed. Install with: go install github.com/fiatjaf/nak@latest"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Generate a test private key if not set
|
||||
if [ -z "$NOSTR_PRIVATE_KEY" ]; then
|
||||
echo "Generating temporary test key..."
|
||||
export NOSTR_PRIVATE_KEY=$(nak key generate)
|
||||
fi
|
||||
|
||||
echo "=== Large Event Test ==="
|
||||
echo "Testing partial write handling with events >4KB"
|
||||
echo "Relay: $RELAY_URL"
|
||||
echo ""
|
||||
|
||||
# Test 1: 5KB event
|
||||
echo "Test 1: Posting 5KB event..."
|
||||
CONTENT_5KB=$(python3 -c "print('A' * 5000)")
|
||||
echo "$CONTENT_5KB" | nak event -k 1 --content - $RELAY_URL
|
||||
sleep 1
|
||||
|
||||
# Test 2: 10KB event
|
||||
echo ""
|
||||
echo "Test 2: Posting 10KB event..."
|
||||
CONTENT_10KB=$(python3 -c "print('B' * 10000)")
|
||||
echo "$CONTENT_10KB" | nak event -k 1 --content - $RELAY_URL
|
||||
sleep 1
|
||||
|
||||
# Test 3: 20KB event
|
||||
echo ""
|
||||
echo "Test 3: Posting 20KB event..."
|
||||
CONTENT_20KB=$(python3 -c "print('C' * 20000)")
|
||||
echo "$CONTENT_20KB" | nak event -k 1 --content - $RELAY_URL
|
||||
sleep 1
|
||||
|
||||
# Test 4: 50KB event (very large)
|
||||
echo ""
|
||||
echo "Test 4: Posting 50KB event..."
|
||||
CONTENT_50KB=$(python3 -c "print('D' * 50000)")
|
||||
echo "$CONTENT_50KB" | nak event -k 1 --content - $RELAY_URL
|
||||
|
||||
echo ""
|
||||
echo "=== Test Complete ==="
|
||||
echo ""
|
||||
echo "Check relay.log for:"
|
||||
echo " - 'Queued partial write' messages (indicates buffering is working)"
|
||||
echo " - 'write completed' messages (indicates retry succeeded)"
|
||||
echo " - No 'Invalid frame header' errors"
|
||||
echo ""
|
||||
echo "To view logs in real-time:"
|
||||
echo " tail -f relay.log | grep -E '(partial|write completed|Invalid frame)'"
|
||||
echo ""
|
||||
echo "To check if events were stored:"
|
||||
echo " sqlite3 build/*.db 'SELECT id, length(content) as content_size FROM events ORDER BY created_at DESC LIMIT 4;'"
|
||||
@@ -3,6 +3,19 @@
|
||||
# Test script to post kind 1 events to the relay every second
|
||||
# Cycles through three different secret keys
|
||||
# Content includes current timestamp
|
||||
#
|
||||
# Usage: ./post_events.sh <relay_url>
|
||||
# Example: ./post_events.sh ws://localhost:8888
|
||||
# Example: ./post_events.sh wss://relay.laantungir.net
|
||||
|
||||
# Check if relay URL is provided
|
||||
if [ -z "$1" ]; then
|
||||
echo "Error: Relay URL is required"
|
||||
echo "Usage: $0 <relay_url>"
|
||||
echo "Example: $0 ws://localhost:8888"
|
||||
echo "Example: $0 wss://relay.laantungir.net"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Array of secret keys to cycle through
|
||||
SECRET_KEYS=(
|
||||
@@ -11,7 +24,7 @@ SECRET_KEYS=(
|
||||
"1618aaa21f5bd45c5ffede0d9a60556db67d4a046900e5f66b0bae5c01c801fb"
|
||||
)
|
||||
|
||||
RELAY_URL="ws://localhost:8888"
|
||||
RELAY_URL="$1"
|
||||
KEY_INDEX=0
|
||||
|
||||
echo "Starting event posting test to $RELAY_URL"
|
||||
@@ -36,5 +49,5 @@ while true; do
|
||||
KEY_INDEX=$(( (KEY_INDEX + 1) % ${#SECRET_KEYS[@]} ))
|
||||
|
||||
# Wait 1 second
|
||||
sleep 1
|
||||
sleep .2
|
||||
done
|
||||
@@ -1,203 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Rate Limiting Test Suite for C-Relay
|
||||
# Tests rate limiting and abuse prevention mechanisms
|
||||
|
||||
set -e
|
||||
|
||||
# Configuration
|
||||
RELAY_HOST="127.0.0.1"
|
||||
RELAY_PORT="8888"
|
||||
TEST_TIMEOUT=15
|
||||
|
||||
# Colors for output
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[1;33m'
|
||||
BLUE='\033[0;34m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
# Test counters
|
||||
TOTAL_TESTS=0
|
||||
PASSED_TESTS=0
|
||||
FAILED_TESTS=0
|
||||
|
||||
# Function to test rate limiting
|
||||
test_rate_limiting() {
|
||||
local description="$1"
|
||||
local message="$2"
|
||||
local burst_count="${3:-10}"
|
||||
local expected_limited="${4:-false}"
|
||||
|
||||
TOTAL_TESTS=$((TOTAL_TESTS + 1))
|
||||
|
||||
echo -n "Testing $description... "
|
||||
|
||||
local rate_limited=false
|
||||
local success_count=0
|
||||
local error_count=0
|
||||
|
||||
# Send burst of messages
|
||||
for i in $(seq 1 "$burst_count"); do
|
||||
local response
|
||||
response=$(echo "$message" | timeout 2 websocat -B 1048576 ws://$RELAY_HOST:$RELAY_PORT 2>/dev/null | head -1 || echo 'TIMEOUT')
|
||||
|
||||
if [[ "$response" == *"rate limit"* ]] || [[ "$response" == *"too many"* ]] || [[ "$response" == *"TOO_MANY"* ]]; then
|
||||
rate_limited=true
|
||||
elif [[ "$response" == *"EOSE"* ]] || [[ "$response" == *"EVENT"* ]] || [[ "$response" == *"OK"* ]]; then
|
||||
((success_count++))
|
||||
else
|
||||
((error_count++))
|
||||
fi
|
||||
|
||||
# Small delay between requests
|
||||
sleep 0.05
|
||||
done
|
||||
|
||||
if [[ "$expected_limited" == "true" ]]; then
|
||||
if [[ "$rate_limited" == "true" ]]; then
|
||||
echo -e "${GREEN}PASSED${NC} - Rate limiting triggered as expected"
|
||||
PASSED_TESTS=$((PASSED_TESTS + 1))
|
||||
return 0
|
||||
else
|
||||
echo -e "${RED}FAILED${NC} - Rate limiting not triggered (expected)"
|
||||
FAILED_TESTS=$((FAILED_TESTS + 1))
|
||||
return 1
|
||||
fi
|
||||
else
|
||||
if [[ "$rate_limited" == "false" ]]; then
|
||||
echo -e "${GREEN}PASSED${NC} - No rate limiting for normal traffic"
|
||||
PASSED_TESTS=$((PASSED_TESTS + 1))
|
||||
return 0
|
||||
else
|
||||
echo -e "${YELLOW}UNCERTAIN${NC} - Unexpected rate limiting"
|
||||
PASSED_TESTS=$((PASSED_TESTS + 1)) # Count as passed since it's conservative
|
||||
return 0
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
# Function to test sustained load
|
||||
test_sustained_load() {
|
||||
local description="$1"
|
||||
local message="$2"
|
||||
local duration="${3:-10}"
|
||||
|
||||
TOTAL_TESTS=$((TOTAL_TESTS + 1))
|
||||
|
||||
echo -n "Testing $description... "
|
||||
|
||||
local start_time
|
||||
start_time=$(date +%s)
|
||||
local rate_limited=false
|
||||
local total_requests=0
|
||||
local successful_requests=0
|
||||
|
||||
while [[ $(($(date +%s) - start_time)) -lt duration ]]; do
|
||||
((total_requests++))
|
||||
local response
|
||||
response=$(echo "$message" | timeout 1 websocat -B 1048576 ws://$RELAY_HOST:$RELAY_PORT 2>/dev/null | head -1 || echo 'TIMEOUT')
|
||||
|
||||
if [[ "$response" == *"rate limit"* ]] || [[ "$response" == *"too many"* ]] || [[ "$response" == *"TOO_MANY"* ]]; then
|
||||
rate_limited=true
|
||||
elif [[ "$response" == *"EOSE"* ]] || [[ "$response" == *"EVENT"* ]] || [[ "$response" == *"OK"* ]]; then
|
||||
((successful_requests++))
|
||||
fi
|
||||
|
||||
# Small delay to avoid overwhelming
|
||||
sleep 0.1
|
||||
done
|
||||
|
||||
local success_rate=0
|
||||
if [[ $total_requests -gt 0 ]]; then
|
||||
success_rate=$((successful_requests * 100 / total_requests))
|
||||
fi
|
||||
|
||||
if [[ "$rate_limited" == "true" ]]; then
|
||||
echo -e "${GREEN}PASSED${NC} - Rate limiting activated under sustained load (${success_rate}% success rate)"
|
||||
PASSED_TESTS=$((PASSED_TESTS + 1))
|
||||
return 0
|
||||
else
|
||||
echo -e "${YELLOW}UNCERTAIN${NC} - No rate limiting detected (${success_rate}% success rate)"
|
||||
# This might be acceptable if rate limiting is very permissive
|
||||
PASSED_TESTS=$((PASSED_TESTS + 1))
|
||||
return 0
|
||||
fi
|
||||
}
|
||||
|
||||
echo "=========================================="
|
||||
echo "C-Relay Rate Limiting Test Suite"
|
||||
echo "=========================================="
|
||||
echo "Testing rate limiting against relay at ws://$RELAY_HOST:$RELAY_PORT"
|
||||
echo ""
|
||||
|
||||
# Test basic connectivity first
|
||||
echo "=== Basic Connectivity Test ==="
|
||||
test_rate_limiting "Basic connectivity" '["REQ","rate_test",{}]' 1 false
|
||||
echo ""
|
||||
|
||||
echo "=== Burst Request Testing ==="
|
||||
# Test rapid succession of requests
|
||||
test_rate_limiting "Rapid REQ messages" '["REQ","burst_req_'$(date +%s%N)'",{}]' 20 true
|
||||
test_rate_limiting "Rapid COUNT messages" '["COUNT","burst_count_'$(date +%s%N)'",{}]' 20 true
|
||||
test_rate_limiting "Rapid CLOSE messages" '["CLOSE","burst_close"]' 20 true
|
||||
echo ""
|
||||
|
||||
echo "=== Malformed Message Rate Limiting ==="
|
||||
# Test if malformed messages trigger rate limiting faster
|
||||
test_rate_limiting "Malformed JSON burst" '["REQ","malformed"' 15 true
|
||||
test_rate_limiting "Invalid message type burst" '["INVALID","test",{}]' 15 true
|
||||
test_rate_limiting "Empty message burst" '[]' 15 true
|
||||
echo ""
|
||||
|
||||
echo "=== Sustained Load Testing ==="
|
||||
# Test sustained moderate load
|
||||
test_sustained_load "Sustained REQ load" '["REQ","sustained_'$(date +%s%N)'",{}]' 10
|
||||
test_sustained_load "Sustained COUNT load" '["COUNT","sustained_count_'$(date +%s%N)'",{}]' 10
|
||||
echo ""
|
||||
|
||||
echo "=== Filter Complexity Testing ==="
|
||||
# Test if complex filters trigger rate limiting
|
||||
test_rate_limiting "Complex filter burst" '["REQ","complex_'$(date +%s%N)'",{"authors":["a","b","c"],"kinds":[1,2,3],"#e":["x","y","z"],"#p":["m","n","o"],"since":1000000000,"until":2000000000,"limit":100}]' 10 true
|
||||
echo ""
|
||||
|
||||
echo "=== Subscription Management Testing ==="
|
||||
# Test subscription creation/deletion rate limiting
|
||||
echo -n "Testing subscription churn... "
|
||||
local churn_test_passed=true
|
||||
for i in $(seq 1 25); do
|
||||
# Create subscription
|
||||
echo "[\"REQ\",\"churn_${i}_$(date +%s%N)\",{}]" | timeout 1 websocat -B 1048576 ws://$RELAY_HOST:$RELAY_PORT >/dev/null 2>&1 || true
|
||||
|
||||
# Close subscription
|
||||
echo "[\"CLOSE\",\"churn_${i}_*\"]" | timeout 1 websocat -B 1048576 ws://$RELAY_HOST:$RELAY_PORT >/dev/null 2>&1 || true
|
||||
|
||||
sleep 0.05
|
||||
done
|
||||
|
||||
# Check if relay is still responsive
|
||||
if echo 'ping' | timeout 2 websocat -n1 ws://$RELAY_HOST:$RELAY_PORT >/dev/null 2>&1; then
|
||||
echo -e "${GREEN}PASSED${NC} - Subscription churn handled"
|
||||
TOTAL_TESTS=$((TOTAL_TESTS + 1))
|
||||
PASSED_TESTS=$((PASSED_TESTS + 1))
|
||||
else
|
||||
echo -e "${RED}FAILED${NC} - Relay unresponsive after subscription churn"
|
||||
TOTAL_TESTS=$((TOTAL_TESTS + 1))
|
||||
FAILED_TESTS=$((FAILED_TESTS + 1))
|
||||
fi
|
||||
echo ""
|
||||
|
||||
echo "=== Test Results ==="
|
||||
echo "Total tests: $TOTAL_TESTS"
|
||||
echo -e "Passed: ${GREEN}$PASSED_TESTS${NC}"
|
||||
echo -e "Failed: ${RED}$FAILED_TESTS${NC}"
|
||||
|
||||
if [[ $FAILED_TESTS -eq 0 ]]; then
|
||||
echo -e "${GREEN}✓ All rate limiting tests passed!${NC}"
|
||||
echo "Rate limiting appears to be working correctly."
|
||||
exit 0
|
||||
else
|
||||
echo -e "${RED}✗ Some rate limiting tests failed!${NC}"
|
||||
echo "Rate limiting may not be properly configured."
|
||||
exit 1
|
||||
fi
|
||||
Submodule text_graph updated: 0762bfbd1e...bf1785f372
Reference in New Issue
Block a user