diff --git a/README.md b/README.md index 5fdc50e..97ba24e 100644 --- a/README.md +++ b/README.md @@ -133,7 +133,9 @@ import WebSocket from 'ws' useWebSocketImplementation(WebSocket) ``` -You can enable regular pings of connected relays with the `enablePing` option. This will set up a heartbeat that closes the websocket if it doesn't receive a response in time. Some platforms don't report websocket disconnections due to network issues, and enabling this can increase reliability. +#### enablePing + +You can enable regular pings of connected relays with the `enablePing` option. This will set up a heartbeat that closes the websocket if it doesn't receive a response in time. Some platforms, like Node.js, don't report websocket disconnections due to network issues, and enabling this can increase the reliability of the `onclose` event. ```js import { SimplePool } from 'nostr-tools/pool' @@ -141,6 +143,34 @@ import { SimplePool } from 'nostr-tools/pool' const pool = new SimplePool({ enablePing: true }) ``` +#### enableReconnect + +You can also enable automatic reconnection with the `enableReconnect` option. This will make the pool try to reconnect to relays with an exponential backoff delay if the connection is lost unexpectedly. + +```js +import { SimplePool } from 'nostr-tools/pool' + +const pool = new SimplePool({ enableReconnect: true }) +``` + +Using both `enablePing: true` and `enableReconnect: true` is recommended as it will improve the reliability and timeliness of the reconnection (at the expense of slighly higher bandwidth due to the ping messages). + +```js +// on Node.js +const pool = new SimplePool({ enablePing: true, enableReconnect: true }) +``` + +The `enableReconnect` option can also be a callback function which will receive the current subscription filters and should return a new set of filters. This is useful if you want to modify the subscription on reconnect, for example, to update the `since` parameter to fetch only new events. + +```js +const pool = new SimplePool({ + enableReconnect: (filters) => { + const newSince = Math.floor(Date.now() / 1000) + return filters.map(filter => ({ ...filter, since: newSince })) + } +}) +``` + ### Parsing references (mentions) from a content based on NIP-27 ```js diff --git a/abstract-pool.ts b/abstract-pool.ts index 19a60b2..ab13c0c 100644 --- a/abstract-pool.ts +++ b/abstract-pool.ts @@ -33,6 +33,7 @@ export class AbstractSimplePool { public verifyEvent: Nostr['verifyEvent'] public enablePing: boolean | undefined + public enableReconnect: boolean | ((filters: Filter[]) => Filter[]) | undefined public trustedRelayURLs: Set = new Set() private _WebSocket?: typeof WebSocket @@ -41,6 +42,7 @@ export class AbstractSimplePool { this.verifyEvent = opts.verifyEvent this._WebSocket = opts.websocketImplementation this.enablePing = opts.enablePing + this.enableReconnect = opts.enableReconnect } async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise { @@ -52,9 +54,12 @@ export class AbstractSimplePool { verifyEvent: this.trustedRelayURLs.has(url) ? alwaysTrue : this.verifyEvent, websocketImplementation: this._WebSocket, enablePing: this.enablePing, + enableReconnect: this.enableReconnect, }) relay.onclose = () => { - this.relays.delete(url) + if (relay && !relay.enableReconnect) { + this.relays.delete(url) + } } if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout this.relays.set(url, relay) diff --git a/abstract-relay.ts b/abstract-relay.ts index 4e09c5e..0e36e7d 100644 --- a/abstract-relay.ts +++ b/abstract-relay.ts @@ -16,6 +16,7 @@ export type AbstractRelayConstructorOptions = { verifyEvent: Nostr['verifyEvent'] websocketImplementation?: typeof WebSocket enablePing?: boolean + enableReconnect?: boolean | ((filters: Filter[]) => Filter[]) } export class SendingOnClosedConnection extends Error { @@ -37,9 +38,15 @@ export class AbstractRelay { public publishTimeout: number = 4400 public pingFrequency: number = 20000 public pingTimeout: number = 20000 + public resubscribeBackoff: number[] = [10000, 10000, 10000, 20000, 20000, 30000, 60000] public openSubs: Map = new Map() public enablePing: boolean | undefined + public enableReconnect: boolean | ((filters: Filter[]) => Filter[]) private connectionTimeoutHandle: ReturnType | undefined + private reconnectTimeoutHandle: ReturnType | undefined + private pingTimeoutHandle: ReturnType | undefined + private reconnectAttempts: number = 0 + private closedIntentionally: boolean = false private connectionPromise: Promise | undefined private openCountRequests = new Map() @@ -59,6 +66,7 @@ export class AbstractRelay { this.verifyEvent = opts.verifyEvent this._WebSocket = opts.websocketImplementation || WebSocket this.enablePing = opts.enablePing + this.enableReconnect = opts.enableReconnect || false } static async connect(url: string, opts: AbstractRelayConstructorOptions): Promise { @@ -88,6 +96,40 @@ export class AbstractRelay { return this._connected } + private async reconnect(): Promise { + const backoff = this.resubscribeBackoff[Math.min(this.reconnectAttempts, this.resubscribeBackoff.length - 1)] + this.reconnectAttempts++ + + this.reconnectTimeoutHandle = setTimeout(async () => { + try { + await this.connect() + } catch (err) { + // this will be called again through onclose/onerror + } + }, backoff) + } + + private handleHardClose(reason: string) { + if (this.pingTimeoutHandle) { + clearTimeout(this.pingTimeoutHandle) + this.pingTimeoutHandle = undefined + } + + this._connected = false + this.connectionPromise = undefined + + const wasIntentional = this.closedIntentionally + this.closedIntentionally = false // reset for next time + + this.onclose?.() + + if (this.enableReconnect && !wasIntentional) { + this.reconnect() + } else { + this.closeAllSubscriptions(reason) + } + } + public async connect(): Promise { if (this.connectionPromise) return this.connectionPromise @@ -110,8 +152,23 @@ export class AbstractRelay { } this.ws.onopen = () => { + if (this.reconnectTimeoutHandle) { + clearTimeout(this.reconnectTimeoutHandle) + this.reconnectTimeoutHandle = undefined + } clearTimeout(this.connectionTimeoutHandle) this._connected = true + this.reconnectAttempts = 0 + + // resubscribe to all open subscriptions + for (const sub of this.openSubs.values()) { + sub.eosed = false + if (typeof this.enableReconnect === 'function') { + sub.filters = this.enableReconnect(sub.filters) + } + sub.fire() + } + if (this.enablePing) { this.pingpong() } @@ -121,19 +178,13 @@ export class AbstractRelay { this.ws.onerror = ev => { clearTimeout(this.connectionTimeoutHandle) reject((ev as any).message || 'websocket error') - this._connected = false - this.connectionPromise = undefined - this.onclose?.() - this.closeAllSubscriptions('relay connection errored') + this.handleHardClose('relay connection errored') } this.ws.onclose = ev => { clearTimeout(this.connectionTimeoutHandle) reject((ev as any).message || 'websocket closed') - this._connected = false - this.connectionPromise = undefined - this.onclose?.() - this.closeAllSubscriptions('relay connection closed') + this.handleHardClose('relay connection closed') } this.ws.onmessage = this._onmessage.bind(this) @@ -145,7 +196,7 @@ export class AbstractRelay { private async waitForPingPong() { return new Promise((res, err) => { // listen for pong - ;(this.ws && this.ws.on && this.ws.on('pong', () => res(true))) || err("ws can't listen for pong") + this.ws && this.ws.on ? this.ws.on('pong', () => res(true)) : err("ws can't listen for pong") // send a ping this.ws && this.ws.ping && this.ws.ping() }) @@ -178,13 +229,12 @@ export class AbstractRelay { ]) if (result) { // schedule another pingpong - setTimeout(() => this.pingpong(), this.pingFrequency) + this.pingTimeoutHandle = setTimeout(() => this.pingpong(), this.pingFrequency) } else { // pingpong closing socket - this.closeAllSubscriptions('pingpong timed out') - this._connected = false - this.onclose?.() - this.ws?.close() + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws?.close() + } } } } @@ -372,10 +422,21 @@ export class AbstractRelay { } public close() { + this.closedIntentionally = true + if (this.reconnectTimeoutHandle) { + clearTimeout(this.reconnectTimeoutHandle) + this.reconnectTimeoutHandle = undefined + } + if (this.pingTimeoutHandle) { + clearTimeout(this.pingTimeoutHandle) + this.pingTimeoutHandle = undefined + } this.closeAllSubscriptions('relay connection closed by us') this._connected = false this.onclose?.() - this.ws?.close() + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws?.close() + } } // this is the function assigned to this.ws.onmessage diff --git a/pool.test.ts b/pool.test.ts index 07c9a79..4942182 100644 --- a/pool.test.ts +++ b/pool.test.ts @@ -253,6 +253,120 @@ test('ping-pong timeout in pool', async () => { expect(closed).toBeTrue() }) +test('reconnect on disconnect in pool', async () => { + const mockRelay = mockRelays[0] + pool = new SimplePool({ enablePing: true, enableReconnect: true }) + const relay = await pool.ensureRelay(mockRelay.url) + relay.pingTimeout = 50 + relay.pingFrequency = 50 + relay.resubscribeBackoff = [50, 100] + + let closes = 0 + relay.onclose = () => { + closes++ + } + + expect(relay.connected).toBeTrue() + + // wait for the first ping to succeed + await new Promise(resolve => setTimeout(resolve, 75)) + expect(closes).toBe(0) + + // now make it unresponsive + mockRelay.unresponsive = true + + // wait for the second ping to fail, which will trigger a close + await new Promise(resolve => { + const interval = setInterval(() => { + if (closes > 0) { + clearInterval(interval) + resolve(null) + } + }, 10) + }) + expect(closes).toBe(1) + expect(relay.connected).toBeFalse() + + // now make it responsive again + mockRelay.unresponsive = false + + // wait for reconnect + await new Promise(resolve => { + const interval = setInterval(() => { + if (relay.connected) { + clearInterval(interval) + resolve(null) + } + }, 10) + }) + + expect(relay.connected).toBeTrue() + expect(closes).toBe(1) +}) + +test('reconnect with filter update in pool', async () => { + const mockRelay = mockRelays[0] + const newSince = Math.floor(Date.now() / 1000) + pool = new SimplePool({ + enablePing: true, + enableReconnect: filters => { + return filters.map(f => ({ ...f, since: newSince })) + }, + }) + const relay = await pool.ensureRelay(mockRelay.url) + relay.pingTimeout = 50 + relay.pingFrequency = 50 + relay.resubscribeBackoff = [50, 100] + + let closes = 0 + relay.onclose = () => { + closes++ + } + + expect(relay.connected).toBeTrue() + + const sub = relay.subscribe([{ kinds: [1], since: 0 }], { onevent: () => {} }) + expect(sub.filters[0].since).toBe(0) + + // wait for the first ping to succeed + await new Promise(resolve => setTimeout(resolve, 75)) + expect(closes).toBe(0) + + // now make it unresponsive + mockRelay.unresponsive = true + + // wait for the second ping to fail, which will trigger a close + await new Promise(resolve => { + const interval = setInterval(() => { + if (closes > 0) { + clearInterval(interval) + resolve(null) + } + }, 10) + }) + expect(closes).toBe(1) + expect(relay.connected).toBeFalse() + + // now make it responsive again + mockRelay.unresponsive = false + + // wait for reconnect + await new Promise(resolve => { + const interval = setInterval(() => { + if (relay.connected) { + clearInterval(interval) + resolve(null) + } + }, 10) + }) + + expect(relay.connected).toBeTrue() + expect(closes).toBe(1) + + // check if filter was updated + expect(sub.filters[0].since).toBe(newSince) +}) + test('track relays when publishing', async () => { let event1 = finalizeEvent( { diff --git a/pool.ts b/pool.ts index fb457e8..8c1d3eb 100644 --- a/pool.ts +++ b/pool.ts @@ -1,7 +1,7 @@ /* global WebSocket */ import { verifyEvent } from './pure.ts' -import { AbstractSimplePool } from './abstract-pool.ts' +import { AbstractSimplePool, type AbstractPoolConstructorOptions } from './abstract-pool.ts' var _WebSocket: typeof WebSocket @@ -14,7 +14,7 @@ export function useWebSocketImplementation(websocketImplementation: any) { } export class SimplePool extends AbstractSimplePool { - constructor(options?: { enablePing?: boolean }) { + constructor(options?: Pick) { super({ verifyEvent, websocketImplementation: _WebSocket, ...options }) } } diff --git a/relay.test.ts b/relay.test.ts index 844b4c5..3620185 100644 --- a/relay.test.ts +++ b/relay.test.ts @@ -118,33 +118,215 @@ test('publish timeout', async () => { ).rejects.toThrow('publish timed out') }) -test('ping-pong timeout', async () => { +test('ping-pong timeout (with native ping)', async () => { const mockRelay = new MockRelay() - const relay = new Relay(mockRelay.url, { enablePing: true }) + let pingCalled = false + + // mock a native ping/pong mechanism + ;(MockWebSocketClient.prototype as any).ping = function (this: any) { + pingCalled = true + if (!mockRelay.unresponsive) { + this.dispatchEvent(new Event('pong')) + } + } + ;(MockWebSocketClient.prototype as any).on = function (this: any, event: string, listener: () => void) { + if (event === 'pong') { + this.addEventListener('pong', listener) + } + } + + try { + const relay = new Relay(mockRelay.url, { enablePing: true }) + relay.pingTimeout = 50 + relay.pingFrequency = 50 + + let closed = false + const closedPromise = new Promise(resolve => { + relay.onclose = () => { + closed = true + resolve() + } + }) + + await relay.connect() + expect(relay.connected).toBeTrue() + + // wait for the first ping to succeed + await new Promise(resolve => setTimeout(resolve, 75)) + expect(pingCalled).toBeTrue() + expect(closed).toBeFalse() + + // now make it unresponsive + mockRelay.unresponsive = true + + // wait for the second ping to fail + await closedPromise + + expect(relay.connected).toBeFalse() + expect(closed).toBeTrue() + } finally { + delete (MockWebSocketClient.prototype as any).ping + delete (MockWebSocketClient.prototype as any).on + } +}) + +test('ping-pong timeout (no-ping browser environment)', async () => { + // spy on send to ensure the fallback dummy REQ is used, since MockWebSocketClient has no ping + const originalSend = MockWebSocketClient.prototype.send + let dummyReqSent = false + + try { + MockWebSocketClient.prototype.send = function (message: string) { + if (message.includes('REQ') && message.includes('a'.repeat(64))) { + dummyReqSent = true + } + originalSend.call(this, message) + } + + const mockRelay = new MockRelay() + const relay = new Relay(mockRelay.url, { enablePing: true }) + relay.pingTimeout = 50 + relay.pingFrequency = 50 + + let closed = false + const closedPromise = new Promise(resolve => { + relay.onclose = () => { + closed = true + resolve() + } + }) + + await relay.connect() + expect(relay.connected).toBeTrue() + + // wait for the first ping to succeed + await new Promise(resolve => setTimeout(resolve, 75)) + expect(dummyReqSent).toBeTrue() + expect(closed).toBeFalse() + + // now make it unresponsive + mockRelay.unresponsive = true + + // wait for the second ping to fail + await closedPromise + + expect(relay.connected).toBeFalse() + expect(closed).toBeTrue() + } finally { + MockWebSocketClient.prototype.send = originalSend + } +}) + +test('reconnect on disconnect', async () => { + const mockRelay = new MockRelay() + const relay = new Relay(mockRelay.url, { enablePing: true, enableReconnect: true }) relay.pingTimeout = 50 relay.pingFrequency = 50 + relay.resubscribeBackoff = [50, 100] // short backoff for testing - let closed = false - const closedPromise = new Promise(resolve => { - relay.onclose = () => { - closed = true - resolve() - } - }) + let closes = 0 + relay.onclose = () => { + closes++ + } await relay.connect() expect(relay.connected).toBeTrue() // wait for the first ping to succeed await new Promise(resolve => setTimeout(resolve, 75)) - expect(closed).toBeFalse() + expect(closes).toBe(0) // now make it unresponsive mockRelay.unresponsive = true - // wait for the second ping to fail - await closedPromise - + // wait for the second ping to fail, which will trigger a close + await new Promise(resolve => { + const interval = setInterval(() => { + if (closes > 0) { + clearInterval(interval) + resolve(null) + } + }, 10) + }) + expect(closes).toBe(1) expect(relay.connected).toBeFalse() - expect(closed).toBeTrue() + + // now make it responsive again + mockRelay.unresponsive = false + + // wait for reconnect + await new Promise(resolve => { + const interval = setInterval(() => { + if (relay.connected) { + clearInterval(interval) + resolve(null) + } + }, 10) + }) + + expect(relay.connected).toBeTrue() + expect(closes).toBe(1) // should not have closed again +}) + +test('reconnect with filter update', async () => { + const mockRelay = new MockRelay() + const newSince = Math.floor(Date.now() / 1000) + const relay = new Relay(mockRelay.url, { + enablePing: true, + enableReconnect: filters => { + return filters.map(f => ({ ...f, since: newSince })) + }, + }) + relay.pingTimeout = 50 + relay.pingFrequency = 50 + relay.resubscribeBackoff = [50, 100] + + let closes = 0 + relay.onclose = () => { + closes++ + } + + await relay.connect() + expect(relay.connected).toBeTrue() + + const sub = relay.subscribe([{ kinds: [1], since: 0 }], { onevent: () => {} }) + expect(sub.filters[0].since).toBe(0) + + // wait for the first ping to succeed + await new Promise(resolve => setTimeout(resolve, 75)) + expect(closes).toBe(0) + + // now make it unresponsive + mockRelay.unresponsive = true + + // wait for the second ping to fail, which will trigger a close + await new Promise(resolve => { + const interval = setInterval(() => { + if (closes > 0) { + clearInterval(interval) + resolve(null) + } + }, 10) + }) + expect(closes).toBe(1) + expect(relay.connected).toBeFalse() + + // now make it responsive again + mockRelay.unresponsive = false + + // wait for reconnect + await new Promise(resolve => { + const interval = setInterval(() => { + if (relay.connected) { + clearInterval(interval) + resolve(null) + } + }, 10) + }) + + expect(relay.connected).toBeTrue() + expect(closes).toBe(1) + + // check if filter was updated + expect(sub.filters[0].since).toBe(newSince) }) diff --git a/relay.ts b/relay.ts index be6bcdb..6561cac 100644 --- a/relay.ts +++ b/relay.ts @@ -1,7 +1,7 @@ /* global WebSocket */ import { verifyEvent } from './pure.ts' -import { AbstractRelay } from './abstract-relay.ts' +import { AbstractRelay, type AbstractRelayConstructorOptions } from './abstract-relay.ts' var _WebSocket: typeof WebSocket @@ -14,11 +14,14 @@ export function useWebSocketImplementation(websocketImplementation: any) { } export class Relay extends AbstractRelay { - constructor(url: string, options?: { enablePing?: boolean }) { + constructor(url: string, options?: Pick) { super(url, { verifyEvent, websocketImplementation: _WebSocket, ...options }) } - static async connect(url: string, options?: { enablePing?: boolean }): Promise { + static async connect( + url: string, + options?: Pick, + ): Promise { const relay = new Relay(url, options) await relay.connect() return relay