rewrite pool.ts to be much simpler.

This commit is contained in:
fiatjaf
2023-12-17 11:19:44 -03:00
parent 420a6910e9
commit 3bfb50e267
2 changed files with 131 additions and 196 deletions

314
pool.ts
View File

@@ -1,237 +1,167 @@
import { eventsGenerator, relayInit, type Relay, type Sub, type SubscriptionOptions } from './relay.ts'
import { relayConnect, type Relay, SubscriptionParams, Subscription } from './relay.ts'
import { normalizeURL } from './utils.ts'
import type { Event } from './event.ts'
import { matchFilters, mergeFilters, type Filter } from './filter.ts'
import { type Filter } from './filter.ts'
type BatchedRequest = {
filters: Filter[]
relays: string[]
resolve: (events: Event[]) => void
events: Event[]
export type SubscribeManyParams = Omit<SubscriptionParams, 'onclose'> & {
eoseSubTimeout: number
onclose?: (reasons: string[]) => void
}
export class SimplePool {
private _conn: { [url: string]: Relay }
private _seenOn: { [id: string]: Set<string> } = {} // a map of all events we've seen in each relay
private batchedByKey: { [batchKey: string]: BatchedRequest[] } = {}
private eoseSubTimeout: number
private getTimeout: number
private seenOnEnabled: boolean = true
private batchInterval: number = 100
constructor(
options: {
eoseSubTimeout?: number
getTimeout?: number
seenOnEnabled?: boolean
batchInterval?: number
} = {},
) {
this._conn = {}
this.eoseSubTimeout = options.eoseSubTimeout || 3400
this.getTimeout = options.getTimeout || 3400
this.seenOnEnabled = options.seenOnEnabled !== false
this.batchInterval = options.batchInterval || 100
}
close(relays: string[]): void {
relays.forEach(url => {
let relay = this._conn[normalizeURL(url)]
if (relay) relay.close()
})
}
private relays = new Map<string, Relay>()
async ensureRelay(url: string): Promise<Relay> {
const nm = normalizeURL(url)
url = normalizeURL(url)
if (!this._conn[nm]) {
this._conn[nm] = relayInit(nm, {
getTimeout: this.getTimeout * 0.9,
listTimeout: this.getTimeout * 0.9,
})
let relay = this.relays.get(url)
if (!relay) {
relay = relayConnect(url)
this.relays.set(url, relay)
}
const relay = this._conn[nm]
await relay.connect()
return relay
}
sub(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Sub {
let _knownIds: Set<string> = new Set()
let modifiedOpts = { ...(opts || {}) }
modifiedOpts.alreadyHaveEvent = (id, url) => {
if (opts?.alreadyHaveEvent?.(id, url)) {
async subscribeMany(
relays: string[],
filters: Filter[],
params: SubscribeManyParams,
): Promise<{ close: () => void }> {
const _knownIds = new Set<string>()
params.alreadyHaveEvent = (id: string) => {
if (params.alreadyHaveEvent?.(id)) {
return true
}
if (this.seenOnEnabled) {
let set = this._seenOn[id] || new Set()
set.add(url)
this._seenOn[id] = set
}
return _knownIds.has(id)
const have = _knownIds.has(id)
_knownIds.add(id)
return have
}
let subs: Sub[] = []
let eventListeners: Set<any> = new Set()
let eoseListeners: Set<() => void> = new Set()
const subs: Subscription[] = []
// batch all EOSEs into a single
let eosesMissing = relays.length
let handleEose = () => {
eosesMissing--
if (eosesMissing === 0) {
clearTimeout(eoseTimeout)
params.oneose?.()
}
}
const eoseTimeout = setTimeout(() => {
handleEose = () => {}
params.oneose?.()
}, params.eoseSubTimeout || 5400)
let eoseSent = false
let eoseTimeout = setTimeout(
() => {
eoseSent = true
for (let cb of eoseListeners.values()) cb()
},
opts?.eoseSubTimeout || this.eoseSubTimeout,
)
// batch all closes into a single
const closesReceived: string[] = []
const handleClose = (i: number, reason: string) => {
handleEose()
closesReceived[i] = reason
if (closesReceived.length === relays.length) {
params.onclose?.(closesReceived)
}
}
relays
.filter((r, i, a) => a.indexOf(r) === i)
.forEach(async relay => {
let r
// open a subscription in all given relays
await Promise.all(
relays.map(normalizeURL).map(async (url, i) => {
if (relays.indexOf(url) !== i) {
// duplicate
handleClose(i, 'duplicate')
return
}
let relay: Relay
try {
r = await this.ensureRelay(relay)
relay = await this.ensureRelay(url)
} catch (err) {
handleEose()
return
}
if (!r) return
let s = r.sub(filters, modifiedOpts)
s.on('event', event => {
_knownIds.add(event.id as string)
for (let cb of eventListeners.values()) cb(event)
})
s.on('eose', () => {
if (eoseSent) return
handleEose()
})
subs.push(s)
function handleEose() {
eosesMissing--
if (eosesMissing === 0) {
clearTimeout(eoseTimeout)
for (let cb of eoseListeners.values()) cb()
}
}
})
let subscription = await relay.subscribe(filters, {
...params,
oneose: handleEose,
onclose: reason => handleClose(i, reason),
})
let greaterSub: Sub = {
sub(filters, opts) {
subs.forEach(sub => sub.sub(filters, opts))
return greaterSub as any
},
unsub() {
subs.forEach(sub => sub.unsub())
},
on(type, cb) {
if (type === 'event') {
eventListeners.add(cb)
} else if (type === 'eose') {
eoseListeners.add(cb as () => void | Promise<void>)
}
},
off(type, cb) {
if (type === 'event') {
eventListeners.delete(cb)
} else if (type === 'eose') eoseListeners.delete(cb as () => void | Promise<void>)
},
get events() {
return eventsGenerator(greaterSub)
subs.push(subscription)
}),
)
return {
close() {
subs.forEach(sub => {
sub.close()
})
},
}
return greaterSub
}
get(relays: string[], filter: Filter, opts?: SubscriptionOptions): Promise<Event | null> {
return new Promise(resolve => {
let sub = this.sub(relays, [filter], opts)
let timeout = setTimeout(() => {
sub.unsub()
resolve(null)
}, this.getTimeout)
sub.on('event', event => {
resolve(event)
clearTimeout(timeout)
sub.unsub()
async subscribeManyEose(
relays: string[],
filters: Filter[],
params: Pick<SubscribeManyParams, 'id' | 'onevent' | 'onclose' | 'eoseSubTimeout'>,
): Promise<{ close: () => void }> {
const sub = await this.subscribeMany(relays, filters, {
...params,
oneose() {
sub.close()
},
})
return sub
}
get(
relays: string[],
filter: Filter,
params: Pick<SubscribeManyParams, 'id' | 'eoseSubTimeout'>,
): Promise<Event | null> {
return new Promise(async (resolve, reject) => {
const sub = await this.subscribeManyEose(relays, [filter], {
...params,
onevent(event: Event) {
resolve(event)
sub.close()
},
onclose(reasons: string[]) {
const err = new Error('subscriptions closed')
err.cause = reasons
reject(err)
},
})
})
}
list(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Promise<Event[]> {
return new Promise(resolve => {
let events: Event[] = []
let sub = this.sub(relays, filters, opts)
sub.on('event', event => {
events.push(event)
})
// we can rely on an eose being emitted here because pool.sub() will fake one
sub.on('eose', () => {
sub.unsub()
resolve(events)
})
})
}
batchedList(batchKey: string, relays: string[], filters: Filter[]): Promise<Event[]> {
return new Promise(resolve => {
if (!this.batchedByKey[batchKey]) {
this.batchedByKey[batchKey] = [
{
filters,
relays,
resolve,
events: [],
},
]
setTimeout(() => {
Object.keys(this.batchedByKey).forEach(async batchKey => {
const batchedRequests = this.batchedByKey[batchKey]
const filters = [] as Filter[]
const relays = [] as string[]
batchedRequests.forEach(br => {
filters.push(...br.filters)
relays.push(...br.relays)
})
const sub = this.sub(relays, [mergeFilters(...filters)])
sub.on('event', event => {
batchedRequests.forEach(br => matchFilters(br.filters, event) && br.events.push(event))
})
sub.on('eose', () => {
sub.unsub()
batchedRequests.forEach(br => br.resolve(br.events))
})
delete this.batchedByKey[batchKey]
})
}, this.batchInterval)
} else {
this.batchedByKey[batchKey].push({
filters,
relays,
resolve,
events: [],
})
publish(relays: string[], event: Event): Promise<string>[] {
return relays.map(normalizeURL).map(async (url, i) => {
if (relays.indexOf(url) !== i) {
// duplicate
return Promise.reject('duplicate')
}
})
}
publish(relays: string[], event: Event): Promise<void>[] {
return relays.map(async relay => {
let r = await this.ensureRelay(relay)
let r = await this.ensureRelay(url)
return r.publish(event)
})
}
}
seenOn(id: string): string[] {
return Array.from(this._seenOn[id]?.values?.() || [])
export class RelayTrackingPool extends SimplePool {
public seenOn = new Map<string, Set<Relay>>()
subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): Promise<{ close: () => void }> {
params.receivedEvent = (relay: Relay, id: string) => {
let set = this.seenOn.get(id)
if (!set) {
set = new Set()
this.seenOn.set(id, set)
}
set.add(relay)
}
return super.subscribeMany(relays, filters, params)
}
}

View File

@@ -128,8 +128,13 @@ export class Relay {
// we do this before parsing the JSON to not have to do that for duplicate events
// since JSON parsing is slow
const id = getHex64(json, 'id')
so.receivedEvent?.(id) // this is so the client knows this relay had this event
if (so.alreadyHaveEvent?.(id)) {
const alreadyHave = so.alreadyHaveEvent?.(id)
// notify any interested client that the relay has this event
// (do this after alreadyHaveEvent() because the client may rely on this to answer that)
so.receivedEvent?.(this, id)
if (alreadyHave) {
// if we had already seen this event we can just stop here
return
}
@@ -263,7 +268,7 @@ export class Subscription {
public eosed: boolean = false
public alreadyHaveEvent: ((id: string) => boolean) | undefined
public receivedEvent: ((id: string) => boolean) | undefined
public receivedEvent: ((relay: Relay, id: string) => void) | undefined
public readonly filters: Filter[]
public onevent: (evt: Event) => void
@@ -298,7 +303,7 @@ export type SubscriptionParams = {
oneose?: () => void
onclose?: (reason: string) => void
alreadyHaveEvent?: (id: string) => boolean
receivedEvent?: (id: string) => boolean
receivedEvent?: (relay: Relay, id: string) => void
}
export type CountResolver = {