From 0a5eaac088bb2bc08e5ea34bf0e28582f6291f9b Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Fri, 12 Apr 2024 21:50:26 -0300 Subject: [PATCH] pool.subscribeManyMap() --- abstract-pool.ts | 16 +++++++--- filter.ts | 4 ++- pool.test.ts | 81 ++++++++++++++++++++++++++++++++++++++++++++++++ test-helpers.ts | 8 ++--- 4 files changed, 100 insertions(+), 9 deletions(-) diff --git a/abstract-pool.ts b/abstract-pool.ts index 2021c98..730147e 100644 --- a/abstract-pool.ts +++ b/abstract-pool.ts @@ -48,6 +48,10 @@ export class AbstractSimplePool { } subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser { + return this.subscribeManyMap(Object.fromEntries(relays.map(url => [url, filters])), params) + } + + subscribeManyMap(requests: { [relay: string]: Filter[] }, params: SubscribeManyParams): SubCloser { if (this.trackRelays) { params.receivedEvent = (relay: AbstractRelay, id: string) => { let set = this.seenOn.get(id) @@ -61,12 +65,13 @@ export class AbstractSimplePool { const _knownIds = new Set() const subs: Subscription[] = [] + const relaysLength = Object.keys(requests).length // batch all EOSEs into a single const eosesReceived: boolean[] = [] let handleEose = (i: number) => { eosesReceived[i] = true - if (eosesReceived.filter(a => a).length === relays.length) { + if (eosesReceived.filter(a => a).length === relaysLength) { params.oneose?.() handleEose = () => {} } @@ -76,7 +81,7 @@ export class AbstractSimplePool { let handleClose = (i: number, reason: string) => { handleEose(i) closesReceived[i] = reason - if (closesReceived.filter(a => a).length === relays.length) { + if (closesReceived.filter(a => a).length === relaysLength) { params.onclose?.(closesReceived) handleClose = () => {} } @@ -93,13 +98,16 @@ export class AbstractSimplePool { // open a subscription in all given relays const allOpened = Promise.all( - relays.map(normalizeURL).map(async (url, i, arr) => { - if (arr.indexOf(url) !== i) { + Object.entries(requests).map(async (req, i, arr) => { + if (arr.indexOf(req) !== i) { // duplicate handleClose(i, 'duplicate url') return } + let [url, filters] = req + url = normalizeURL(url) + let relay: AbstractRelay try { relay = await this.ensureRelay(url, { diff --git a/filter.ts b/filter.ts index a6e2f82..424e2c2 100644 --- a/filter.ts +++ b/filter.ts @@ -41,7 +41,9 @@ export function matchFilter(filter: Filter, event: Event): boolean { export function matchFilters(filters: Filter[], event: Event): boolean { for (let i = 0; i < filters.length; i++) { - if (matchFilter(filters[i], event)) return true + if (matchFilter(filters[i], event)) { + return true + } } return false } diff --git a/pool.test.ts b/pool.test.ts index 09fd0d1..7b594f7 100644 --- a/pool.test.ts +++ b/pool.test.ts @@ -4,6 +4,7 @@ import { SimplePool } from './pool.ts' import { finalizeEvent, generateSecretKey, getPublicKey, type Event } from './pure.ts' import { useWebSocketImplementation } from './relay.ts' import { MockRelay, MockWebSocketClient } from './test-helpers.ts' +import { hexToBytes } from '@noble/hashes/utils' useWebSocketImplementation(MockWebSocketClient) @@ -84,6 +85,86 @@ test('same with double subs', async () => { expect(received).toHaveLength(2) }) +test('subscribe many map', async () => { + let priv = hexToBytes('8ea002840d413ccdd5be98df5dd89d799eaa566355ede83ca0bbdbb4b145e0d3') + let pub = getPublicKey(priv) + + let received: Event[] = [] + let event1 = finalizeEvent( + { + created_at: Math.round(Date.now() / 1000), + content: 'test1', + kind: 20001, + tags: [], + }, + priv, + ) + let event2 = finalizeEvent( + { + created_at: Math.round(Date.now() / 1000), + content: 'test2', + kind: 20002, + tags: [['t', 'biloba']], + }, + priv, + ) + let event3 = finalizeEvent( + { + created_at: Math.round(Date.now() / 1000), + content: 'test3', + kind: 20003, + tags: [['t', 'biloba']], + }, + priv, + ) + + const [relayA, relayB, relayC] = relayURLs + + pool.subscribeManyMap( + { + [relayA]: [{ authors: [pub], kinds: [20001] }], + [relayB]: [{ authors: [pub], kinds: [20002] }], + [relayC]: [{ kinds: [20003], '#t': ['biloba'] }], + }, + { + onevent(event: Event) { + received.push(event) + }, + }, + ) + + // publish the first + await Promise.all(pool.publish([relayA, relayB], event1)) + await new Promise(resolve => setTimeout(resolve, 100)) + + expect(received).toHaveLength(1) + expect(received[0]).toEqual(event1) + + // publish the second + await pool.publish([relayB], event2)[0] + await new Promise(resolve => setTimeout(resolve, 100)) + + expect(received).toHaveLength(2) + expect(received[1]).toEqual(event2) + + // publish a events that shouldn't match our filters + await Promise.all([ + ...pool.publish([relayA, relayB], event3), + ...pool.publish([relayA, relayB, relayC], event1), + pool.publish([relayA, relayB, relayC], event2), + ]) + await new Promise(resolve => setTimeout(resolve, 100)) + + expect(received).toHaveLength(2) + + // publsih the third + await pool.publish([relayC], event3)[0] + await new Promise(resolve => setTimeout(resolve, 100)) + + expect(received).toHaveLength(3) + expect(received[2]).toEqual(event3) +}) + test('query a bunch of events and cancel on eose', async () => { let events = new Set() diff --git a/test-helpers.ts b/test-helpers.ts index 2f3bca4..9c7267e 100644 --- a/test-helpers.ts +++ b/test-helpers.ts @@ -35,9 +35,9 @@ export class MockRelay { finalizeEvent( { kind: 1, - content: '', + content: 'autogenerated by relay', created_at: Math.floor(Date.now() / 1000), - tags: [], + tags: [['t', 'auto']], }, sk, ), @@ -68,9 +68,9 @@ export class MockRelay { const event = finalizeEvent( { kind, - content: '', + content: 'kind-aware autogenerated by relay', created_at: Math.floor(Date.now() / 1000), - tags: [], + tags: [['t', 'auto']], }, sk, )