add batchedList method to SimplePool

This commit is contained in:
Sepehr Safari 2023-08-21 15:07:29 +03:30 committed by fiatjaf_
parent bce976fecd
commit 0925f5db81
3 changed files with 692 additions and 752 deletions

View File

@ -1,6 +1,6 @@
{
"name": "nostr-tools",
"version": "1.14.1",
"version": "1.14.2",
"description": "Tools for making a Nostr client.",
"repository": {
"type": "git",

122
pool.ts
View File

@ -7,26 +7,38 @@ import {
import {normalizeURL} from './utils.ts'
import type {Event} from './event.ts'
import type {Filter} from './filter.ts'
import {matchFilters, type Filter} from './filter.ts'
type BatchedRequest = {
filters: Filter<any>[]
relays: string[]
resolve: (events: Event<any>[]) => void
events: Event<any>[]
}
export class SimplePool {
private _conn: {[url: string]: Relay}
private _seenOn: {[id: string]: Set<string>} = {} // a map of all events we've seen in each relay
private batchedByKey: {[batchKey: string]: BatchedRequest[]} = {}
private eoseSubTimeout: number
private getTimeout: number
private seenOnEnabled: boolean = true
private batchInterval: number = 100
constructor(
options: {
eoseSubTimeout?: number
getTimeout?: number
seenOnEnabled?: boolean
batchInterval?: number
} = {}
) {
this._conn = {}
this.eoseSubTimeout = options.eoseSubTimeout || 3400
this.getTimeout = options.getTimeout || 3400
this.seenOnEnabled = options.seenOnEnabled !== false
this.batchInterval = options.batchInterval || 100
}
close(relays: string[]): void {
@ -81,34 +93,36 @@ export class SimplePool {
for (let cb of eoseListeners.values()) cb()
}, this.eoseSubTimeout)
relays.forEach(async relay => {
let r
try {
r = await this.ensureRelay(relay)
} catch (err) {
handleEose()
return
}
if (!r) return
let s = r.sub(filters, modifiedOpts)
s.on('event', event => {
_knownIds.add(event.id as string)
for (let cb of eventListeners.values()) cb(event)
})
s.on('eose', () => {
if (eoseSent) return
handleEose()
})
subs.push(s)
function handleEose() {
eosesMissing--
if (eosesMissing === 0) {
clearTimeout(eoseTimeout)
for (let cb of eoseListeners.values()) cb()
relays
.filter((r, i, a) => a.indexOf(r) == i)
.forEach(async relay => {
let r
try {
r = await this.ensureRelay(relay)
} catch (err) {
handleEose()
return
}
}
})
if (!r) return
let s = r.sub(filters, modifiedOpts)
s.on('event', event => {
_knownIds.add(event.id as string)
for (let cb of eventListeners.values()) cb(event)
})
s.on('eose', () => {
if (eoseSent) return
handleEose()
})
subs.push(s)
function handleEose() {
eosesMissing--
if (eosesMissing === 0) {
clearTimeout(eoseTimeout)
for (let cb of eoseListeners.values()) cb()
}
}
})
let greaterSub: Sub = {
sub(filters, opts) {
@ -176,6 +190,58 @@ export class SimplePool {
})
}
batchedList<K extends number = number>(
batchKey: string,
relays: string[],
filters: Filter<K>[]
): Promise<Event<K>[]> {
return new Promise(resolve => {
if (!this.batchedByKey[batchKey]) {
this.batchedByKey[batchKey] = [
{
filters,
relays,
resolve,
events: []
}
]
setTimeout(() => {
Object.keys(this.batchedByKey).forEach(async batchKey => {
const batchedRequests = this.batchedByKey[batchKey]
const filters = [] as Filter[]
const relays = [] as string[]
batchedRequests.forEach(br => {
filters.push(...br.filters)
relays.push(...br.relays)
})
const sub = this.sub(relays, filters)
sub.on('event', event => {
batchedRequests.forEach(
br => matchFilters(br.filters, event) && br.events.push(event)
)
})
sub.on('eose', () => {
sub.unsub()
batchedRequests.forEach(br => br.resolve(br.events))
})
delete this.batchedByKey[batchKey]
})
}, this.batchInterval)
} else {
this.batchedByKey[batchKey].push({
filters,
relays,
resolve,
events: []
})
}
})
}
publish(relays: string[], event: Event<number>): Promise<void>[] {
return relays.map(async relay => {
let r = await this.ensureRelay(relay)

1320
yarn.lock

File diff suppressed because it is too large Load Diff