Libwebsockets Proper Pattern - Message Queue Design
Problem Analysis
Current Violation
We're calling lws_write() directly from multiple code paths:
- Event broadcast (subscriptions.c:667) - when events arrive
- OK responses (websockets.c:855) - when processing EVENT messages
- EOSE responses (websockets.c:976) - when processing REQ messages
- COUNT responses (websockets.c:1922) - when processing COUNT messages
This violates libwebsockets' design pattern, which requires:
- `lws_write()` is called ONLY from `LWS_CALLBACK_SERVER_WRITEABLE`, at most once per callback
- The application queues messages and requests a writeable callback
- Libwebsockets handles write timing and socket buffer management
Consequences of Violation
- Partial writes when socket buffer is full
- Multiple concurrent write attempts before callback fires
- "write already pending" errors from our single pending-write buffer
- Frame corruption from interleaved partial writes
- "Invalid frame header" errors on client side
Correct Architecture
Message Queue Pattern
```
┌─────────────────────────────────────────────────────────────┐
│ Application Layer                                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│ Event Arrives  → Queue Message → Request Writeable Callback │
│ REQ Received   → Queue EOSE    → Request Writeable Callback │
│ EVENT Received → Queue OK      → Request Writeable Callback │
│ COUNT Received → Queue COUNT   → Request Writeable Callback │
│                                                             │
└─────────────────────────────────────────────────────────────┘
                               ↓
                 lws_callback_on_writable(wsi)
                               ↓
┌─────────────────────────────────────────────────────────────┐
│ LWS_CALLBACK_SERVER_WRITEABLE                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│ 1. Dequeue next message from queue                          │
│ 2. Call lws_write() with message data                       │
│ 3. If queue not empty, request another callback             │
│                                                             │
└─────────────────────────────────────────────────────────────┘
                               ↓
                 libwebsockets handles:
                 - Socket buffer management
                 - Partial write handling
                 - Frame atomicity
```
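To make the flow above concrete, here is a minimal single-threaded simulation. It assumes hypothetical stand-ins (`demo_callback_on_writable()`, `demo_write()`, `run_service_loop()`) in place of libwebsockets and its event loop, so it compiles without `libwebsockets.h`. It shows two properties of the pattern: messages leave in FIFO order, and each writeable callback performs exactly one write and re-arms itself while the queue is non-empty.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for libwebsockets, for illustration only. */
static int writeable_pending = 0; /* set by demo_callback_on_writable() */
static char sent_log[256];        /* everything "written" so far, ';'-separated */
static int writes_done = 0;

/* Repeated requests before the callback fires collapse into one
 * pending callback (modeled here as a single flag). */
static void demo_callback_on_writable(void) { writeable_pending = 1; }

static void demo_write(const char *msg)
{
    strncat(sent_log, msg, sizeof(sent_log) - strlen(sent_log) - 1);
    strncat(sent_log, ";", sizeof(sent_log) - strlen(sent_log) - 1);
    writes_done++;
}

/* FIFO message queue (payload only; LWS_PRE headroom omitted here). */
typedef struct node {
    char *msg;
    struct node *next;
} node_t;

static node_t *head = NULL, *tail = NULL;

/* Application layer: queue the message, then ask for a writeable callback. */
static int queue_message(const char *msg)
{
    node_t *n = malloc(sizeof(*n));
    if (!n) return -1;
    size_t len = strlen(msg) + 1;
    n->msg = malloc(len);
    if (!n->msg) { free(n); return -1; }
    memcpy(n->msg, msg, len);
    n->next = NULL;
    if (tail) tail->next = n; else head = n;
    tail = n;
    demo_callback_on_writable();
    return 0;
}

/* WRITEABLE handler: one write per callback, re-arm while messages remain. */
static void on_writeable(void)
{
    node_t *n = head;
    if (!n) return;
    head = n->next;
    if (!head) tail = NULL;
    demo_write(n->msg);
    free(n->msg);
    free(n);
    if (head) demo_callback_on_writable();
}

/* Stand-in service loop: deliver one callback per pending request. */
static int run_service_loop(void)
{
    int callbacks = 0;
    while (writeable_pending) {
        writeable_pending = 0;
        on_writeable();
        callbacks++;
    }
    return callbacks;
}
```

Queueing `EVENT`, `OK` and `EOSE` before running the loop yields three callbacks and the log `EVENT;OK;EOSE;`: the three requests made while queueing collapse into one pending callback, and each callback re-arms the next.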
Data Structures
Message Queue Node
```c
typedef struct message_queue_node {
    unsigned char* data;            // Message data (with LWS_PRE space)
    size_t length;                  // Message length (without LWS_PRE)
    enum lws_write_protocol type;   // LWS_WRITE_TEXT, etc.
    struct message_queue_node* next;
} message_queue_node_t;
```
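The node stores the payload after `LWS_PRE` bytes of headroom, which libwebsockets uses to write the WebSocket frame header in front of the data, while `length` counts only the payload. A small sketch of allocating and addressing such a buffer, assuming a placeholder `DEMO_LWS_PRE` of 16 (the real `LWS_PRE` is defined by `libwebsockets.h`); `demo_node_new()`, `demo_node_payload()` and `demo_node_free()` are hypothetical helpers.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Placeholder only: the real LWS_PRE comes from libwebsockets.h. */
#define DEMO_LWS_PRE 16

typedef struct demo_node {
    unsigned char *data;   /* DEMO_LWS_PRE bytes of headroom, then payload */
    size_t length;         /* payload length only, headroom excluded */
    struct demo_node *next;
} demo_node_t;

/* Allocate a node whose payload starts DEMO_LWS_PRE bytes into data. */
demo_node_t *demo_node_new(const void *payload, size_t length)
{
    demo_node_t *node = malloc(sizeof(*node));
    if (!node) return NULL;
    node->data = malloc(DEMO_LWS_PRE + length);
    if (!node->data) { free(node); return NULL; }
    memcpy(node->data + DEMO_LWS_PRE, payload, length);
    node->length = length;
    node->next = NULL;
    return node;
}

/* The pointer that would be handed to lws_write(): payload, not headroom. */
unsigned char *demo_node_payload(demo_node_t *node)
{
    return node->data + DEMO_LWS_PRE;
}

void demo_node_free(demo_node_t *node)
{
    if (!node) return;
    free(node->data);
    free(node);
}
```

`demo_node_payload(node)` together with `node->length` is the pair that would be passed to `lws_write()`; the headroom is never counted in the length.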
Per-Session Data Updates
```c
struct per_session_data {
    // ... existing fields, including the pthread_mutex_t session_lock
    //     that the queue functions lock ...

    // Message queue (replaces single buffer); guarded by session_lock
    message_queue_node_t* message_queue_head;
    message_queue_node_t* message_queue_tail;
    int message_queue_count;
    int writeable_requested; // Flag to prevent duplicate requests
};
```
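Because the queue functions lock `session_lock`, the mutex and the queue fields have to be initialized when the connection opens, and anything still queued has to be freed when it closes. A minimal sketch of two hypothetical helpers, `session_queue_init()` and `session_queue_destroy()`, meant to be called from the `LWS_CALLBACK_ESTABLISHED` and `LWS_CALLBACK_CLOSED` handlers. The struct is a trimmed mirror of the one above, with `type` as a plain `int` (an assumption) so the example compiles without `libwebsockets.h`.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Trimmed mirror of the design's types; `type` is int here instead of
 * enum lws_write_protocol so the sketch builds without libwebsockets. */
typedef struct message_queue_node {
    unsigned char *data;
    size_t length;
    int type;
    struct message_queue_node *next;
} message_queue_node_t;

struct per_session_data {
    pthread_mutex_t session_lock;
    message_queue_node_t *message_queue_head;
    message_queue_node_t *message_queue_tail;
    int message_queue_count;
    int writeable_requested;
};

/* Hypothetical helper for LWS_CALLBACK_ESTABLISHED: start with an empty queue. */
void session_queue_init(struct per_session_data *pss)
{
    pthread_mutex_init(&pss->session_lock, NULL);
    pss->message_queue_head = NULL;
    pss->message_queue_tail = NULL;
    pss->message_queue_count = 0;
    pss->writeable_requested = 0;
}

/* Hypothetical helper for LWS_CALLBACK_CLOSED: free messages never sent. */
void session_queue_destroy(struct per_session_data *pss)
{
    pthread_mutex_lock(&pss->session_lock);
    message_queue_node_t *node = pss->message_queue_head;
    while (node) {
        message_queue_node_t *next = node->next;
        free(node->data);
        free(node);
        node = next;
    }
    pss->message_queue_head = NULL;
    pss->message_queue_tail = NULL;
    pss->message_queue_count = 0;
    pss->writeable_requested = 0;
    pthread_mutex_unlock(&pss->session_lock);
    pthread_mutex_destroy(&pss->session_lock);
}
```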
Implementation Functions
1. Queue Message (Application Layer)
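```c
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <libwebsockets.h>

int queue_message(struct lws* wsi, struct per_session_data* pss,
                  const char* message, size_t length,
                  enum lws_write_protocol type)
{
    // Allocate node
    message_queue_node_t* node = malloc(sizeof(message_queue_node_t));
    if (!node) {
        return -1;
    }
    // Allocate buffer with LWS_PRE headroom in front of the payload
    node->data = malloc(LWS_PRE + length);
    if (!node->data) {
        free(node);
        return -1;
    }
    memcpy(node->data + LWS_PRE, message, length);
    node->length = length;
    node->type = type;
    node->next = NULL;

    int need_callback = 0;

    // Add to queue (FIFO). writeable_requested is tested and set under the
    // same lock that process_message_queue() uses to clear it.
    pthread_mutex_lock(&pss->session_lock);
    if (!pss->message_queue_head) {
        pss->message_queue_head = node;
        pss->message_queue_tail = node;
    } else {
        pss->message_queue_tail->next = node;
        pss->message_queue_tail = node;
    }
    pss->message_queue_count++;
    if (!pss->writeable_requested) {
        pss->writeable_requested = 1;
        need_callback = 1;
    }
    pthread_mutex_unlock(&pss->session_lock);

    // Request writeable callback (only if not already requested).
    // lws_callback_on_writable() is not thread-safe: call it from the
    // lws service thread.
    if (need_callback) {
        lws_callback_on_writable(wsi);
    }
    return 0;
}
```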
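The `writeable_requested` flag should be tested and set under the same lock that `process_message_queue()` uses to clear it, so that only the first message queued while no callback is pending triggers a request. The sketch below isolates that logic, assuming a hypothetical counter `demo_request_writeable()` in place of `lws_callback_on_writable()` and a simplified `drain()` standing in for the series of writeable callbacks that empty the queue.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for lws_callback_on_writable(): counts requests. */
static int writeable_requests = 0;
static void demo_request_writeable(void) { writeable_requests++; }

typedef struct qnode {
    char *msg;
    struct qnode *next;
} qnode_t;

typedef struct {
    pthread_mutex_t lock;
    qnode_t *head, *tail;
    int count;
    int writeable_requested;
} session_t;

/* Enqueue; decide under the lock whether a new request is needed. */
int enqueue(session_t *s, const char *msg)
{
    qnode_t *n = malloc(sizeof(*n));
    if (!n) return -1;
    size_t len = strlen(msg) + 1;
    n->msg = malloc(len);
    if (!n->msg) { free(n); return -1; }
    memcpy(n->msg, msg, len);
    n->next = NULL;

    int need_request = 0;
    pthread_mutex_lock(&s->lock);
    if (s->tail) s->tail->next = n; else s->head = n;
    s->tail = n;
    s->count++;
    if (!s->writeable_requested) {
        s->writeable_requested = 1;
        need_request = 1;
    }
    pthread_mutex_unlock(&s->lock);

    if (need_request) demo_request_writeable();
    return 0;
}

/* Simplified drain: empties the queue, then clears the flag, as the last
 * writeable callback does when it finds the queue empty. */
void drain(session_t *s)
{
    pthread_mutex_lock(&s->lock);
    while (s->head) {
        qnode_t *n = s->head;
        s->head = n->next;
        free(n->msg);
        free(n);
        s->count--;
    }
    s->tail = NULL;
    s->writeable_requested = 0;
    pthread_mutex_unlock(&s->lock);
}
```

Three messages queued back to back produce a single request; after the queue drains and the flag is cleared, the next message requests a new callback.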
2. Process Queue (Writeable Callback)
```c
int process_message_queue(struct lws* wsi, struct per_session_data* pss)
{
    pthread_mutex_lock(&pss->session_lock);
    // Get next message from queue
    message_queue_node_t* node = pss->message_queue_head;
    if (!node) {
        pss->writeable_requested = 0;
        pthread_mutex_unlock(&pss->session_lock);
        return 0; // Queue empty
    }
    // Remove from queue
    pss->message_queue_head = node->next;
    if (!pss->message_queue_head) {
        pss->message_queue_tail = NULL;
    }
    pss->message_queue_count--;
    pthread_mutex_unlock(&pss->session_lock);

    // Exactly one lws_write() per WRITEABLE callback. If the kernel accepts
    // only part of the frame, libwebsockets keeps the remainder and sends it
    // before delivering the next WRITEABLE callback.
    size_t length = node->length;
    int result = lws_write(wsi, node->data + LWS_PRE, length, node->type);

    // Free node
    free(node->data);
    free(node);

    // If queue not empty, request another callback
    pthread_mutex_lock(&pss->session_lock);
    if (pss->message_queue_head) {
        lws_callback_on_writable(wsi);
    } else {
        pss->writeable_requested = 0;
    }
    pthread_mutex_unlock(&pss->session_lock);

    // A return shorter than the payload means the write failed; returning -1
    // from the callback makes libwebsockets close the connection.
    return (result < (int)length) ? -1 : 0;
}
```
Refactoring Changes
Before (WRONG - Direct Write)
```c
// websockets.c:855 - OK response
int write_result = lws_write(wsi, buf + LWS_PRE, response_len, LWS_WRITE_TEXT);
if (write_result < 0) {
    DEBUG_ERROR("Write failed");
} else if ((size_t)write_result != response_len) {
    // Partial write - queue remaining data
    queue_websocket_write(wsi, pss, ...);
}
```
After (CORRECT - Queue Message)
```c
// websockets.c:855 - OK response
queue_message(wsi, pss, response_str, response_len, LWS_WRITE_TEXT);
// That's it! Writeable callback will handle the actual write
```
Before (WRONG - Direct Write in Broadcast)
```c
// subscriptions.c:667 - EVENT broadcast
int write_result = lws_write(current_temp->wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
if (write_result < 0) {
    DEBUG_ERROR("Write failed");
} else if ((size_t)write_result != msg_len) {
    queue_websocket_write(...);
}
```
After (CORRECT - Queue Message)
```c
// subscriptions.c:667 - EVENT broadcast
struct per_session_data* pss = lws_wsi_user(current_temp->wsi);
queue_message(current_temp->wsi, pss, msg_str, msg_len, LWS_WRITE_TEXT);
// Writeable callback will handle the actual write
```
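In the broadcast path, each matching session needs its own copy of the message, because each session frees its node in its own writeable callback, at a time unrelated to the other sessions; `queue_message()` copies the payload into a fresh buffer, which gives exactly this behaviour. A self-contained sketch of that fan-out, assuming a hypothetical `demo_session_t` array in place of the subscription list that `current_temp` walks, with a `subscribed` flag standing in for filter matching.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct bnode {
    unsigned char *data;
    size_t length;
    struct bnode *next;
} bnode_t;

typedef struct {
    bnode_t *head, *tail;
    int count;
    int subscribed;   /* hypothetical: does this session match the event? */
} demo_session_t;

/* Queue a private copy of msg on one session. */
static int demo_queue_copy(demo_session_t *s, const char *msg, size_t len)
{
    bnode_t *n = malloc(sizeof(*n));
    if (!n) return -1;
    n->data = malloc(len);
    if (!n->data) { free(n); return -1; }
    memcpy(n->data, msg, len);
    n->length = len;
    n->next = NULL;
    if (s->tail) s->tail->next = n; else s->head = n;
    s->tail = n;
    s->count++;
    return 0;
}

/* Broadcast: queue one copy per matching session; nothing is written here. */
static int demo_broadcast(demo_session_t *sessions, int n_sessions,
                          const char *msg, size_t len)
{
    int queued = 0;
    for (int i = 0; i < n_sessions; i++) {
        if (sessions[i].subscribed &&
            demo_queue_copy(&sessions[i], msg, len) == 0) {
            queued++;
        }
    }
    return queued;
}
```

With sessions 0 and 2 matching, two copies are queued at different addresses; no socket is touched until each session's own writeable callback runs.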
Benefits of Correct Pattern
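- No Partial Write Handling Needed
  - Libwebsockets buffers any unsent remainder internally and sends it before the next writeable callback
  - We just queue complete messages
- No "Write Already Pending" Errors
  - The queue holds any number of messages (bounded only by memory, so a slow client can grow it without limit)
  - Messages are written one at a time from the callback
- Thread Safety
  - Queue operations are protected by the session lock
  - Writes happen only from the service thread's writeable callback
  - `lws_callback_on_writable()` is not thread-safe; a producer running on another thread must instead wake the service loop with `lws_cancel_service()` and request the callback from `LWS_CALLBACK_EVENT_WAIT_CANCELLED`
- Frame Atomicity
  - Libwebsockets ensures each frame is transmitted completely
  - No interleaved partial writes
- Simpler Code
  - No complex partial-write state machine
  - Just queue and forget
- Better Performance
  - Libwebsockets controls write timing
  - Writes are issued only when the socket can accept data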
Migration Steps
- ✅ Identify all `lws_write()` call sites
- ✅ Confirm violation of libwebsockets pattern
- ⏳ Design message queue structure
- ⏳ Implement `queue_message()` function
- ⏳ Implement `process_message_queue()` function
- ⏳ Update `per_session_data` structure
- ⏳ Refactor OK response to use queue
- ⏳ Refactor EOSE response to use queue
- ⏳ Refactor COUNT response to use queue
- ⏳ Refactor EVENT broadcast to use queue
- ⏳ Update `LWS_CALLBACK_SERVER_WRITEABLE` handler
- ⏳ Add queue cleanup in `LWS_CALLBACK_CLOSED`
- ⏳ Remove old partial write code
- ⏳ Test with rapid multiple events
- ⏳ Test with large events (>4KB)
- ⏳ Test under load
- ⏳ Verify no frame errors
Testing Strategy
Test 1: Multiple Rapid Events
```bash
# Publish 10 events in rapid succession, one connection each;
# a client subscribed with REQ should receive all of them
for i in {1..10}; do
  echo '["EVENT",{"kind":1,"content":"test'$i'","created_at":'$(date +%s)',...}]' | \
    websocat ws://localhost:8888 &
done
```
Expected: All events queued and sent sequentially, no errors
Test 2: Large Events
```bash
# Send an event >4KB (may need more than one socket write)
nak event --content "$(head -c 5000 /dev/urandom | base64)" | \
  websocat ws://localhost:8888
```
Expected: Event queued, libwebsockets handles partial writes internally
Test 3: Concurrent Connections
```bash
# 100 concurrent connections, each opening a subscription with REQ
for i in {1..100}; do
  (echo '["REQ","sub'$i'",{}]'; sleep 1) | websocat ws://localhost:8888 &
done
```
Expected: All subscriptions work, events broadcast correctly
Success Criteria
- ✅ No `lws_write()` calls outside `LWS_CALLBACK_SERVER_WRITEABLE`
- ✅ No "write already pending" errors in logs
- ✅ No "Invalid frame header" errors on client side
- ✅ All messages delivered in correct order
- ✅ Large events (>4KB) handled correctly
- ✅ Multiple rapid events to same client work
- ✅ Concurrent connections stable under load