# WebSocket Write Queue Design
## Problem Statement

The current partial write handling implementation uses a single buffer per session, which fails when multiple events need to be sent to the same client in rapid succession. This causes:

1. First event gets a partial write → queued successfully
2. Second event tries to write → **fails** with "write already pending"
3. Subsequent events fail the same way, causing data loss

### Server Log Evidence

```
[WARN] WS_FRAME_PARTIAL: EVENT partial write, sub=1 sent=3210 expected=5333
[TRACE] Queued partial write: len=2123
[WARN] WS_FRAME_PARTIAL: EVENT partial write, sub=1 sent=3210 expected=5333
[WARN] queue_websocket_write: write already pending, cannot queue new write
[ERROR] Failed to queue partial EVENT write for sub=1
```
## Root Cause

WebSocket frames must be sent **atomically**: the bytes of multiple frames cannot be interleaved on a single connection. The current single-buffer approach correctly enforces this, but it rejects new writes instead of queuing them.
## Solution: Write Queue Architecture

### Design Principles

1. **Frame Atomicity**: Complete one WebSocket frame before starting the next
2. **Sequential Processing**: Process queued writes in FIFO order
3. **Memory Safety**: Proper cleanup on connection close or errors
4. **Thread Safety**: Protect queue operations with the existing session lock
### Data Structures

#### Write Queue Node

```c
struct write_queue_node {
    unsigned char* buffer;          // Buffer with LWS_PRE space
    size_t total_len;               // Total length of data to write
    size_t offset;                  // How much has been written so far
    int write_type;                 // LWS_WRITE_TEXT, etc.
    struct write_queue_node* next;  // Next node in queue
};
```
#### Per-Session Write Queue

```c
struct per_session_data {
    // ... existing fields ...

    // Write queue for handling multiple pending writes
    struct write_queue_node* write_queue_head;  // First item to write
    struct write_queue_node* write_queue_tail;  // Last item in queue
    int write_queue_length;                     // Number of items in queue
    int write_in_progress;                      // Flag: 1 if currently writing
};
```
### Algorithm Flow

#### 1. Enqueue Write (`queue_websocket_write`)

```
IF write_queue is empty AND no write in progress:
    Attempt immediate write with lws_write()
    IF complete:
        Return success
    ELSE (partial write):
        Create queue node with remaining data
        Add to queue
        Set write_in_progress flag
        Request LWS_CALLBACK_SERVER_WRITEABLE
ELSE:
    Create queue node with full data
    Append to queue tail
    IF no write in progress:
        Request LWS_CALLBACK_SERVER_WRITEABLE
```
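In C, the enqueue flow above might look roughly like the following. This is a sketch under assumptions, not the relay's actual code: a `write_fn` callback stands in for `lws_write()` (which needs a live `struct lws *`), `LWS_PRE` headroom is omitted for brevity, and locking is left out (see Thread Safety below).

```c
#include <stdlib.h>
#include <string.h>

struct write_queue_node {
    unsigned char *buffer;
    size_t total_len;
    size_t offset;
    int write_type;
    struct write_queue_node *next;
};

struct write_queue {
    struct write_queue_node *head;
    struct write_queue_node *tail;
    int length;
    int write_in_progress;
};

/* Stand-in for lws_write(): returns bytes accepted, or -1 on error. */
typedef long (*write_fn)(const unsigned char *buf, size_t len);

/* Append a node holding a copy of data[skip..len) to the FIFO tail. */
static int append_node(struct write_queue *q, const unsigned char *data,
                       size_t skip, size_t len, int write_type)
{
    struct write_queue_node *node = calloc(1, sizeof(*node));
    if (!node)
        return -1;
    node->buffer = malloc(len - skip);   /* real code would add LWS_PRE headroom */
    if (!node->buffer) {
        free(node);
        return -1;
    }
    memcpy(node->buffer, data + skip, len - skip);
    node->total_len = len - skip;
    node->write_type = write_type;
    if (q->tail)
        q->tail->next = node;            /* append in FIFO order */
    else
        q->head = node;
    q->tail = node;
    q->length++;
    return 0;
}

/* Enqueue logic from the flow above: try an immediate write when the queue
 * is idle, otherwise queue the full frame behind the pending ones. */
static int queue_websocket_write(struct write_queue *q, write_fn do_write,
                                 const unsigned char *data, size_t len,
                                 int write_type)
{
    if (!q->head && !q->write_in_progress) {
        long n = do_write(data, len);
        if (n < 0)
            return -1;
        if ((size_t)n == len)
            return 0;                    /* complete: nothing to queue */
        q->write_in_progress = 1;        /* partial: queue the remainder; the
                                          * caller then requests a
                                          * LWS_CALLBACK_SERVER_WRITEABLE */
        return append_node(q, data, (size_t)n, len, write_type);
    }
    /* a frame is already queued or in flight: queue the whole new frame */
    return append_node(q, data, 0, len, write_type);
}

/* Demo writer accepting at most 4 bytes per call, to exercise both paths. */
static long write_at_most_4(const unsigned char *buf, size_t len)
{
    (void)buf;
    return (long)(len < 4 ? len : 4);
}
```

Because the immediate-write attempt only happens when the queue is idle, a frame can never be interleaved with a partially sent one.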
#### 2. Process Queue (`process_pending_write`)

```
WHILE write_queue is not empty:
    Get head node
    Calculate remaining data (total_len - offset)
    Attempt write with lws_write()

    IF write fails (< 0):
        Log error
        Remove and free head node
        Continue to next node
    ELSE IF partial write (< remaining):
        Update offset
        Request LWS_CALLBACK_SERVER_WRITEABLE
        Break (wait for next callback)
    ELSE (complete write):
        Remove and free head node
        Continue to next node

IF queue is empty:
    Clear write_in_progress flag
```
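A sketch of the drain loop, with the same caveats as above: the `write_fn` callback stands in for `lws_write()`, and the return value tells the caller whether to request another `LWS_CALLBACK_SERVER_WRITEABLE`.

```c
#include <stdlib.h>

struct write_queue_node {
    unsigned char *buffer;
    size_t total_len;
    size_t offset;
    int write_type;
    struct write_queue_node *next;
};

struct write_queue {
    struct write_queue_node *head;
    struct write_queue_node *tail;
    int length;
    int write_in_progress;
};

/* Stand-in for lws_write(): returns bytes accepted, or -1 on error. */
typedef long (*write_fn)(const unsigned char *buf, size_t len);

/* Pop and free the head node. */
static void drop_head(struct write_queue *q)
{
    struct write_queue_node *node = q->head;
    q->head = node->next;
    if (!q->head)
        q->tail = NULL;
    q->length--;
    free(node->buffer);
    free(node);
}

/* Drain the queue head-first. Returns 1 if a partial frame is still pending
 * (the caller should request another WRITEABLE callback), 0 once empty. */
static int process_pending_write(struct write_queue *q, write_fn do_write)
{
    while (q->head) {
        struct write_queue_node *node = q->head;
        size_t remaining = node->total_len - node->offset;
        long n = do_write(node->buffer + node->offset, remaining);

        if (n >= 0 && (size_t)n < remaining) {
            node->offset += (size_t)n;   /* partial: wait for next callback */
            return 1;
        }
        /* error (frame dropped, as the flow above specifies) or complete */
        drop_head(q);
    }
    q->write_in_progress = 0;
    return 0;
}

/* Demo writer accepting at most 4 bytes per call. */
static long write_at_most_4(const unsigned char *buf, size_t len)
{
    (void)buf;
    return (long)(len < 4 ? len : 4);
}
```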
#### 3. Cleanup (`LWS_CALLBACK_CLOSED`)

```
WHILE write_queue is not empty:
    Get head node
    Free buffer
    Free node
    Move to next
Clear queue pointers
```
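The cleanup walk is a straightforward list free; a minimal sketch:

```c
#include <stdlib.h>

struct write_queue_node {
    unsigned char *buffer;
    struct write_queue_node *next;
    /* total_len, offset, write_type omitted: not needed for cleanup */
};

struct write_queue {
    struct write_queue_node *head;
    struct write_queue_node *tail;
    int length;
    int write_in_progress;
};

/* Free every queued frame and reset the queue, as done on
 * LWS_CALLBACK_CLOSED so no buffer outlives its connection. */
static void free_write_queue(struct write_queue *q)
{
    struct write_queue_node *node = q->head;
    while (node) {
        struct write_queue_node *next = node->next;
        free(node->buffer);
        free(node);
        node = next;
    }
    q->head = NULL;
    q->tail = NULL;
    q->length = 0;
    q->write_in_progress = 0;
}
```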
### Memory Management

1. **Allocation**: Each queue node allocates its buffer with `LWS_PRE + data_len` bytes, since libwebsockets requires `LWS_PRE` bytes of headroom before the payload passed to `lws_write()`
2. **Ownership**: The queue owns every buffer until its write completes or the connection closes
3. **Deallocation**: Free the buffer and node when:
   - The write completes successfully
   - The write fails with an error
   - The connection closes
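A sketch of node allocation honoring the `LWS_PRE` headroom. `LWS_PRE` normally comes from `<libwebsockets.h>`; it is stubbed here only so the example compiles standalone.

```c
#include <stdlib.h>
#include <string.h>

/* LWS_PRE comes from <libwebsockets.h>; defined here only so this
 * sketch compiles standalone (the real value is platform-dependent). */
#ifndef LWS_PRE
#define LWS_PRE 16
#endif

struct write_queue_node {
    unsigned char *buffer;
    size_t total_len;
    size_t offset;
    int write_type;
    struct write_queue_node *next;
};

/* Allocate a node whose buffer has LWS_PRE bytes of headroom; the payload
 * starts at buffer + LWS_PRE, which is where lws_write() expects it. */
static struct write_queue_node *alloc_write_node(const unsigned char *data,
                                                 size_t len, int write_type)
{
    struct write_queue_node *node = calloc(1, sizeof(*node));
    if (!node)
        return NULL;
    node->buffer = malloc(LWS_PRE + len);
    if (!node->buffer) {
        free(node);
        return NULL;
    }
    memcpy(node->buffer + LWS_PRE, data, len);
    node->total_len = len;
    node->write_type = write_type;
    return node;
}
```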
### Thread Safety

- Use the existing `pss->session_lock` to protect all queue operations
- Hold the lock during:
  - Enqueue operations
  - Dequeue operations
  - Queue traversal for cleanup
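The locking pattern, illustrated for the enqueue path. This assumes `session_lock` is a pthread mutex, which is an assumption of this sketch, not something the design above specifies.

```c
#include <pthread.h>
#include <stdlib.h>

struct write_queue_node {
    unsigned char *buffer;
    struct write_queue_node *next;
};

/* Sketch of the relevant per_session_data fields; session_lock is assumed
 * to be a pthread mutex for this illustration. */
struct per_session_data {
    pthread_mutex_t session_lock;
    struct write_queue_node *write_queue_head;
    struct write_queue_node *write_queue_tail;
    int write_queue_length;
};

/* Append an already-built node under the session lock, so the service
 * thread and any producer thread never observe a half-linked queue. */
static void enqueue_locked(struct per_session_data *pss,
                           struct write_queue_node *node)
{
    pthread_mutex_lock(&pss->session_lock);
    if (pss->write_queue_tail)
        pss->write_queue_tail->next = node;
    else
        pss->write_queue_head = node;
    pss->write_queue_tail = node;
    pss->write_queue_length++;
    pthread_mutex_unlock(&pss->session_lock);
}
```

Building the node (allocation and copy) happens outside the critical section, keeping lock hold times short.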
### Performance Considerations

1. **Queue Length Limit**: Enforce a maximum queue length (e.g., 100 items) to prevent memory exhaustion
2. **Memory Pressure**: Monitor the total queued bytes per session, not just the item count
3. **Backpressure**: If a queue exceeds its limit, close the connection after sending a NOTICE
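The admission check could be as small as the following. The 100-item limit is the example from the text; the byte cap and both macro names are placeholders invented for this sketch.

```c
#include <stddef.h>

#define MAX_WRITE_QUEUE_ITEMS 100          /* example limit from the text */
#define MAX_WRITE_QUEUE_BYTES (1 << 20)    /* assumed 1 MiB per-session cap */

/* Decide whether one more frame of `len` bytes fits under the limits.
 * Returns 1 if it fits, 0 if the connection should be closed (after a
 * NOTICE) to apply backpressure. */
static int write_queue_admit(int items, size_t queued_bytes, size_t len)
{
    if (items + 1 > MAX_WRITE_QUEUE_ITEMS)
        return 0;
    if (queued_bytes + len > MAX_WRITE_QUEUE_BYTES)
        return 0;
    return 1;
}
```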
### Error Handling

1. **Allocation Failure**: Return an error, log it, and send a NOTICE to the client
2. **Write Failure**: Remove the failed frame and continue with the next
3. **Queue Overflow**: Close the connection with an appropriate NOTICE
## Implementation Plan

### Phase 1: Data Structure Changes

1. Add `write_queue_node` structure to `websockets.h`
2. Update `per_session_data` with queue fields
3. Remove old single-buffer fields

### Phase 2: Queue Operations

1. Implement `enqueue_write()` helper
2. Implement `dequeue_write()` helper
3. Update `queue_websocket_write()` to use the queue
4. Update `process_pending_write()` to process the queue

### Phase 3: Integration

1. Update all `lws_write()` call sites
2. Update `LWS_CALLBACK_CLOSED` cleanup
3. Add queue length monitoring

### Phase 4: Testing

1. Test with rapid multiple events to the same client
2. Test with large events (> 4 KiB)
3. Test under load with concurrent connections
4. Verify no "Invalid frame header" errors
## Expected Outcomes

1. **No More Rejections**: All writes are queued successfully
2. **Frame Integrity**: Complete frames are sent atomically
3. **Memory Safety**: Proper cleanup on all paths
4. **Performance**: Minimal overhead for queue management
## Metrics to Monitor

1. Average queue length per session
2. Maximum queue length observed
3. Queue overflow events (if a limit is implemented)
4. Write completion rate
5. Partial write frequency
## Alternative Approaches Considered

### 1. Larger Single Buffer

**Rejected**: Doesn't solve the fundamental problem of multiple concurrent writes

### 2. Immediate Write Retry

**Rejected**: Could cause busy-waiting and wasted CPU

### 3. Drop Frames on Conflict

**Rejected**: Violates reliability requirements
## References

- libwebsockets documentation on `lws_write()` and `LWS_CALLBACK_SERVER_WRITEABLE`
- WebSocket RFC 6455 on frame structure
- Nostr NIP-01 on relay-to-client communication