subscribeMap() now sends multiple filters to the same relay in the same REQ.
because the initiative to get rid of multiple filters went down.
This commit is contained in:
parent
23aebbd341
commit
c9ff51e278
|
@ -78,14 +78,14 @@ export class AbstractSimplePool {
|
||||||
for (let i = 0; i < relays.length; i++) {
|
for (let i = 0; i < relays.length; i++) {
|
||||||
const url = normalizeURL(relays[i])
|
const url = normalizeURL(relays[i])
|
||||||
if (!request.find(r => r.url === url)) {
|
if (!request.find(r => r.url === url)) {
|
||||||
request.push({ url, filter })
|
request.push({ url, filter: filter })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return this.subscribeMap(request, params)
|
return this.subscribeMap(request, params)
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser {
|
subscribeMany(relays: string[], filter: Filter, params: SubscribeManyParams): SubCloser {
|
||||||
params.onauth = params.onauth || params.doauth
|
params.onauth = params.onauth || params.doauth
|
||||||
|
|
||||||
const request: { url: string; filter: Filter }[] = []
|
const request: { url: string; filter: Filter }[] = []
|
||||||
|
@ -93,9 +93,8 @@ export class AbstractSimplePool {
|
||||||
for (let i = 0; i < relays.length; i++) {
|
for (let i = 0; i < relays.length; i++) {
|
||||||
const url = normalizeURL(relays[i])
|
const url = normalizeURL(relays[i])
|
||||||
if (uniqUrls.indexOf(url) === -1) {
|
if (uniqUrls.indexOf(url) === -1) {
|
||||||
for (let f = 0; f < filters.length; f++) {
|
uniqUrls.push(url)
|
||||||
request.push({ url, filter: filters[f] })
|
request.push({ url, filter: filter })
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,6 +104,14 @@ export class AbstractSimplePool {
|
||||||
subscribeMap(requests: { url: string; filter: Filter }[], params: SubscribeManyParams): SubCloser {
|
subscribeMap(requests: { url: string; filter: Filter }[], params: SubscribeManyParams): SubCloser {
|
||||||
params.onauth = params.onauth || params.doauth
|
params.onauth = params.onauth || params.doauth
|
||||||
|
|
||||||
|
const grouped = new Map<string, Filter[]>()
|
||||||
|
for (const req of requests) {
|
||||||
|
const { url, filter } = req
|
||||||
|
if (!grouped.has(url)) grouped.set(url, [])
|
||||||
|
grouped.get(url)!.push(filter)
|
||||||
|
}
|
||||||
|
const groupedRequests = Array.from(grouped.entries()).map(([url, filters]) => ({ url, filters }))
|
||||||
|
|
||||||
if (this.trackRelays) {
|
if (this.trackRelays) {
|
||||||
params.receivedEvent = (relay: AbstractRelay, id: string) => {
|
params.receivedEvent = (relay: AbstractRelay, id: string) => {
|
||||||
let set = this.seenOn.get(id)
|
let set = this.seenOn.get(id)
|
||||||
|
@ -152,7 +159,7 @@ export class AbstractSimplePool {
|
||||||
|
|
||||||
// open a subscription in all given relays
|
// open a subscription in all given relays
|
||||||
const allOpened = Promise.all(
|
const allOpened = Promise.all(
|
||||||
requests.map(async ({ url, filter }, i) => {
|
groupedRequests.map(async ({ url, filters }, i) => {
|
||||||
let relay: AbstractRelay
|
let relay: AbstractRelay
|
||||||
try {
|
try {
|
||||||
relay = await this.ensureRelay(url, {
|
relay = await this.ensureRelay(url, {
|
||||||
|
@ -163,7 +170,7 @@ export class AbstractSimplePool {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
let subscription = relay.subscribe([filter], {
|
let subscription = relay.subscribe(filters, {
|
||||||
...params,
|
...params,
|
||||||
oneose: () => handleEose(i),
|
oneose: () => handleEose(i),
|
||||||
onclose: reason => {
|
onclose: reason => {
|
||||||
|
@ -171,7 +178,7 @@ export class AbstractSimplePool {
|
||||||
relay
|
relay
|
||||||
.auth(params.onauth)
|
.auth(params.onauth)
|
||||||
.then(() => {
|
.then(() => {
|
||||||
relay.subscribe([filter], {
|
relay.subscribe(filters, {
|
||||||
...params,
|
...params,
|
||||||
oneose: () => handleEose(i),
|
oneose: () => handleEose(i),
|
||||||
onclose: reason => {
|
onclose: reason => {
|
||||||
|
@ -224,12 +231,12 @@ export class AbstractSimplePool {
|
||||||
|
|
||||||
subscribeManyEose(
|
subscribeManyEose(
|
||||||
relays: string[],
|
relays: string[],
|
||||||
filters: Filter[],
|
filter: Filter,
|
||||||
params: Pick<SubscribeManyParams, 'label' | 'id' | 'onevent' | 'onclose' | 'maxWait' | 'onauth' | 'doauth'>,
|
params: Pick<SubscribeManyParams, 'label' | 'id' | 'onevent' | 'onclose' | 'maxWait' | 'onauth' | 'doauth'>,
|
||||||
): SubCloser {
|
): SubCloser {
|
||||||
params.onauth = params.onauth || params.doauth
|
params.onauth = params.onauth || params.doauth
|
||||||
|
|
||||||
const subcloser = this.subscribeMany(relays, filters, {
|
const subcloser = this.subscribeMany(relays, filter, {
|
||||||
...params,
|
...params,
|
||||||
oneose() {
|
oneose() {
|
||||||
subcloser.close('closed automatically on eose')
|
subcloser.close('closed automatically on eose')
|
||||||
|
|
24
pool.test.ts
24
pool.test.ts
|
@ -35,14 +35,18 @@ test('removing duplicates when subscribing', async () => {
|
||||||
priv,
|
priv,
|
||||||
)
|
)
|
||||||
|
|
||||||
pool.subscribeMany(relayURLs, [{ authors: [pub] }], {
|
pool.subscribeMany(
|
||||||
onevent(event: Event) {
|
relayURLs,
|
||||||
// this should be called only once even though we're listening
|
{ authors: [pub] },
|
||||||
// to multiple relays because the events will be caught and
|
{
|
||||||
// deduplicated efficiently (without even being parsed)
|
onevent(event: Event) {
|
||||||
received.push(event)
|
// this should be called only once even though we're listening
|
||||||
|
// to multiple relays because the events will be caught and
|
||||||
|
// deduplicated efficiently (without even being parsed)
|
||||||
|
received.push(event)
|
||||||
|
},
|
||||||
},
|
},
|
||||||
})
|
)
|
||||||
|
|
||||||
await Promise.any(pool.publish(relayURLs, event))
|
await Promise.any(pool.publish(relayURLs, event))
|
||||||
await new Promise(resolve => setTimeout(resolve, 200)) // wait for the new published event to be received
|
await new Promise(resolve => setTimeout(resolve, 200)) // wait for the new published event to be received
|
||||||
|
@ -55,12 +59,12 @@ test('same with double subs', async () => {
|
||||||
let priv = generateSecretKey()
|
let priv = generateSecretKey()
|
||||||
let pub = getPublicKey(priv)
|
let pub = getPublicKey(priv)
|
||||||
|
|
||||||
pool.subscribeMany(relayURLs, [{ authors: [pub] }], {
|
pool.subscribeMany(relayURLs, { authors: [pub] }, {
|
||||||
onevent(event) {
|
onevent(event) {
|
||||||
received.push(event)
|
received.push(event)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
pool.subscribeMany(relayURLs, [{ authors: [pub] }], {
|
pool.subscribeMany(relayURLs, { authors: [pub] }, {
|
||||||
onevent(event) {
|
onevent(event) {
|
||||||
received.push(event)
|
received.push(event)
|
||||||
},
|
},
|
||||||
|
@ -168,7 +172,7 @@ test('query a bunch of events and cancel on eose', async () => {
|
||||||
let events = new Set<string>()
|
let events = new Set<string>()
|
||||||
|
|
||||||
await new Promise<void>(resolve => {
|
await new Promise<void>(resolve => {
|
||||||
pool.subscribeManyEose(relayURLs, [{ kinds: [0, 1, 2, 3, 4, 5, 6], limit: 40 }], {
|
pool.subscribeManyEose(relayURLs, { kinds: [0, 1, 2, 3, 4, 5, 6], limit: 40 }, {
|
||||||
onevent(event) {
|
onevent(event) {
|
||||||
events.add(event.id)
|
events.add(event.id)
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in New Issue