pool tests and pool.ts tweaks.

This commit is contained in:
fiatjaf
2023-12-17 22:19:28 -03:00
parent a0cb2eecae
commit f56f2ae709
2 changed files with 76 additions and 93 deletions

View File

@@ -6,31 +6,21 @@ import { SimplePool } from './pool.ts'
let pool = new SimplePool() let pool = new SimplePool()
let relays = [ let relays = ['wss://relay.damus.io/', 'wss://relay.nostr.bg/', 'wss://nos.lol', 'wss://public.relaying.io']
'wss://relay.damus.io/',
'wss://relay.nostr.bg/',
'wss://nostr.fmt.wiz.biz/',
'wss://relay.nostr.band/',
'wss://nos.lol/',
]
afterAll(() => {
pool.close([...relays, 'wss://nostr.wine', 'wss://offchain.pub', 'wss://eden.nostr.land'])
})
test('removing duplicates when querying', async () => { test('removing duplicates when querying', async () => {
let priv = generatePrivateKey() let priv = generatePrivateKey()
let pub = getPublicKey(priv) let pub = getPublicKey(priv)
let sub = pool.sub(relays, [{ authors: [pub] }]) pool.subscribeMany(relays, [{ authors: [pub] }], {
let received: Event[] = [] onevent(event: Event) {
// this should be called only once even though we're listening
sub.on('event', event => { // to multiple relays because the events will be catched and
// this should be called only once even though we're listening // deduplicated efficiently (without even being parsed)
// to multiple relays because the events will be catched and received.push(event)
// deduplicated efficiently (without even being parsed) },
received.push(event)
}) })
let received: Event[] = []
let event = finishEvent( let event = finishEvent(
{ {
@@ -42,30 +32,31 @@ test('removing duplicates when querying', async () => {
priv, priv,
) )
pool.publish(relays, event) await Promise.any(pool.publish(relays, event))
await new Promise(resolve => setTimeout(resolve, 1500)) await new Promise(resolve => setTimeout(resolve, 1500))
expect(received).toHaveLength(1) expect(received).toHaveLength(1)
expect(received[0]).toEqual(event)
}) })
test('same with double querying', async () => { test('same with double querying', async () => {
let priv = generatePrivateKey() let priv = generatePrivateKey()
let pub = getPublicKey(priv) let pub = getPublicKey(priv)
let sub1 = pool.sub(relays, [{ authors: [pub] }]) pool.subscribeMany(relays, [{ authors: [pub] }], {
let sub2 = pool.sub(relays, [{ authors: [pub] }]) onevent(event) {
received.push(event)
},
})
pool.subscribeMany(relays, [{ authors: [pub] }], {
onevent(event) {
received.push(event)
},
})
let received: Event[] = [] let received: Event[] = []
sub1.on('event', event => {
received.push(event)
})
sub2.on('event', event => {
received.push(event)
})
let event = finishEvent( let event = finishEvent(
{ {
created_at: Math.round(Date.now() / 1000), created_at: Math.round(Date.now() / 1000),
@@ -76,51 +67,30 @@ test('same with double querying', async () => {
priv, priv,
) )
pool.publish(relays, event) await Promise.any(pool.publish(relays, event))
await new Promise(resolve => setTimeout(resolve, 1500)) await new Promise(resolve => setTimeout(resolve, 1500))
expect(received).toHaveLength(2) expect(received).toHaveLength(2)
}) })
test('get()', async () => {
let event = await pool.get(relays, {
ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'],
})
expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027')
})
test('list()', async () => { test('list()', async () => {
let events = await pool.list( let events = await pool.querySync([...relays, 'wss://offchain.pub', 'wss://eden.nostr.land'], {
[...relays, 'wss://offchain.pub', 'wss://eden.nostr.land'], authors: ['3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'],
[ kinds: [1],
{ limit: 2,
authors: ['3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'], })
kinds: [1],
limit: 2,
},
],
)
// the actual received number will be greater than 2, but there will be no duplicates // the actual received number will be greater than 2, but there will be no duplicates
expect(events.length).toBeGreaterThan(2)
const uniqueEventCount = new Set(events.map(evt => evt.id)).size const uniqueEventCount = new Set(events.map(evt => evt.id)).size
expect(events.length).toEqual(uniqueEventCount) expect(events).toHaveLength(uniqueEventCount)
let relaysForAllEvents = events.map(event => pool.seenOn(event.id)).reduce((acc, n) => acc.concat(n), [])
expect(relaysForAllEvents.length).toBeGreaterThanOrEqual(events.length)
}) })
test('seenOnEnabled: false', async () => { test('get()', async () => {
const poolWithoutSeenOn = new SimplePool({ seenOnEnabled: false }) let event = await pool.get(relays, {
ids: ['9fa1c618fcaad6357e074417b07ed132b083ed30e13113ebb10fcda7137442fe'],
const event = await poolWithoutSeenOn.get(relays, {
ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'],
}) })
expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027') expect(event).not.toBeNull()
expect(event).toHaveProperty('id', '9fa1c618fcaad6357e074417b07ed132b083ed30e13113ebb10fcda7137442fe')
const relaysForEvent = poolWithoutSeenOn.seenOn(event!.id)
expect(relaysForEvent).toHaveLength(0)
}) })

71
pool.ts
View File

@@ -4,9 +4,10 @@ import { normalizeURL } from './utils.ts'
import type { Event } from './event.ts' import type { Event } from './event.ts'
import { type Filter } from './filter.ts' import { type Filter } from './filter.ts'
export type SubscribeManyParams = Omit<SubscriptionParams, 'onclose'> & { export type SubscribeManyParams = Omit<SubscriptionParams, 'onclose' | 'id'> & {
eoseSubTimeout: number eoseSubTimeout?: number
onclose?: (reasons: string[]) => void onclose?: (reasons: string[]) => void
id?: string
} }
export class SimplePool { export class SimplePool {
@@ -43,15 +44,6 @@ export class SimplePool {
} }
const _knownIds = new Set<string>() const _knownIds = new Set<string>()
params.alreadyHaveEvent = (id: string) => {
if (params.alreadyHaveEvent?.(id)) {
return true
}
const have = _knownIds.has(id)
_knownIds.add(id)
return have
}
const subs: Subscription[] = [] const subs: Subscription[] = []
// batch all EOSEs into a single // batch all EOSEs into a single
@@ -78,12 +70,21 @@ export class SimplePool {
} }
} }
const localAlreadyHaveEventHandler = (id: string) => {
if (params.alreadyHaveEvent?.(id)) {
return true
}
const have = _knownIds.has(id)
_knownIds.add(id)
return have
}
// open a subscription in all given relays // open a subscription in all given relays
await Promise.all( await Promise.all(
relays.map(normalizeURL).map(async (url, i) => { relays.map(normalizeURL).map(async (url, i, arr) => {
if (relays.indexOf(url) !== i) { if (arr.indexOf(url) !== i) {
// duplicate // duplicate
handleClose(i, 'duplicate') handleClose(i, 'duplicate url')
return return
} }
@@ -99,6 +100,7 @@ export class SimplePool {
...params, ...params,
oneose: handleEose, oneose: handleEose,
onclose: reason => handleClose(i, reason), onclose: reason => handleClose(i, reason),
alreadyHaveEvent: localAlreadyHaveEventHandler,
}) })
subs.push(subscription) subs.push(subscription)
@@ -122,38 +124,49 @@ export class SimplePool {
const sub = await this.subscribeMany(relays, filters, { const sub = await this.subscribeMany(relays, filters, {
...params, ...params,
oneose() { oneose() {
sub.close() setTimeout(() => {
sub.close()
}, 0)
}, },
}) })
return sub return sub
} }
get( async querySync(
relays: string[], relays: string[],
filter: Filter, filter: Filter,
params: Pick<SubscribeManyParams, 'id' | 'eoseSubTimeout'>, params?: Pick<SubscribeManyParams, 'id' | 'eoseSubTimeout'>,
): Promise<Event | null> { ): Promise<Event[]> {
return new Promise(async (resolve, reject) => { return new Promise(async resolve => {
const sub = await this.subscribeManyEose(relays, [filter], { const events: Event[] = []
await this.subscribeManyEose(relays, [filter], {
...params, ...params,
onevent(event: Event) { onevent(event: Event) {
resolve(event) events.push(event)
sub.close()
}, },
onclose(reasons: string[]) { onclose(_: string[]) {
const err = new Error('subscriptions closed') resolve(events)
err.cause = reasons
reject(err)
}, },
}) })
}) })
} }
async get(
relays: string[],
filter: Filter,
params?: Pick<SubscribeManyParams, 'id' | 'eoseSubTimeout'>,
): Promise<Event | null> {
filter.limit = 1
const events = await this.querySync(relays, filter, params)
events.sort((a, b) => b.created_at - a.created_at)
return events[0] || null
}
publish(relays: string[], event: Event): Promise<string>[] { publish(relays: string[], event: Event): Promise<string>[] {
return relays.map(normalizeURL).map(async (url, i) => { return relays.map(normalizeURL).map(async (url, i, arr) => {
if (relays.indexOf(url) !== i) { if (arr.indexOf(url) !== i) {
// duplicate // duplicate
return Promise.reject('duplicate') return Promise.reject('duplicate url')
} }
let r = await this.ensureRelay(url) let r = await this.ensureRelay(url)