pool: deprecate subscribeManyMap and introduce subscribe/subscribeEose methods that take a single filter.

This commit is contained in:
fiatjaf 2025-04-02 10:37:10 -03:00
parent 4a738c93d0
commit 303c35120c
1 changed files with 135 additions and 4 deletions

View File

@ -62,10 +62,127 @@ export class AbstractSimplePool {
})
}
subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser {
return this.subscribeManyMap(Object.fromEntries(relays.map(url => [url, filters])), params)
subscribe(relays: string[], filter: Filter, params: SubscribeManyParams): SubCloser {
return this.subscribeMap(
relays.map(url => ({ url, filter })),
params,
)
}
subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser {
return this.subscribeMap(
relays.flatMap(url => filters.map(filter => ({ url, filter }))),
params,
)
}
subscribeMap(requests: { url: string; filter: Filter }[], params: SubscribeManyParams): SubCloser {
if (this.trackRelays) {
params.receivedEvent = (relay: AbstractRelay, id: string) => {
let set = this.seenOn.get(id)
if (!set) {
set = new Set()
this.seenOn.set(id, set)
}
set.add(relay)
}
}
const _knownIds = new Set<string>()
const subs: Subscription[] = []
// batch all EOSEs into a single
const eosesReceived: boolean[] = []
let handleEose = (i: number) => {
if (eosesReceived[i]) return // do not act twice for the same relay
eosesReceived[i] = true
if (eosesReceived.filter(a => a).length === requests.length) {
params.oneose?.()
handleEose = () => {}
}
}
// batch all closes into a single
const closesReceived: string[] = []
let handleClose = (i: number, reason: string) => {
if (closesReceived[i]) return // do not act twice for the same relay
handleEose(i)
closesReceived[i] = reason
if (closesReceived.filter(a => a).length === requests.length) {
params.onclose?.(closesReceived)
handleClose = () => {}
}
}
const localAlreadyHaveEventHandler = (id: string) => {
if (params.alreadyHaveEvent?.(id)) {
return true
}
const have = _knownIds.has(id)
_knownIds.add(id)
return have
}
// open a subscription in all given relays
const allOpened = Promise.all(
requests.map(async ({ url, filter }, i) => {
url = normalizeURL(url)
let relay: AbstractRelay
try {
relay = await this.ensureRelay(url, {
connectionTimeout: params.maxWait ? Math.max(params.maxWait * 0.8, params.maxWait - 1000) : undefined,
})
} catch (err) {
handleClose(i, (err as any)?.message || String(err))
return
}
let subscription = relay.subscribe([filter], {
...params,
oneose: () => handleEose(i),
onclose: reason => {
if (reason.startsWith('auth-required:') && params.doauth) {
relay
.auth(params.doauth)
.then(() => {
relay.subscribe([filter], {
...params,
oneose: () => handleEose(i),
onclose: reason => {
handleClose(i, reason) // the second time we won't try to auth anymore
},
alreadyHaveEvent: localAlreadyHaveEventHandler,
eoseTimeout: params.maxWait,
})
})
.catch(err => {
handleClose(i, `auth was required and attempted, but failed with: ${err}`)
})
} else {
handleClose(i, reason)
}
},
alreadyHaveEvent: localAlreadyHaveEventHandler,
eoseTimeout: params.maxWait,
})
subs.push(subscription)
}),
)
return {
async close() {
await allOpened
subs.forEach(sub => {
sub.close()
})
},
}
}
/**
* @deprecated Use subscribeMap instead.
*/
subscribeManyMap(requests: { [relay: string]: Filter[] }, params: SubscribeManyParams): SubCloser {
if (this.trackRelays) {
params.receivedEvent = (relay: AbstractRelay, id: string) => {
@ -178,10 +295,24 @@ export class AbstractSimplePool {
}
}
subscribeEose(
relays: string[],
filter: Filter,
params: Pick<SubscribeManyParams, 'label' | 'id' | 'onevent' | 'onclose' | 'maxWait' | 'doauth'>,
): SubCloser {
const subcloser = this.subscribe(relays, filter, {
...params,
oneose() {
subcloser.close()
},
})
return subcloser
}
subscribeManyEose(
relays: string[],
filters: Filter[],
params: Pick<SubscribeManyParams, 'label' | 'id' | 'onevent' | 'onclose' | 'maxWait'>,
params: Pick<SubscribeManyParams, 'label' | 'id' | 'onevent' | 'onclose' | 'maxWait' | 'doauth'>,
): SubCloser {
const subcloser = this.subscribeMany(relays, filters, {
...params,
@ -199,7 +330,7 @@ export class AbstractSimplePool {
): Promise<Event[]> {
return new Promise(async resolve => {
const events: Event[] = []
this.subscribeManyEose(relays, [filter], {
this.subscribeEose(relays, filter, {
...params,
onevent(event: Event) {
events.push(event)