diff --git a/pool.ts b/pool.ts index 2370739..fa66f62 100644 --- a/pool.ts +++ b/pool.ts @@ -1,237 +1,167 @@ -import { eventsGenerator, relayInit, type Relay, type Sub, type SubscriptionOptions } from './relay.ts' +import { relayConnect, type Relay, SubscriptionParams, Subscription } from './relay.ts' import { normalizeURL } from './utils.ts' import type { Event } from './event.ts' -import { matchFilters, mergeFilters, type Filter } from './filter.ts' +import { type Filter } from './filter.ts' -type BatchedRequest = { - filters: Filter[] - relays: string[] - resolve: (events: Event[]) => void - events: Event[] +export type SubscribeManyParams = Omit & { + eoseSubTimeout: number + onclose?: (reasons: string[]) => void } export class SimplePool { - private _conn: { [url: string]: Relay } - private _seenOn: { [id: string]: Set } = {} // a map of all events we've seen in each relay - private batchedByKey: { [batchKey: string]: BatchedRequest[] } = {} - - private eoseSubTimeout: number - private getTimeout: number - private seenOnEnabled: boolean = true - private batchInterval: number = 100 - - constructor( - options: { - eoseSubTimeout?: number - getTimeout?: number - seenOnEnabled?: boolean - batchInterval?: number - } = {}, - ) { - this._conn = {} - this.eoseSubTimeout = options.eoseSubTimeout || 3400 - this.getTimeout = options.getTimeout || 3400 - this.seenOnEnabled = options.seenOnEnabled !== false - this.batchInterval = options.batchInterval || 100 - } - - close(relays: string[]): void { - relays.forEach(url => { - let relay = this._conn[normalizeURL(url)] - if (relay) relay.close() - }) - } + private relays = new Map() async ensureRelay(url: string): Promise { - const nm = normalizeURL(url) + url = normalizeURL(url) - if (!this._conn[nm]) { - this._conn[nm] = relayInit(nm, { - getTimeout: this.getTimeout * 0.9, - listTimeout: this.getTimeout * 0.9, - }) + let relay = this.relays.get(url) + if (!relay) { + relay = relayConnect(url) + this.relays.set(url, relay) } - const relay = this._conn[nm] - await relay.connect() return relay } - sub(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Sub { - let _knownIds: Set = new Set() - let modifiedOpts = { ...(opts || {}) } - modifiedOpts.alreadyHaveEvent = (id, url) => { - if (opts?.alreadyHaveEvent?.(id, url)) { + async subscribeMany( + relays: string[], + filters: Filter[], + params: SubscribeManyParams, + ): Promise<{ close: () => void }> { + const _knownIds = new Set() + params.alreadyHaveEvent = (id: string) => { + if (params.alreadyHaveEvent?.(id)) { return true } - if (this.seenOnEnabled) { - let set = this._seenOn[id] || new Set() - set.add(url) - this._seenOn[id] = set - } - return _knownIds.has(id) + const have = _knownIds.has(id) + _knownIds.add(id) + return have } - let subs: Sub[] = [] - let eventListeners: Set = new Set() - let eoseListeners: Set<() => void> = new Set() + const subs: Subscription[] = [] + + // batch all EOSEs into a single let eosesMissing = relays.length + let handleEose = () => { + eosesMissing-- + if (eosesMissing === 0) { + clearTimeout(eoseTimeout) + params.oneose?.() + } + } + const eoseTimeout = setTimeout(() => { + handleEose = () => {} + params.oneose?.() + }, params.eoseSubTimeout || 5400) - let eoseSent = false - let eoseTimeout = setTimeout( - () => { - eoseSent = true - for (let cb of eoseListeners.values()) cb() - }, - opts?.eoseSubTimeout || this.eoseSubTimeout, - ) + // batch all closes into a single + const closesReceived: string[] = [] + const handleClose = (i: number, reason: string) => { + handleEose() + closesReceived[i] = reason + if (closesReceived.length === relays.length) { + params.onclose?.(closesReceived) + } + } - relays - .filter((r, i, a) => a.indexOf(r) === i) - .forEach(async relay => { - let r + // open a subscription in all given relays + await Promise.all( + relays.map(normalizeURL).map(async (url, i) => { + if (relays.indexOf(url) !== i) { + // duplicate + handleClose(i, 'duplicate') + return + } + + let relay: Relay try { - r = await this.ensureRelay(relay) + relay = await this.ensureRelay(url) } catch (err) { handleEose() return } - if (!r) return - let s = r.sub(filters, modifiedOpts) - s.on('event', event => { - _knownIds.add(event.id as string) - for (let cb of eventListeners.values()) cb(event) - }) - s.on('eose', () => { - if (eoseSent) return - handleEose() - }) - subs.push(s) - function handleEose() { - eosesMissing-- - if (eosesMissing === 0) { - clearTimeout(eoseTimeout) - for (let cb of eoseListeners.values()) cb() - } - } - }) + let subscription = await relay.subscribe(filters, { + ...params, + oneose: handleEose, + onclose: reason => handleClose(i, reason), + }) - let greaterSub: Sub = { - sub(filters, opts) { - subs.forEach(sub => sub.sub(filters, opts)) - return greaterSub as any - }, - unsub() { - subs.forEach(sub => sub.unsub()) - }, - on(type, cb) { - if (type === 'event') { - eventListeners.add(cb) - } else if (type === 'eose') { - eoseListeners.add(cb as () => void | Promise) - } - }, - off(type, cb) { - if (type === 'event') { - eventListeners.delete(cb) - } else if (type === 'eose') eoseListeners.delete(cb as () => void | Promise) - }, - get events() { - return eventsGenerator(greaterSub) + subs.push(subscription) + }), + ) + + return { + close() { + subs.forEach(sub => { + sub.close() + }) }, } - - return greaterSub } - get(relays: string[], filter: Filter, opts?: SubscriptionOptions): Promise { - return new Promise(resolve => { - let sub = this.sub(relays, [filter], opts) - let timeout = setTimeout(() => { - sub.unsub() - resolve(null) - }, this.getTimeout) - sub.on('event', event => { - resolve(event) - clearTimeout(timeout) - sub.unsub() + async subscribeManyEose( + relays: string[], + filters: Filter[], + params: Pick, + ): Promise<{ close: () => void }> { + const sub = await this.subscribeMany(relays, filters, { + ...params, + oneose() { + sub.close() + }, + }) + return sub + } + + get( + relays: string[], + filter: Filter, + params: Pick, + ): Promise { + return new Promise(async (resolve, reject) => { + const sub = await this.subscribeManyEose(relays, [filter], { + ...params, + onevent(event: Event) { + resolve(event) + sub.close() + }, + onclose(reasons: string[]) { + const err = new Error('subscriptions closed') + err.cause = reasons + reject(err) + }, }) }) } - list(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Promise { - return new Promise(resolve => { - let events: Event[] = [] - let sub = this.sub(relays, filters, opts) - - sub.on('event', event => { - events.push(event) - }) - - // we can rely on an eose being emitted here because pool.sub() will fake one - sub.on('eose', () => { - sub.unsub() - resolve(events) - }) - }) - } - - batchedList(batchKey: string, relays: string[], filters: Filter[]): Promise { - return new Promise(resolve => { - if (!this.batchedByKey[batchKey]) { - this.batchedByKey[batchKey] = [ - { - filters, - relays, - resolve, - events: [], - }, - ] - - setTimeout(() => { - Object.keys(this.batchedByKey).forEach(async batchKey => { - const batchedRequests = this.batchedByKey[batchKey] - - const filters = [] as Filter[] - const relays = [] as string[] - batchedRequests.forEach(br => { - filters.push(...br.filters) - relays.push(...br.relays) - }) - - const sub = this.sub(relays, [mergeFilters(...filters)]) - sub.on('event', event => { - batchedRequests.forEach(br => matchFilters(br.filters, event) && br.events.push(event)) - }) - sub.on('eose', () => { - sub.unsub() - batchedRequests.forEach(br => br.resolve(br.events)) - }) - - delete this.batchedByKey[batchKey] - }) - }, this.batchInterval) - } else { - this.batchedByKey[batchKey].push({ - filters, - relays, - resolve, - events: [], - }) + publish(relays: string[], event: Event): Promise[] { + return relays.map(normalizeURL).map(async (url, i) => { + if (relays.indexOf(url) !== i) { + // duplicate + return Promise.reject('duplicate') } - }) - } - publish(relays: string[], event: Event): Promise[] { - return relays.map(async relay => { - let r = await this.ensureRelay(relay) + let r = await this.ensureRelay(url) return r.publish(event) }) } +} - seenOn(id: string): string[] { - return Array.from(this._seenOn[id]?.values?.() || []) +export class RelayTrackingPool extends SimplePool { + public seenOn = new Map>() + + subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): Promise<{ close: () => void }> { + params.receivedEvent = (relay: Relay, id: string) => { + let set = this.seenOn.get(id) + if (!set) { + set = new Set() + this.seenOn.set(id, set) + } + set.add(relay) + } + + return super.subscribeMany(relays, filters, params) } } diff --git a/relay.ts b/relay.ts index 82df7da..b18ac26 100644 --- a/relay.ts +++ b/relay.ts @@ -128,8 +128,13 @@ export class Relay { // we do this before parsing the JSON to not have to do that for duplicate events // since JSON parsing is slow const id = getHex64(json, 'id') - so.receivedEvent?.(id) // this is so the client knows this relay had this event - if (so.alreadyHaveEvent?.(id)) { + const alreadyHave = so.alreadyHaveEvent?.(id) + + // notify any interested client that the relay has this event + // (do this after alreadyHaveEvent() because the client may rely on this to answer that) + so.receivedEvent?.(this, id) + + if (alreadyHave) { // if we had already seen this event we can just stop here return } @@ -263,7 +268,7 @@ export class Subscription { public eosed: boolean = false public alreadyHaveEvent: ((id: string) => boolean) | undefined - public receivedEvent: ((id: string) => boolean) | undefined + public receivedEvent: ((relay: Relay, id: string) => void) | undefined public readonly filters: Filter[] public onevent: (evt: Event) => void @@ -298,7 +303,7 @@ export type SubscriptionParams = { oneose?: () => void onclose?: (reason: string) => void alreadyHaveEvent?: (id: string) => boolean - receivedEvent?: (id: string) => boolean + receivedEvent?: (relay: Relay, id: string) => void } export type CountResolver = {