This commit is contained in:
Chris McCormick 2025-09-30 00:18:46 +00:00 committed by GitHub
commit 6c445fa031
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 431 additions and 36 deletions

View File

@ -133,7 +133,9 @@ import WebSocket from 'ws'
useWebSocketImplementation(WebSocket)
```
You can enable regular pings of connected relays with the `enablePing` option. This will set up a heartbeat that closes the websocket if it doesn't receive a response in time. Some platforms don't report websocket disconnections due to network issues, and enabling this can increase reliability.
#### enablePing
You can enable regular pings of connected relays with the `enablePing` option. This will set up a heartbeat that closes the websocket if it doesn't receive a response in time. Some platforms, like Node.js, don't report websocket disconnections due to network issues, and enabling this can increase the reliability of the `onclose` event.
```js
import { SimplePool } from 'nostr-tools/pool'
@ -141,6 +143,34 @@ import { SimplePool } from 'nostr-tools/pool'
const pool = new SimplePool({ enablePing: true })
```
#### enableReconnect
You can also enable automatic reconnection with the `enableReconnect` option. This will make the pool try to reconnect to relays with an exponential backoff delay if the connection is lost unexpectedly.
```js
import { SimplePool } from 'nostr-tools/pool'
const pool = new SimplePool({ enableReconnect: true })
```
Using both `enablePing: true` and `enableReconnect: true` is recommended as it will improve the reliability and timeliness of the reconnection (at the expense of slighly higher bandwidth due to the ping messages).
```js
// on Node.js
const pool = new SimplePool({ enablePing: true, enableReconnect: true })
```
The `enableReconnect` option can also be a callback function which will receive the current subscription filters and should return a new set of filters. This is useful if you want to modify the subscription on reconnect, for example, to update the `since` parameter to fetch only new events.
```js
const pool = new SimplePool({
enableReconnect: (filters) => {
const newSince = Math.floor(Date.now() / 1000)
return filters.map(filter => ({ ...filter, since: newSince }))
}
})
```
### Parsing references (mentions) from a content based on NIP-27
```js

View File

@ -33,6 +33,7 @@ export class AbstractSimplePool {
public verifyEvent: Nostr['verifyEvent']
public enablePing: boolean | undefined
public enableReconnect: boolean | ((filters: Filter[]) => Filter[]) | undefined
public trustedRelayURLs: Set<string> = new Set()
private _WebSocket?: typeof WebSocket
@ -41,6 +42,7 @@ export class AbstractSimplePool {
this.verifyEvent = opts.verifyEvent
this._WebSocket = opts.websocketImplementation
this.enablePing = opts.enablePing
this.enableReconnect = opts.enableReconnect
}
async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise<AbstractRelay> {
@ -52,10 +54,13 @@ export class AbstractSimplePool {
verifyEvent: this.trustedRelayURLs.has(url) ? alwaysTrue : this.verifyEvent,
websocketImplementation: this._WebSocket,
enablePing: this.enablePing,
enableReconnect: this.enableReconnect,
})
relay.onclose = () => {
if (relay && !relay.enableReconnect) {
this.relays.delete(url)
}
}
if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout
this.relays.set(url, relay)
}

View File

