pool.subscribeManyMap()

This commit is contained in:
fiatjaf
2024-04-12 21:50:26 -03:00
parent e858698cb9
commit 0a5eaac088
4 changed files with 100 additions and 9 deletions

View File

@@ -48,6 +48,10 @@ export class AbstractSimplePool {
}
subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser {
return this.subscribeManyMap(Object.fromEntries(relays.map(url => [url, filters])), params)
}
subscribeManyMap(requests: { [relay: string]: Filter[] }, params: SubscribeManyParams): SubCloser {
if (this.trackRelays) {
params.receivedEvent = (relay: AbstractRelay, id: string) => {
let set = this.seenOn.get(id)
@@ -61,12 +65,13 @@ export class AbstractSimplePool {
const _knownIds = new Set<string>()
const subs: Subscription[] = []
const relaysLength = Object.keys(requests).length
// batch all EOSEs into a single
const eosesReceived: boolean[] = []
let handleEose = (i: number) => {
eosesReceived[i] = true
if (eosesReceived.filter(a => a).length === relays.length) {
if (eosesReceived.filter(a => a).length === relaysLength) {
params.oneose?.()
handleEose = () => {}
}
@@ -76,7 +81,7 @@ export class AbstractSimplePool {
let handleClose = (i: number, reason: string) => {
handleEose(i)
closesReceived[i] = reason
if (closesReceived.filter(a => a).length === relays.length) {
if (closesReceived.filter(a => a).length === relaysLength) {
params.onclose?.(closesReceived)
handleClose = () => {}
}
@@ -93,13 +98,16 @@ export class AbstractSimplePool {
// open a subscription in all given relays
const allOpened = Promise.all(
relays.map(normalizeURL).map(async (url, i, arr) => {
if (arr.indexOf(url) !== i) {
Object.entries(requests).map(async (req, i, arr) => {
if (arr.indexOf(req) !== i) {
// duplicate
handleClose(i, 'duplicate url')
return
}
let [url, filters] = req
url = normalizeURL(url)
let relay: AbstractRelay
try {
relay = await this.ensureRelay(url, {

View File

@@ -41,7 +41,9 @@ export function matchFilter(filter: Filter, event: Event): boolean {
export function matchFilters(filters: Filter[], event: Event): boolean {
for (let i = 0; i < filters.length; i++) {
if (matchFilter(filters[i], event)) return true
if (matchFilter(filters[i], event)) {
return true
}
}
return false
}

View File

@@ -4,6 +4,7 @@ import { SimplePool } from './pool.ts'
import { finalizeEvent, generateSecretKey, getPublicKey, type Event } from './pure.ts'
import { useWebSocketImplementation } from './relay.ts'
import { MockRelay, MockWebSocketClient } from './test-helpers.ts'
import { hexToBytes } from '@noble/hashes/utils'
useWebSocketImplementation(MockWebSocketClient)
@@ -84,6 +85,86 @@ test('same with double subs', async () => {
expect(received).toHaveLength(2)
})
test('subscribe many map', async () => {
let priv = hexToBytes('8ea002840d413ccdd5be98df5dd89d799eaa566355ede83ca0bbdbb4b145e0d3')
let pub = getPublicKey(priv)
let received: Event[] = []
let event1 = finalizeEvent(
{
created_at: Math.round(Date.now() / 1000),
content: 'test1',
kind: 20001,
tags: [],
},
priv,
)
let event2 = finalizeEvent(
{
created_at: Math.round(Date.now() / 1000),
content: 'test2',
kind: 20002,
tags: [['t', 'biloba']],
},
priv,
)
let event3 = finalizeEvent(
{
created_at: Math.round(Date.now() / 1000),
content: 'test3',
kind: 20003,
tags: [['t', 'biloba']],
},
priv,
)
const [relayA, relayB, relayC] = relayURLs
pool.subscribeManyMap(
{
[relayA]: [{ authors: [pub], kinds: [20001] }],
[relayB]: [{ authors: [pub], kinds: [20002] }],
[relayC]: [{ kinds: [20003], '#t': ['biloba'] }],
},
{
onevent(event: Event) {
received.push(event)
},
},
)
// publish the first
await Promise.all(pool.publish([relayA, relayB], event1))
await new Promise(resolve => setTimeout(resolve, 100))
expect(received).toHaveLength(1)
expect(received[0]).toEqual(event1)
// publish the second
await pool.publish([relayB], event2)[0]
await new Promise(resolve => setTimeout(resolve, 100))
expect(received).toHaveLength(2)
expect(received[1]).toEqual(event2)
// publish a events that shouldn't match our filters
await Promise.all([
...pool.publish([relayA, relayB], event3),
...pool.publish([relayA, relayB, relayC], event1),
pool.publish([relayA, relayB, relayC], event2),
])
await new Promise(resolve => setTimeout(resolve, 100))
expect(received).toHaveLength(2)
// publsih the third
await pool.publish([relayC], event3)[0]
await new Promise(resolve => setTimeout(resolve, 100))
expect(received).toHaveLength(3)
expect(received[2]).toEqual(event3)
})
test('query a bunch of events and cancel on eose', async () => {
let events = new Set<string>()

View File

@@ -35,9 +35,9 @@ export class MockRelay {
finalizeEvent(
{
kind: 1,
content: '',
content: 'autogenerated by relay',
created_at: Math.floor(Date.now() / 1000),
tags: [],
tags: [['t', 'auto']],
},
sk,
),
@@ -68,9 +68,9 @@ export class MockRelay {
const event = finalizeEvent(
{
kind,
content: '',
content: 'kind-aware autogenerated by relay',
created_at: Math.floor(Date.now() / 1000),
tags: [],
tags: [['t', 'auto']],
},
sk,
)