WebSocket Write Queue Design

Problem Statement

The current partial-write handling uses a single pending-write buffer per session. It fails when several events must be sent to the same client in rapid succession:

  1. The first event is only partially written, and the remainder is queued successfully
  2. The second event attempts to write and FAILS with "write already pending"
  3. Subsequent events fail the same way, so those events are never delivered

Server Log Evidence

[WARN] WS_FRAME_PARTIAL: EVENT partial write, sub=1 sent=3210 expected=5333
[TRACE] Queued partial write: len=2123
[WARN] WS_FRAME_PARTIAL: EVENT partial write, sub=1 sent=3210 expected=5333
[WARN] queue_websocket_write: write already pending, cannot queue new write
[ERROR] Failed to queue partial EVENT write for sub=1

Root Cause

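WebSocket frames must be sent atomically: every byte of one frame has to be written before the first byte of the next, so frames cannot be interleaved on a connection. The current single-buffer approach correctly enforces this, but it rejects new writes while one is pending instead of queuing them.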

Solution: Write Queue Architecture

Design Principles

  1. Frame Atomicity: Complete one WebSocket frame before starting the next
  2. Sequential Processing: Process queued writes in FIFO order
  3. Memory Safety: Proper cleanup on connection close or errors
  4. Thread Safety: Protect queue operations with existing session lock

Data Structures

Write Queue Node

struct write_queue_node {
    unsigned char* buffer;           // Buffer with LWS_PRE space
    size_t total_len;                // Total length of data to write
    size_t offset;                   // How much has been written so far
    int write_type;                  // LWS_WRITE_TEXT, etc.
    struct write_queue_node* next;   // Next node in queue
};

Per-Session Write Queue

struct per_session_data {
    // ... existing fields ...
    
    // Write queue for handling multiple pending writes
    struct write_queue_node* write_queue_head;  // First item to write
    struct write_queue_node* write_queue_tail;  // Last item in queue
    int write_queue_length;                     // Number of items in queue
    int write_in_progress;                      // Flag: 1 if currently writing
};
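
The FIFO behaviour of these fields can be sketched as below. This is only an illustration: the helper names enqueue_write() and dequeue_write() come from the implementation plan, but their signatures are assumptions, and only the queue-related fields of per_session_data are reproduced.

```c
#include <assert.h>
#include <stddef.h>

struct write_queue_node {
    unsigned char *buffer;           /* Buffer with LWS_PRE space */
    size_t total_len;                /* Total length of data to write */
    size_t offset;                   /* How much has been written so far */
    int write_type;                  /* LWS_WRITE_TEXT, etc. */
    struct write_queue_node *next;   /* Next node in queue */
};

/* Only the queue fields are shown; the existing fields are omitted. */
struct per_session_data {
    struct write_queue_node *write_queue_head;
    struct write_queue_node *write_queue_tail;
    int write_queue_length;
    int write_in_progress;
};

/* Append a node at the tail (assumed signature). */
void enqueue_write(struct per_session_data *pss, struct write_queue_node *node)
{
    assert(pss != NULL && node != NULL);
    node->next = NULL;
    if (pss->write_queue_tail != NULL)
        pss->write_queue_tail->next = node;
    else
        pss->write_queue_head = node;   /* queue was empty */
    pss->write_queue_tail = node;
    pss->write_queue_length++;
}

/* Detach and return the head node, or NULL if the queue is empty.
 * The caller becomes responsible for freeing the node and its buffer. */
struct write_queue_node *dequeue_write(struct per_session_data *pss)
{
    assert(pss != NULL);
    struct write_queue_node *node = pss->write_queue_head;
    if (node == NULL)
        return NULL;
    pss->write_queue_head = node->next;
    if (pss->write_queue_head == NULL)
        pss->write_queue_tail = NULL;   /* queue is now empty */
    node->next = NULL;
    pss->write_queue_length--;
    return node;
}
```

Keeping a tail pointer makes the append O(1), which matters when many events arrive while one frame is still being flushed.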

Algorithm Flow

1. Enqueue Write (queue_websocket_write)

IF write_queue is empty AND no write in progress:
    - Attempt immediate write with lws_write()
    - IF complete:
        - Return success
    - ELSE (partial write):
        - Create queue node with remaining data
        - Add to queue
        - Set write_in_progress flag
        - Request LWS_CALLBACK_SERVER_WRITEABLE
ELSE:
    - Create queue node with full data
    - Append to queue tail
    - IF no write in progress:
        - Request LWS_CALLBACK_SERVER_WRITEABLE
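
The enqueue flow above can be sketched in C as follows. This is a minimal sketch under stated assumptions, not the relay's code: `write_fn` and `limited_writer` stand in for lws_write(), the `writable_requested` flag stands in for requesting LWS_CALLBACK_SERVER_WRITEABLE, LWS_PRE padding is left out for brevity, and `struct session`, `append_copy` and `queue_write_sketch` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for lws_write(): returns bytes accepted, or < 0 on error. */
typedef int (*write_fn)(void *ctx, const unsigned char *data, size_t len);

struct write_queue_node {
    unsigned char *buffer;
    size_t total_len;
    size_t offset;
    struct write_queue_node *next;
};

struct session {                      /* queue fields of per_session_data */
    struct write_queue_node *head, *tail;
    int length;
    int write_in_progress;
    int writable_requested;           /* stand-in for the writable request */
};

/* Copy len bytes into a new node and append it to the tail. */
static int append_copy(struct session *s, const unsigned char *data, size_t len)
{
    struct write_queue_node *n = calloc(1, sizeof(*n));
    if (n == NULL) return -1;
    n->buffer = malloc(len ? len : 1);
    if (n->buffer == NULL) { free(n); return -1; }
    memcpy(n->buffer, data, len);
    n->total_len = len;
    if (s->tail) s->tail->next = n; else s->head = n;
    s->tail = n;
    s->length++;
    return 0;
}

/* Demo writer: accepts at most *budget bytes, simulating a full socket. */
int limited_writer(void *ctx, const unsigned char *data, size_t len)
{
    size_t *budget = ctx;
    size_t n = len < *budget ? len : *budget;
    (void)data;
    *budget -= n;
    return (int)n;
}

/* Returns 0 if the data was sent or queued, -1 on error. */
int queue_write_sketch(struct session *s, write_fn do_write, void *ctx,
                       const unsigned char *data, size_t len)
{
    assert(s != NULL && do_write != NULL && data != NULL);
    if (s->head == NULL && !s->write_in_progress) {
        int n = do_write(ctx, data, len);          /* immediate attempt */
        if (n < 0) return -1;
        if ((size_t)n >= len) return 0;            /* complete write */
        /* Partial write: queue only the unsent remainder. */
        if (append_copy(s, data + n, len - (size_t)n) != 0) return -1;
        s->write_in_progress = 1;
        s->writable_requested = 1;
        return 0;
    }
    /* A write is pending: queue the full frame behind it. */
    if (append_copy(s, data, len) != 0) return -1;
    if (!s->write_in_progress) s->writable_requested = 1;
    return 0;
}
```

With a writer that accepts only 4 of 10 bytes, the first call queues the remaining 6 bytes and the second call queues its full 10 bytes behind them, rather than failing with "write already pending".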

2. Process Queue (process_pending_write)

WHILE write_queue is not empty:
    - Get head node
    - Calculate remaining data (total_len - offset)
    - Attempt write with lws_write()
    
    IF write fails (< 0):
        - Log error
        - Remove and free head node
        - Continue to next node
    
    ELSE IF partial write (< remaining):
        - Update offset
        - Request LWS_CALLBACK_SERVER_WRITEABLE
        - Break (wait for next callback)
    
    ELSE (complete write):
        - Remove and free head node
        - Continue to next node

IF queue is empty:
    - Clear write_in_progress flag
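
A possible C rendering of this loop is shown below. It is a sketch only: `write_fn` and `limited_writer` stand in for lws_write(), `writable_requested` stands in for requesting LWS_CALLBACK_SERVER_WRITEABLE, and `struct session`, `push_frame` and `process_queue_sketch` are hypothetical names used so the example compiles without libwebsockets.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Stand-in for lws_write(): returns bytes accepted, or < 0 on error. */
typedef int (*write_fn)(void *ctx, const unsigned char *data, size_t len);

struct write_queue_node {
    unsigned char *buffer;
    size_t total_len;
    size_t offset;
    struct write_queue_node *next;
};

struct session {                      /* queue fields of per_session_data */
    struct write_queue_node *head, *tail;
    int length;
    int write_in_progress;
    int writable_requested;
};

/* Demo helper: queue a zero-filled frame of len bytes. */
int push_frame(struct session *s, size_t len)
{
    struct write_queue_node *n = calloc(1, sizeof(*n));
    if (n == NULL) return -1;
    n->buffer = calloc(1, len ? len : 1);
    if (n->buffer == NULL) { free(n); return -1; }
    n->total_len = len;
    if (s->tail) s->tail->next = n; else s->head = n;
    s->tail = n;
    s->length++;
    return 0;
}

static void pop_and_free_head(struct session *s)
{
    struct write_queue_node *n = s->head;
    s->head = n->next;
    if (s->head == NULL) s->tail = NULL;
    s->length--;
    free(n->buffer);
    free(n);
}

/* Demo writer: accepts at most *budget bytes in total. */
int limited_writer(void *ctx, const unsigned char *data, size_t len)
{
    size_t *budget = ctx;
    size_t n = len < *budget ? len : *budget;
    (void)data;
    *budget -= n;
    return (int)n;
}

/* Drains the queue in FIFO order; returns the number of frames completed. */
int process_queue_sketch(struct session *s, write_fn do_write, void *ctx)
{
    int completed = 0;
    assert(s != NULL && do_write != NULL);
    while (s->head != NULL) {
        struct write_queue_node *node = s->head;
        size_t remaining = node->total_len - node->offset;
        int n = do_write(ctx, node->buffer + node->offset, remaining);
        if (n < 0) {                       /* write failed: drop the frame */
            pop_and_free_head(s);
            continue;
        }
        if ((size_t)n < remaining) {       /* partial: resume on next callback */
            node->offset += (size_t)n;
            s->writable_requested = 1;
            break;
        }
        pop_and_free_head(s);              /* complete write */
        completed++;
    }
    if (s->head == NULL)
        s->write_in_progress = 0;
    return completed;
}
```

The head node is never skipped while it is partially written, which is what keeps frames from interleaving.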

3. Cleanup (LWS_CALLBACK_CLOSED)

WHILE write_queue is not empty:
    - Get head node
    - Free buffer
    - Free node
    - Move to next
Clear queue pointers
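
The cleanup steps might look like the following in C. The function name `free_write_queue` and its return value are assumptions for illustration; in the relay this logic would run from the LWS_CALLBACK_CLOSED handler.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

struct write_queue_node {
    unsigned char *buffer;
    size_t total_len;
    size_t offset;
    int write_type;
    struct write_queue_node *next;
};

/* Only the queue fields of per_session_data are shown. */
struct per_session_data {
    struct write_queue_node *write_queue_head;
    struct write_queue_node *write_queue_tail;
    int write_queue_length;
    int write_in_progress;
};

/* Frees every queued node and resets the queue state.
 * Returns the number of nodes that were freed. */
int free_write_queue(struct per_session_data *pss)
{
    int freed = 0;
    assert(pss != NULL);
    struct write_queue_node *node = pss->write_queue_head;
    while (node != NULL) {
        struct write_queue_node *next = node->next;  /* save before freeing */
        free(node->buffer);
        free(node);
        node = next;
        freed++;
    }
    pss->write_queue_head = NULL;
    pss->write_queue_tail = NULL;
    pss->write_queue_length = 0;
    pss->write_in_progress = 0;
    return freed;
}
```

Resetting the pointers and counters after the loop makes a second call harmless, which helps if more than one close path can reach the cleanup.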

Memory Management

  1. Allocation: Each queue node allocates buffer with LWS_PRE + data_len
  2. Ownership: Queue owns all buffers until write completes or connection closes
  3. Deallocation: Free buffer and node when:
    • Write completes successfully
    • Write fails with error
    • Connection closes
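
The allocation and ownership rules above could be implemented along these lines. This is a sketch with assumptions: `write_queue_node_new` and `write_queue_node_free` are hypothetical names, `total_len` is taken to mean the payload length without the padding, and the fallback `LWS_PRE` value is a placeholder so the example compiles without <libwebsockets.h>.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* In the relay, LWS_PRE comes from <libwebsockets.h>. The fallback below is
 * a placeholder value (an assumption) so the sketch builds on its own. */
#ifndef LWS_PRE
#define LWS_PRE 16
#endif

struct write_queue_node {
    unsigned char *buffer;           /* LWS_PRE padding, then the payload */
    size_t total_len;                /* payload length, excluding LWS_PRE */
    size_t offset;                   /* payload bytes already written */
    int write_type;                  /* LWS_WRITE_TEXT, etc. */
    struct write_queue_node *next;
};

/* Allocates a node that owns a private copy of the payload.
 * Returns NULL if either allocation fails. */
struct write_queue_node *write_queue_node_new(const unsigned char *data,
                                              size_t len, int write_type)
{
    assert(data != NULL || len == 0);
    struct write_queue_node *node = calloc(1, sizeof(*node));
    if (node == NULL)
        return NULL;
    node->buffer = malloc(LWS_PRE + len);
    if (node->buffer == NULL) {
        free(node);                  /* do not leak the node on failure */
        return NULL;
    }
    if (len > 0)
        memcpy(node->buffer + LWS_PRE, data, len);
    node->total_len = len;
    node->write_type = write_type;
    return node;
}

/* Releases the buffer and the node; safe to call with NULL. */
void write_queue_node_free(struct write_queue_node *node)
{
    if (node == NULL)
        return;
    free(node->buffer);
    free(node);
}
```

Copying the payload into the node means the caller's buffer can be reused or freed immediately after enqueueing, at the cost of one extra copy per queued frame.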

Thread Safety

  • Use existing pss->session_lock to protect queue operations
  • Lock during:
    • Enqueue operations
    • Dequeue operations
    • Queue traversal for cleanup
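
As an illustration of the locking discipline, the sketch below wraps the enqueue and dequeue steps in the session lock. It assumes `session_lock` is a `pthread_mutex_t`, which this document does not state; `locked_enqueue` and `locked_dequeue` are hypothetical names, and the node is reduced to its link field.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

struct write_queue_node {
    struct write_queue_node *next;   /* other fields omitted for brevity */
};

struct per_session_data {
    pthread_mutex_t session_lock;    /* assumed type of the existing lock */
    struct write_queue_node *write_queue_head;
    struct write_queue_node *write_queue_tail;
    int write_queue_length;
};

/* Appends a node while holding the session lock. */
void locked_enqueue(struct per_session_data *pss, struct write_queue_node *node)
{
    assert(pss != NULL && node != NULL);
    pthread_mutex_lock(&pss->session_lock);
    node->next = NULL;
    if (pss->write_queue_tail != NULL)
        pss->write_queue_tail->next = node;
    else
        pss->write_queue_head = node;
    pss->write_queue_tail = node;
    pss->write_queue_length++;
    pthread_mutex_unlock(&pss->session_lock);
}

/* Detaches the head node while holding the session lock.
 * Returns NULL if the queue is empty. */
struct write_queue_node *locked_dequeue(struct per_session_data *pss)
{
    assert(pss != NULL);
    pthread_mutex_lock(&pss->session_lock);
    struct write_queue_node *node = pss->write_queue_head;
    if (node != NULL) {
        pss->write_queue_head = node->next;
        if (pss->write_queue_head == NULL)
            pss->write_queue_tail = NULL;
        node->next = NULL;
        pss->write_queue_length--;
    }
    pthread_mutex_unlock(&pss->session_lock);
    return node;
}
```

Each function has a single unlock on every path, so an early return cannot leave the session locked.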

Performance Considerations

  1. Queue Length Limit: Implement max queue length (e.g., 100 items) to prevent memory exhaustion
  2. Memory Pressure: Monitor total queued bytes per session
  3. Backpressure: If queue exceeds limit, close connection with NOTICE
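
One way to express these limits is an admission check that runs before a frame is queued. In this sketch the 100-item limit is the example value from the list above, while the byte cap, `struct queue_stats` and `queue_admit_check` are hypothetical and would need tuning for the relay.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_QUEUE_LENGTH 100                 /* example limit from the text */
#define MAX_QUEUED_BYTES (1024u * 1024u)     /* hypothetical per-session cap */

enum queue_admit { QUEUE_ADMIT_OK, QUEUE_ADMIT_OVERFLOW };

/* Per-session counters that the enqueue and dequeue paths would maintain. */
struct queue_stats {
    int length;             /* nodes currently queued */
    size_t queued_bytes;    /* unsent payload bytes across all nodes */
};

/* Decides whether a new frame of new_len bytes may be queued.
 * On QUEUE_ADMIT_OVERFLOW the caller would close the connection
 * with a NOTICE, as described above. */
enum queue_admit queue_admit_check(const struct queue_stats *st, size_t new_len)
{
    assert(st != NULL);
    if (st->length >= MAX_QUEUE_LENGTH)
        return QUEUE_ADMIT_OVERFLOW;
    /* Compare against the remaining headroom to avoid size_t overflow. */
    if (st->queued_bytes > MAX_QUEUED_BYTES ||
        new_len > MAX_QUEUED_BYTES - st->queued_bytes)
        return QUEUE_ADMIT_OVERFLOW;
    return QUEUE_ADMIT_OK;
}
```

Checking bytes as well as item count guards against a small number of very large events exhausting memory before the length limit is reached.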

Error Handling

  1. Allocation Failure: Return error, log, send NOTICE to client
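  2. Write Failure: Remove failed frame, continue with next. If part of the failed frame was already sent (offset > 0), the client holds an incomplete frame, so closing the connection is the safer choice for preserving frame atomicity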
  3. Queue Overflow: Close connection with appropriate NOTICE

Implementation Plan

Phase 1: Data Structure Changes

  1. Add write_queue_node structure to websockets.h
  2. Update per_session_data with queue fields
  3. Remove old single-buffer fields

Phase 2: Queue Operations

  1. Implement enqueue_write() helper
  2. Implement dequeue_write() helper
  3. Update queue_websocket_write() to use queue
  4. Update process_pending_write() to process queue

Phase 3: Integration

  1. Update all lws_write() call sites
  2. Update LWS_CALLBACK_CLOSED cleanup
  3. Add queue length monitoring

Phase 4: Testing

  1. Test with rapid multiple events to same client
  2. Test with large events (>4KB)
  3. Test under load with concurrent connections
  4. Verify no "Invalid frame header" errors

Expected Outcomes

  1. No More Rejections: All writes queued successfully
  2. Frame Integrity: Complete frames sent atomically
  3. Memory Safety: Proper cleanup on all paths
  4. Performance: Minimal overhead for queue management

Metrics to Monitor

  1. Average queue length per session
  2. Maximum queue length observed
  3. Queue overflow events (if limit implemented)
  4. Write completion rate
  5. Partial write frequency

Alternative Approaches Considered

1. Larger Single Buffer

Rejected: Doesn't solve the fundamental problem of multiple concurrent writes

2. Immediate Write Retry

Rejected: Could cause busy-waiting and CPU waste

3. Drop Frames on Conflict

Rejected: Violates reliability requirements

References

  • libwebsockets documentation on lws_write() and LWS_CALLBACK_SERVER_WRITEABLE
  • WebSocket RFC 6455 on frame structure
  • Nostr NIP-01 on relay-to-client communication