import { relayConnect, type Relay, SubscriptionParams, Subscription } from './relay.ts' import { normalizeURL } from './utils.ts' import type { Event } from './event.ts' import { type Filter } from './filter.ts' export type SubscribeManyParams = Omit & { eoseSubTimeout?: number onclose?: (reasons: string[]) => void id?: string } export class SimplePool { private relays = new Map() public seenOn = new Map>() public trackRelays: boolean = false async ensureRelay(url: string): Promise { url = normalizeURL(url) let relay = this.relays.get(url) if (!relay) { relay = relayConnect(url) this.relays.set(url, relay) } return relay } async subscribeMany( relays: string[], filters: Filter[], params: SubscribeManyParams, ): Promise<{ close: () => void }> { if (this.trackRelays) { params.receivedEvent = (relay: Relay, id: string) => { let set = this.seenOn.get(id) if (!set) { set = new Set() this.seenOn.set(id, set) } set.add(relay) } } const _knownIds = new Set() const subs: Subscription[] = [] // batch all EOSEs into a single let eosesMissing = relays.length let handleEose = () => { eosesMissing-- if (eosesMissing === 0) { clearTimeout(eoseTimeout) params.oneose?.() } } const eoseTimeout = setTimeout(() => { handleEose = () => {} params.oneose?.() }, params.eoseSubTimeout || 5400) // batch all closes into a single const closesReceived: string[] = [] const handleClose = (i: number, reason: string) => { handleEose() closesReceived[i] = reason if (closesReceived.length === relays.length) { params.onclose?.(closesReceived) } } const localAlreadyHaveEventHandler = (id: string) => { if (params.alreadyHaveEvent?.(id)) { return true } const have = _knownIds.has(id) _knownIds.add(id) return have } // open a subscription in all given relays await Promise.all( relays.map(normalizeURL).map(async (url, i, arr) => { if (arr.indexOf(url) !== i) { // duplicate handleClose(i, 'duplicate url') return } let relay: Relay try { relay = await this.ensureRelay(url) } catch (err) { handleEose() return } let subscription = await relay.subscribe(filters, { ...params, oneose: handleEose, onclose: reason => handleClose(i, reason), alreadyHaveEvent: localAlreadyHaveEventHandler, }) subs.push(subscription) }), ) return { close() { subs.forEach(sub => { sub.close() }) }, } } async subscribeManyEose( relays: string[], filters: Filter[], params: Pick, ): Promise<{ close: () => void }> { const sub = await this.subscribeMany(relays, filters, { ...params, oneose() { setTimeout(() => { sub.close() }, 0) }, }) return sub } async querySync( relays: string[], filter: Filter, params?: Pick, ): Promise { return new Promise(async resolve => { const events: Event[] = [] await this.subscribeManyEose(relays, [filter], { ...params, onevent(event: Event) { events.push(event) }, onclose(_: string[]) { resolve(events) }, }) }) } async get( relays: string[], filter: Filter, params?: Pick, ): Promise { filter.limit = 1 const events = await this.querySync(relays, filter, params) events.sort((a, b) => b.created_at - a.created_at) return events[0] || null } publish(relays: string[], event: Event): Promise[] { return relays.map(normalizeURL).map(async (url, i, arr) => { if (arr.indexOf(url) !== i) { // duplicate return Promise.reject('duplicate url') } let r = await this.ensureRelay(url) return r.publish(event) }) } }