diff --git a/abstract-pool.ts b/abstract-pool.ts index 468409b..ba6f8c7 100644 --- a/abstract-pool.ts +++ b/abstract-pool.ts @@ -23,6 +23,7 @@ export type AbstractPoolConstructorOptions = AbstractRelayConstructorOptions & { export type SubscribeManyParams = Omit & { maxWait?: number + abort?: AbortSignal onclose?: (reasons: string[]) => void onauth?: (event: EventTemplate) => Promise id?: string @@ -50,7 +51,13 @@ export class AbstractSimplePool { this.automaticallyAuth = opts.automaticallyAuth } - async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise { + async ensureRelay( + url: string, + params?: { + connectionTimeout?: number + abort?: AbortSignal + }, + ): Promise { url = normalizeURL(url) let relay = this.relays.get(url) @@ -66,7 +73,6 @@ export class AbstractSimplePool { this.relays.delete(url) } } - if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout this.relays.set(url, relay) } @@ -77,7 +83,10 @@ export class AbstractSimplePool { } } - await relay.connect() + await relay.connect({ + timeout: params?.connectionTimeout, + abort: params?.abort, + }) return relay } @@ -176,6 +185,7 @@ export class AbstractSimplePool { try { relay = await this.ensureRelay(url, { connectionTimeout: params.maxWait ? Math.max(params.maxWait * 0.8, params.maxWait - 1000) : undefined, + abort: params.abort, }) } catch (err) { handleClose(i, (err as any)?.message || String(err)) @@ -198,6 +208,7 @@ export class AbstractSimplePool { }, alreadyHaveEvent: localAlreadyHaveEventHandler, eoseTimeout: params.maxWait, + abort: params.abort, }) }) .catch(err => { @@ -209,6 +220,7 @@ export class AbstractSimplePool { }, alreadyHaveEvent: localAlreadyHaveEventHandler, eoseTimeout: params.maxWait, + abort: params.abort, }) subs.push(subscription) diff --git a/abstract-relay.ts b/abstract-relay.ts index b49929d..ba3e490 100644 --- a/abstract-relay.ts +++ b/abstract-relay.ts @@ -35,7 +35,6 @@ export class AbstractRelay { public onauth: undefined | ((evt: EventTemplate) => Promise) public baseEoseTimeout: number = 4400 - public connectionTimeout: number = 4400 public publishTimeout: number = 4400 public pingFrequency: number = 29000 public pingTimeout: number = 20000 @@ -43,7 +42,6 @@ export class AbstractRelay { public openSubs: Map = new Map() public enablePing: boolean | undefined public enableReconnect: boolean - private connectionTimeoutHandle: ReturnType | undefined private reconnectTimeoutHandle: ReturnType | undefined private pingIntervalHandle: ReturnType | undefined private reconnectAttempts: number = 0 @@ -70,9 +68,12 @@ export class AbstractRelay { this.enableReconnect = opts.enableReconnect || false } - static async connect(url: string, opts: AbstractRelayConstructorOptions): Promise { + static async connect( + url: string, + opts: AbstractRelayConstructorOptions & Parameters[0], + ): Promise { const relay = new AbstractRelay(url, opts) - await relay.connect() + await relay.connect(opts) return relay } @@ -131,23 +132,31 @@ export class AbstractRelay { } } - public async connect(): Promise { + public async connect(opts?: { timeout?: number; abort?: AbortSignal }): Promise { + let connectionTimeoutHandle: ReturnType | undefined + if (this.connectionPromise) return this.connectionPromise this.challenge = undefined this.authPromise = undefined this.connectionPromise = new Promise((resolve, reject) => { - this.connectionTimeoutHandle = setTimeout(() => { - reject('connection timed out') - this.connectionPromise = undefined - this.onclose?.() - this.closeAllSubscriptions('relay connection timed out') - }, this.connectionTimeout) + if (opts?.timeout) { + connectionTimeoutHandle = setTimeout(() => { + reject('connection timed out') + this.connectionPromise = undefined + this.onclose?.() + this.closeAllSubscriptions('relay connection timed out') + }, opts.timeout) + } + + if (opts?.abort) { + opts.abort.onabort = reject + } try { this.ws = new this._WebSocket(this.url) } catch (err) { - clearTimeout(this.connectionTimeoutHandle) + clearTimeout(connectionTimeoutHandle) reject(err) return } @@ -157,7 +166,7 @@ export class AbstractRelay { clearTimeout(this.reconnectTimeoutHandle) this.reconnectTimeoutHandle = undefined } - clearTimeout(this.connectionTimeoutHandle) + clearTimeout(connectionTimeoutHandle) this._connected = true const isReconnection = this.reconnectAttempts > 0 @@ -183,13 +192,13 @@ export class AbstractRelay { } this.ws.onerror = ev => { - clearTimeout(this.connectionTimeoutHandle) + clearTimeout(connectionTimeoutHandle) reject((ev as any).message || 'websocket error') this.handleHardClose('relay connection errored') } this.ws.onclose = ev => { - clearTimeout(this.connectionTimeoutHandle) + clearTimeout(connectionTimeoutHandle) reject((ev as any).message || 'websocket closed') this.handleHardClose('relay connection closed') } @@ -437,6 +446,11 @@ export class AbstractRelay { ): Subscription { const sub = this.prepareSubscription(filters, params) sub.fire() + + if (params.abort) { + params.abort.onabort = () => sub.close(String(params.abort!.reason || '')) + } + return sub } @@ -563,6 +577,7 @@ export type SubscriptionParams = { alreadyHaveEvent?: (id: string) => boolean receivedEvent?: (relay: AbstractRelay, id: string) => void eoseTimeout?: number + abort?: AbortSignal } export type CountResolver = {