@ -16,6 +16,7 @@ export type AbstractRelayConstructorOptions = {
verifyEvent: Nostr['verifyEvent']
websocketImplementation?: typeof WebSocket
enablePing?: boolean
enableReconnect?: boolean | ((filters: Filter[]) => Filter[])
}
export class SendingOnClosedConnection extends Error {
@ -37,9 +38,15 @@ export class AbstractRelay {
public publishTimeout: number = 4400
public pingFrequency: number = 20000
public pingTimeout: number = 20000
public resubscribeBackoff: number[] = [10000, 10000, 10000, 20000, 20000, 30000, 60000]
public openSubs: Map<string, Subscription> = new Map()
public enablePing: boolean | undefined
public enableReconnect: boolean | ((filters: Filter[]) => Filter[])
private connectionTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private reconnectTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private pingTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private reconnectAttempts: number = 0
private closedIntentionally: boolean = false
private connectionPromise: Promise<void> | undefined
private openCountRequests = new Map<string, CountResolver>()
@ -59,6 +66,7 @@ export class AbstractRelay {
this.verifyEvent = opts.verifyEvent
this._WebSocket = opts.websocketImplementation || WebSocket
this.enablePing = opts.enablePing
this.enableReconnect = opts.enableReconnect || false
}
static async connect(url: string, opts: AbstractRelayConstructorOptions): Promise<AbstractRelay> {
@ -88,6 +96,40 @@ export class AbstractRelay {
return this._connected
}
private async reconnect(): Promise<void> {
const backoff = this.resubscribeBackoff[Math.min(this.reconnectAttempts, this.resubscribeBackoff.length - 1)]
this.reconnectAttempts++
this.reconnectTimeoutHandle = setTimeout(async () => {
try {
await this.connect()
} catch (err) {
// this will be called again through onclose/onerror
}
}, backoff)
}
private handleHardClose(reason: string) {
if (this.pingTimeoutHandle) {
clearTimeout(this.pingTimeoutHandle)
this.pingTimeoutHandle = undefined
}
this._connected = false
this.connectionPromise = undefined
const wasIntentional = this.closedIntentionally
this.closedIntentionally = false // reset for next time
this.onclose?.()
if (this.enableReconnect && !wasIntentional) {
this.reconnect()
} else {
this.closeAllSubscriptions(reason)
}
}
public async connect(): Promise<void> {
if (this.connectionPromise) return this.connectionPromise
@ -110,8 +152,23 @@ export class AbstractRelay {
}
this.ws.onopen = () => {
if (this.reconnectTimeoutHandle) {
clearTimeout(this.reconnectTimeoutHandle)
this.reconnectTimeoutHandle = undefined
}
clearTimeout(this.connectionTimeoutHandle)
this._connected = true
this.reconnectAttempts = 0
// resubscribe to all open subscriptions
for (const sub of this.openSubs.values()) {
sub.eosed = false
if (typeof this.enableReconnect === 'function') {
sub.filters = this.enableReconnect(sub.filters)
}
sub.fire()
}
if (this.enablePing) {
this.pingpong()
}
@ -121,19 +178,13 @@ export class AbstractRelay {
this.ws.onerror = ev => {
clearTimeout(this.connectionTimeoutHandle)
reject((ev as any).message || 'websocket error')
this._connected = false
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection errored')
this.handleHardClose('relay connection errored')
}
this.ws.onclose = ev => {
clearTimeout(this.connectionTimeoutHandle)
reject((ev as any).message || 'websocket closed')
this._connected = false
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection closed')
this.handleHardClose('relay connection closed')
}
this.ws.onmessage = this._onmessage.bind(this)
@ -145,7 +196,7 @@ export class AbstractRelay {
private async waitForPingPong() {
return new Promise((res, err) => {
// listen for pong
;(this.ws && this.ws.on && this.ws.on('pong', () => res(true))) || err("ws can't listen for pong")
this.ws && this.ws.on ? this.ws.on('pong', () => res(true)) : err("ws can't listen for pong")
// send a ping
this.ws && this.ws.ping && this.ws.ping()
})
@ -178,16 +229,15 @@ export class AbstractRelay {
])
if (result) {
// schedule another pingpong
setTimeout(() => this.pingpong(), this.pingFrequency)
this.pingTimeoutHandle = setTimeout(() => this.pingpong(), this.pingFrequency)
} else {
// pingpong closing socket
this.closeAllSubscriptions('pingpong timed out')
this._connected = false
this.onclose?.()
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws?.close()
}
}
}
}
private async runQueue() {
this.queueRunning = true
@ -372,11 +422,22 @@ export class AbstractRelay {
}
public close() {
this.closedIntentionally = true
if (this.reconnectTimeoutHandle) {
clearTimeout(this.reconnectTimeoutHandle)
this.reconnectTimeoutHandle = undefined
}
if (this.pingTimeoutHandle) {
clearTimeout(this.pingTimeoutHandle)
this.pingTimeoutHandle = undefined
}
this.closeAllSubscriptions('relay connection closed by us')
this._connected = false
this.onclose?.()
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws?.close()
}
}
// this is the function assigned to this.ws.onmessage
// it's exposed for testing and debugging purposes

View File

@ -253,6 +253,120 @@ test('ping-pong timeout in pool', async () => {
expect(closed).toBeTrue()
})
test('reconnect on disconnect in pool', async () => {
const mockRelay = mockRelays[0]
pool = new SimplePool({ enablePing: true, enableReconnect: true })
const relay = await pool.ensureRelay(mockRelay.url)
relay.pingTimeout = 50
relay.pingFrequency = 50
relay.resubscribeBackoff = [50, 100]
let closes = 0
relay.onclose = () => {
closes++
}
expect(relay.connected).toBeTrue()
// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(closes).toBe(0)
// now make it unresponsive
mockRelay.unresponsive = true
// wait for the second ping to fail, which will trigger a close
await new Promise(resolve => {
const interval = setInterval(() => {
if (closes > 0) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(closes).toBe(1)
expect(relay.connected).toBeFalse()
// now make it responsive again
mockRelay.unresponsive = false
// wait for reconnect
await new Promise(resolve => {
const interval = setInterval(() => {
if (relay.connected) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(relay.connected).toBeTrue()
expect(closes).toBe(1)
})
test('reconnect with filter update in pool', async () => {
const mockRelay = mockRelays[0]
const newSince = Math.floor(Date.now() / 1000)
pool = new SimplePool({
enablePing: true,
enableReconnect: filters => {
return filters.map(f => ({ ...f, since: newSince }))
},
})
const relay = await pool.ensureRelay(mockRelay.url)
relay.pingTimeout = 50
relay.pingFrequency = 50
relay.resubscribeBackoff = [50, 100]
let closes = 0
relay.onclose = () => {
closes++
}
expect(relay.connected).toBeTrue()
const sub = relay.subscribe([{ kinds: [1], since: 0 }], { onevent: () => {} })
expect(sub.filters[0].since).toBe(0)
// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(closes).toBe(0)
// now make it unresponsive
mockRelay.unresponsive = true
// wait for the second ping to fail, which will trigger a close
await new Promise(resolve => {
const interval = setInterval(() => {
if (closes > 0) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(closes).toBe(1)
expect(relay.connected).toBeFalse()
// now make it responsive again
mockRelay.unresponsive = false
// wait for reconnect
await new Promise(resolve => {
const interval = setInterval(() => {
if (relay.connected) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(relay.connected).toBeTrue()
expect(closes).toBe(1)
// check if filter was updated
expect(sub.filters[0].since).toBe(newSince)
})
test('track relays when publishing', async () => {
let event1 = finalizeEvent(
{

View File

@ -1,7 +1,7 @@
/* global WebSocket */
import { verifyEvent } from './pure.ts'
import { AbstractSimplePool } from './abstract-pool.ts'
import { AbstractSimplePool, type AbstractPoolConstructorOptions } from './abstract-pool.ts'
var _WebSocket: typeof WebSocket
@ -14,7 +14,7 @@ export function useWebSocketImplementation(websocketImplementation: any) {
}
export class SimplePool extends AbstractSimplePool {
constructor(options?: { enablePing?: boolean }) {
constructor(options?: Pick<AbstractPoolConstructorOptions, 'enablePing' | 'enableReconnect'>) {
super({ verifyEvent, websocketImplementation: _WebSocket, ...options })
}
}

View File

@ -118,7 +118,71 @@ test('publish timeout', async () => {
).rejects.toThrow('publish timed out')
})
test('ping-pong timeout', async () => {
test('ping-pong timeout (with native ping)', async () => {
const mockRelay = new MockRelay()
let pingCalled = false
// mock a native ping/pong mechanism
;(MockWebSocketClient.prototype as any).ping = function (this: any) {
pingCalled = true
if (!mockRelay.unresponsive) {
this.dispatchEvent(new Event('pong'))
}
}
;(MockWebSocketClient.prototype as any).on = function (this: any, event: string, listener: () => void) {
if (event === 'pong') {
this.addEventListener('pong', listener)
}
}
try {
const relay = new Relay(mockRelay.url, { enablePing: true })
relay.pingTimeout = 50
relay.pingFrequency = 50
let closed = false
const closedPromise = new Promise<void>(resolve => {
relay.onclose = () => {
closed = true
resolve()
}
})
await relay.connect()
expect(relay.connected).toBeTrue()
// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(pingCalled).toBeTrue()
expect(closed).toBeFalse()
// now make it unresponsive
mockRelay.unresponsive = true
// wait for the second ping to fail
await closedPromise
expect(relay.connected).toBeFalse()
expect(closed).toBeTrue()
} finally {
delete (MockWebSocketClient.prototype as any).ping
delete (MockWebSocketClient.prototype as any).on
}
})
test('ping-pong timeout (no-ping browser environment)', async () => {
// spy on send to ensure the fallback dummy REQ is used, since MockWebSocketClient has no ping
const originalSend = MockWebSocketClient.prototype.send
let dummyReqSent = false
try {
MockWebSocketClient.prototype.send = function (message: string) {
if (message.includes('REQ') && message.includes('a'.repeat(64))) {
dummyReqSent = true
}
originalSend.call(this, message)
}
const mockRelay = new MockRelay()
const relay = new Relay(mockRelay.url, { enablePing: true })
relay.pingTimeout = 50
@ -137,6 +201,7 @@ test('ping-pong timeout', async () => {
// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(dummyReqSent).toBeTrue()
expect(closed).toBeFalse()
// now make it unresponsive
@ -147,4 +212,121 @@ test('ping-pong timeout', async () => {
expect(relay.connected).toBeFalse()
expect(closed).toBeTrue()
} finally {
MockWebSocketClient.prototype.send = originalSend
}
})
test('reconnect on disconnect', async () => {
const mockRelay = new MockRelay()
const relay = new Relay(mockRelay.url, { enablePing: true, enableReconnect: true })
relay.pingTimeout = 50
relay.pingFrequency = 50
relay.resubscribeBackoff = [50, 100] // short backoff for testing
let closes = 0
relay.onclose = () => {
closes++
}
await relay.connect()
expect(relay.connected).toBeTrue()
// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(closes).toBe(0)
// now make it unresponsive
mockRelay.unresponsive = true
// wait for the second ping to fail, which will trigger a close
await new Promise(resolve => {
const interval = setInterval(() => {
if (closes > 0) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(closes).toBe(1)
expect(relay.connected).toBeFalse()
// now make it responsive again
mockRelay.unresponsive = false
// wait for reconnect
await new Promise(resolve => {
const interval = setInterval(() => {
if (relay.connected) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(relay.connected).toBeTrue()
expect(closes).toBe(1) // should not have closed again
})
test('reconnect with filter update', async () => {
const mockRelay = new MockRelay()
const newSince = Math.floor(Date.now() / 1000)
const relay = new Relay(mockRelay.url, {
enablePing: true,
enableReconnect: filters => {
return filters.map(f => ({ ...f, since: newSince }))
},
})
relay.pingTimeout = 50
relay.pingFrequency = 50
relay.resubscribeBackoff = [50, 100]
let closes = 0
relay.onclose = () => {
closes++
}
await relay.connect()
expect(relay.connected).toBeTrue()
const sub = relay.subscribe([{ kinds: [1], since: 0 }], { onevent: () => {} })
expect(sub.filters[0].since).toBe(0)
// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(closes).toBe(0)
// now make it unresponsive
mockRelay.unresponsive = true
// wait for the second ping to fail, which will trigger a close
await new Promise(resolve => {
const interval = setInterval(() => {
if (closes > 0) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(closes).toBe(1)
expect(relay.connected).toBeFalse()
// now make it responsive again
mockRelay.unresponsive = false
// wait for reconnect
await new Promise(resolve => {
const interval = setInterval(() => {
if (relay.connected) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(relay.connected).toBeTrue()
expect(closes).toBe(1)
// check if filter was updated
expect(sub.filters[0].since).toBe(newSince)
})

View File

@ -1,7 +1,7 @@
/* global WebSocket */
import { verifyEvent } from './pure.ts'
import { AbstractRelay } from './abstract-relay.ts'
import { AbstractRelay, type AbstractRelayConstructorOptions } from './abstract-relay.ts'
var _WebSocket: typeof WebSocket
@ -14,11 +14,14 @@ export function useWebSocketImplementation(websocketImplementation: any) {
}
export class Relay extends AbstractRelay {
constructor(url: string, options?: { enablePing?: boolean }) {
constructor(url: string, options?: Pick<AbstractRelayConstructorOptions, 'enablePing' | 'enableReconnect'>) {
super(url, { verifyEvent, websocketImplementation: _WebSocket, ...options })
}
static async connect(url: string, options?: { enablePing?: boolean }): Promise<Relay> {
static async connect(
url: string,
options?: Pick<AbstractRelayConstructorOptions, 'enablePing' | 'enableReconnect'>,
): Promise<Relay> {
const relay = new Relay(url, options)
await relay.connect()
return relay