use per-subscription alreadyHaveEvent handler instead of per-relay.

now pools are much smarter.
This commit is contained in:
fiatjaf 2023-02-08 12:37:27 -03:00
parent b5c8255b2f
commit 75d7be5a54
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
3 changed files with 113 additions and 37 deletions

View File

@ -12,18 +12,19 @@ const {
let p = pool()
let relays = [
p.ensureRelay('wss://nostr-dev.wellorder.net/'),
p.ensureRelay('wss://relay.nostr.bg/'),
p.ensureRelay('wss://nostr.fmt.wiz.biz/'),
p.ensureRelay('wss://relay.nostr.band/'),
p.ensureRelay('wss://nostr.zebedee.cloud/')
'wss://nostr-dev.wellorder.net/',
'wss://relay.nostr.bg/',
'wss://nostr.fmt.wiz.biz/',
'wss://relay.nostr.band/',
'wss://nostr.zebedee.cloud/'
]
beforeAll(async () => {
Promise.all(
relays.map(relay => {
try {
return relay.connect()
let r = p.ensureRelay(relay)
return r.connect()
} catch (err) {
/***/
}
@ -34,7 +35,8 @@ beforeAll(async () => {
afterAll(async () => {
relays.forEach(relay => {
try {
relay.close()
let r = p.ensureRelay(relay)
r.close()
} catch (err) {
/***/
}
@ -45,13 +47,11 @@ test('removing duplicates when querying', async () => {
let priv = generatePrivateKey()
let pub = getPublicKey(priv)
let subs = relays.map(relay =>
relay.sub([
{
authors: [pub]
}
])
)
let subs = p.sub(relays, [
{
authors: [pub]
}
])
let received = []
@ -74,11 +74,46 @@ test('removing duplicates when querying', async () => {
event.id = getEventHash(event)
event.sig = signEvent(event, priv)
relays.forEach(relay => {
relay.publish(event)
})
p.publish(relays, event)
await new Promise(resolve => setTimeout(resolve, 1500))
return expect(received).toHaveLength(1)
expect(received).toHaveLength(1)
})
test('removing duplicates correctly when double querying', async () => {
let priv = generatePrivateKey()
let pub = getPublicKey(priv)
let subs1 = p.sub(relays, [ { authors: [pub] } ])
let subs2 = p.sub(relays, [ { authors: [pub] } ])
let received = []
subs1.forEach(sub =>
sub.on('event', event => {
received.push(event)
})
)
subs2.forEach(sub =>
sub.on('event', event => {
received.push(event)
})
)
let event = {
pubkey: pub,
created_at: Math.round(Date.now() / 1000),
content: 'test2',
kind: 22346,
tags: []
}
event.id = getEventHash(event)
event.sig = signEvent(event, priv)
p.publish(relays, event)
await new Promise(resolve => setTimeout(resolve, 1500))
expect(received).toHaveLength(2)
})

56
pool.ts
View File

@ -2,7 +2,7 @@ import {Relay, relayInit} from './relay'
import {normalizeURL} from './utils'
import {Filter} from './filter'
import {Event} from './event'
import {SubscriptionOptions, Sub} from './relay'
import {SubscriptionOptions, Sub, Pub} from './relay'
export function pool(defaultRelays: string[] = []) {
return new SimplePool(defaultRelays)
@ -10,7 +10,6 @@ export function pool(defaultRelays: string[] = []) {
class SimplePool {
private _conn: {[url: string]: Relay}
private _knownIds: Set<string> = new Set()
constructor(defaultRelays: string[]) {
this._conn = {}
@ -22,17 +21,52 @@ class SimplePool {
const existing = this._conn[nm]
if (existing) return existing
const hasEventId = (id: string): boolean => this._knownIds.has(id)
const relay = relayInit(nm, hasEventId)
const relay = relayInit(nm)
this._conn[nm] = relay
let sub = relay.sub
relay.sub = (filters: Filter[], opts?: SubscriptionOptions): Sub => {
let s = sub(filters, opts)
s.on('event', (event: Event) => this._knownIds.add(event.id as string))
return s
}
return relay
}
sub(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Sub[] {
let _knownIds: Set<string> = new Set()
let modifiedOpts = opts || {}
modifiedOpts.alreadyHaveEvent = id => _knownIds.has(id)
return relays.map(relay => {
let r = this._conn[relay]
if (!r) return badSub()
let s = r.sub(filters, modifiedOpts)
s.on('event', (event: Event) => _knownIds.add(event.id as string))
return s
})
}
publish(relays: string[], event: Event): Pub[] {
return relays.map(relay => {
let r = this._conn[relay]
if (!r) return badPub(relay)
let s = r.publish(event)
return s
})
}
}
function badSub(): Sub {
return {
on() {},
off() {},
sub(): Sub {
return badSub()
},
unsub() {}
}
}
function badPub(relay: string): Pub {
return {
on(typ, cb) {
if (typ === 'failed') cb(`relay ${relay} not connected`)
},
off() {}
}
}

View File

@ -2,7 +2,7 @@
import {Event, verifySignature, validateEvent} from './event'
import {Filter, matchFilters} from './filter'
import {getHex64} from './fakejson'
import {getHex64, getSubscriptionId} from './fakejson'
type RelayEvent = 'connect' | 'disconnect' | 'error' | 'notice'
@ -29,13 +29,11 @@ export type Sub = {
export type SubscriptionOptions = {
skipVerification?: boolean
alreadyHaveEvent?: null | ((id: string) => boolean)
id?: string
}
export function relayInit(
url: string,
alreadyHaveEvent: (id: string) => boolean = () => false
): Relay {
export function relayInit(url: string): Relay {
var ws: WebSocket
var resolveClose: () => void
var setOpen: (value: PromiseLike<void> | void) => void
@ -104,8 +102,14 @@ export function relayInit(
}
var json = incomingMessageQueue.shift()
if (!json || alreadyHaveEvent(getHex64(json, 'id'))) {
return
if (!json) return
let subid = getSubscriptionId(json)
if (subid) {
let {alreadyHaveEvent} = openSubs[subid]
if (alreadyHaveEvent && alreadyHaveEvent(getHex64(json, 'id'))) {
return
}
}
try {
@ -173,6 +177,7 @@ export function relayInit(
filters: Filter[],
{
skipVerification = false,
alreadyHaveEvent = null,
id = Math.random().toString().slice(2)
}: SubscriptionOptions = {}
): Sub => {
@ -181,7 +186,8 @@ export function relayInit(
openSubs[subid] = {
id: subid,
filters,
skipVerification
skipVerification,
alreadyHaveEvent
}
trySend(['REQ', subid, ...filters])
@ -189,6 +195,7 @@ export function relayInit(
sub: (newFilters, newOpts = {}) =>
sub(newFilters || filters, {
skipVerification: newOpts.skipVerification || skipVerification,
alreadyHaveEvent: newOpts.alreadyHaveEvent || alreadyHaveEvent,
id: subid
}),
unsub: () => {