change the way eose and connection timeouts work.

This commit is contained in:
fiatjaf 2023-12-18 17:11:16 -03:00
parent 965ebdb6d1
commit 804403f574
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
3 changed files with 85 additions and 49 deletions

View File

@ -74,7 +74,7 @@ test('same with double querying', async () => {
})
test('querySync()', async () => {
let events = await pool.querySync([...relays, 'wss://offchain.pub', 'wss://eden.nostr.land'], {
let events = await pool.querySync([...relays.slice(2), 'wss://offchain.pub', 'wss://eden.nostr.land'], {
authors: ['3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'],
kinds: [1],
limit: 2,

50
pool.ts
View File

@ -1,4 +1,4 @@
import { relayConnect, type Relay, SubscriptionParams, Subscription } from './relay.ts'
import { Relay, SubscriptionParams, Subscription } from './relay.ts'
import { normalizeURL } from './utils.ts'
import type { Event } from './event.ts'
@ -7,7 +7,7 @@ import { type Filter } from './filter.ts'
export type SubCloser = { close: () => void }
export type SubscribeManyParams = Omit<SubscriptionParams, 'onclose' | 'id'> & {
eoseSubTimeout?: number
maxWait?: number
onclose?: (reasons: string[]) => void
id?: string
}
@ -17,13 +17,18 @@ export class SimplePool {
public seenOn = new Map<string, Set<Relay>>()
public trackRelays: boolean = false
async ensureRelay(url: string): Promise<Relay> {
public trustedRelayURLs = new Set<string>()
async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise<Relay> {
url = normalizeURL(url)
let relay = this.relays.get(url)
if (!relay) {
relay = relayConnect(url)
relay = new Relay(url)
if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout
if (this.trustedRelayURLs.has(relay.url)) relay.trusted = true
this.relays.set(url, relay)
await relay.connect()
}
return relay
@ -45,26 +50,22 @@ export class SimplePool {
const subs: Subscription[] = []
// batch all EOSEs into a single
let eosesMissing = relays.length
let handleEose = () => {
eosesMissing--
if (eosesMissing === 0) {
clearTimeout(eoseTimeout)
const eosesReceived: boolean[] = []
let handleEose = (i: number) => {
eosesReceived[i] = true
if (eosesReceived.filter(a => a).length === relays.length) {
params.oneose?.()
handleEose = () => {}
}
}
const eoseTimeout = setTimeout(() => {
handleEose = () => {}
params.oneose?.()
}, params.eoseSubTimeout || 3400)
// batch all closes into a single
const closesReceived: string[] = []
const handleClose = (i: number, reason: string) => {
handleEose()
let handleClose = (i: number, reason: string) => {
handleEose(i)
closesReceived[i] = reason
if (closesReceived.length === relays.length) {
if (closesReceived.filter(a => a).length === relays.length) {
params.onclose?.(closesReceived)
handleClose = () => {}
}
}
@ -88,17 +89,20 @@ export class SimplePool {
let relay: Relay
try {
relay = await this.ensureRelay(url)
relay = await this.ensureRelay(url, {
connectionTimeout: params.maxWait ? Math.max(params.maxWait * 0.8, params.maxWait - 1000) : undefined,
})
} catch (err) {
handleEose()
handleClose(i, (err as any)?.message || String(err))
return
}
let subscription = await relay.subscribe(filters, {
...params,
oneose: handleEose,
oneose: () => handleEose(i),
onclose: reason => handleClose(i, reason),
alreadyHaveEvent: localAlreadyHaveEventHandler,
eoseTimeout: params.maxWait,
})
subs.push(subscription)
@ -118,7 +122,7 @@ export class SimplePool {
subscribeManyEose(
relays: string[],
filters: Filter[],
params: Pick<SubscribeManyParams, 'id' | 'onevent' | 'onclose' | 'eoseSubTimeout'>,
params: Pick<SubscribeManyParams, 'id' | 'onevent' | 'onclose' | 'maxWait'>,
): SubCloser {
const subcloser = this.subscribeMany(relays, filters, {
...params,
@ -132,7 +136,7 @@ export class SimplePool {
async querySync(
relays: string[],
filter: Filter,
params?: Pick<SubscribeManyParams, 'id' | 'eoseSubTimeout'>,
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
): Promise<Event[]> {
return new Promise(async resolve => {
const events: Event[] = []
@ -151,7 +155,7 @@ export class SimplePool {
async get(
relays: string[],
filter: Filter,
params?: Pick<SubscribeManyParams, 'id' | 'eoseSubTimeout'>,
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
): Promise<Event | null> {
filter.limit = 1
const events = await this.querySync(relays, filter, params)

View File

@ -18,7 +18,11 @@ export class Relay {
public trusted: boolean = false
public onclose: (() => void) | null = null
public onnotice: (msg: string) => void = msg => console.log(`NOTICE from ${this.url}: ${msg}`)
public onnotice: (msg: string) => void = msg => console.debug(`NOTICE from ${this.url}: ${msg}`)
public baseEoseTimeout: number = 4400
public connectionTimeout: number = 8800
private connectionTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private connectionPromise: Promise<void> | undefined
private openSubs = new Map<string, Subscription>()
@ -60,6 +64,13 @@ export class Relay {
this.challenge = undefined
this.connectionPromise = new Promise((resolve, reject) => {
this.connectionTimeoutHandle = setTimeout(() => {
reject('connection timed out')
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection timed out')
}, this.connectionTimeout)
try {
this.ws = new WebSocket(this.url)
} catch (err) {
@ -68,6 +79,7 @@ export class Relay {
}
this.ws.onopen = () => {
clearTimeout(this.connectionTimeoutHandle)
this._connected = true
resolve()
}
@ -166,9 +178,8 @@ export class Relay {
}
case 'EOSE': {
const so = this.openSubs.get(data[1] as string)
if (!so || so.eosed) return
so.eosed = true
so.oneose?.()
if (!so) return
so.receivedEose()
return
}
case 'OK': {
@ -204,7 +215,6 @@ export class Relay {
}
public async send(message: string) {
await this.connect()
this.ws?.send(message)
}
@ -237,20 +247,16 @@ export class Relay {
public async subscribe(filters: Filter[], params: Partial<SubscriptionParams>): Promise<Subscription> {
await this.connect()
const subscription = this.prepareSubscription(filters, params)
subscription.fire()
return subscription
}
public prepareSubscription(filters: Filter[], params: Partial<SubscriptionParams> & { id?: string }): Subscription {
this.serial++
const id = params.id || 'sub:' + this.serial
const subscription = new Subscription(this, filters, {
onevent: event => {
console.warn(
`onevent() callback not defined for subscription '${id}' in relay ${this.url}. event received:`,
event,
)
},
...params,
id,
})
const subscription = new Subscription(this, id, filters, params)
this.openSubs.set(id, subscription)
this.send('["REQ","' + id + '",' + JSON.stringify(filters).substring(1))
return subscription
}
@ -264,26 +270,52 @@ export class Relay {
export class Subscription {
public readonly relay: Relay
public readonly id: string
public closed: boolean = false
public eosed: boolean = false
public filters: Filter[]
public alreadyHaveEvent: ((id: string) => boolean) | undefined
public receivedEvent: ((relay: Relay, id: string) => void) | undefined
public readonly filters: Filter[]
public onevent: (evt: Event) => void
public oneose: (() => void) | undefined
public onclose: ((reason: string) => void) | undefined
constructor(relay: Relay, filters: Filter[], params: SubscriptionParams) {
public eoseTimeout: number
private eoseTimeoutHandle: ReturnType<typeof setTimeout> | undefined
constructor(relay: Relay, id: string, filters: Filter[], params: SubscriptionParams) {
this.relay = relay
this.filters = filters
this.id = params.id
this.onevent = params.onevent
this.oneose = params.oneose
this.onclose = params.onclose
this.id = id
this.alreadyHaveEvent = params.alreadyHaveEvent
this.receivedEvent = params.receivedEvent
this.eoseTimeout = params.eoseTimeout || relay.baseEoseTimeout
this.oneose = params.oneose
this.onclose = params.onclose
this.onevent =
params.onevent ||
(event => {
console.warn(
`onevent() callback not defined for subscription '${this.id}' in relay ${this.relay.url}. event received:`,
event,
)
})
}
public fire() {
this.relay.send('["REQ","' + this.id + '",' + JSON.stringify(this.filters).substring(1))
// only now we start counting the eoseTimeout
this.eoseTimeoutHandle = setTimeout(this.receivedEose.bind(this), this.eoseTimeout)
}
public receivedEose() {
if (this.eosed) return
clearTimeout(this.eoseTimeoutHandle)
this.eosed = true
this.oneose?.()
}
public close(reason: string = 'closed by caller') {
@ -298,12 +330,12 @@ export class Subscription {
}
export type SubscriptionParams = {
id: string
onevent: (evt: Event) => void
onevent?: (evt: Event) => void
oneose?: () => void
onclose?: (reason: string) => void
alreadyHaveEvent?: (id: string) => boolean
receivedEvent?: (relay: Relay, id: string) => void
eoseTimeout?: number
}
export type CountResolver = {