improvements and fixes on pool.

This commit is contained in:
fiatjaf 2023-02-09 11:58:39 -03:00
parent 7ff97b5488
commit ff3bf4a51c
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
6 changed files with 182 additions and 123 deletions

View File

@ -134,12 +134,7 @@ const pool = new SimplePool()
let relays = ['wss://relay.example.com', 'wss://relay.example2.com']
relays.forEach(async url => {
let relay = pool.ensureRelay(url)
await relay.connect()
})
let relay = pool.ensureRelay('wss://relay.example3.com')
let relay = await pool.ensureRelay('wss://relay.example3.com')
let subs = pool.sub([...relays, relay], {
authors: ['32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245']

View File

@ -1,6 +1,6 @@
{
"name": "nostr-tools",
"version": "1.3.0",
"version": "1.3.1",
"description": "Tools for making a Nostr client.",
"repository": {
"type": "git",

View File

@ -19,50 +19,23 @@ let relays = [
'wss://nostr.zebedee.cloud/'
]
beforeAll(async () => {
Promise.all(
relays.map(relay => {
try {
let r = pool.ensureRelay(relay)
return r.connect()
} catch (err) {
/***/
}
})
)
})
afterAll(async () => {
relays.forEach(relay => {
try {
let r = pool.ensureRelay(relay)
r.close()
} catch (err) {
/***/
}
})
await pool.close([...relays, 'wss://nostr-relay.untethr.me'])
})
test('removing duplicates when querying', async () => {
let priv = generatePrivateKey()
let pub = getPublicKey(priv)
let subs = pool.sub(relays, [
{
authors: [pub]
}
])
let sub = pool.sub(relays, [{authors: [pub]}])
let received = []
subs.forEach(sub =>
sub.on('event', event => {
// this should be called only once even though we're listening
// to multiple relays because the events will be catched and
// deduplicated efficiently (without even being parsed)
received.push(event)
})
)
sub.on('event', event => {
// this should be called only once even though we're listening
// to multiple relays because the events will be catched and
// deduplicated efficiently (without even being parsed)
received.push(event)
})
let event = {
pubkey: pub,
@ -81,25 +54,22 @@ test('removing duplicates when querying', async () => {
expect(received).toHaveLength(1)
})
test('removing duplicates correctly when double querying', async () => {
test('same with double querying', async () => {
let priv = generatePrivateKey()
let pub = getPublicKey(priv)
let subs1 = pool.sub(relays, [{authors: [pub]}])
let subs2 = pool.sub(relays, [{authors: [pub]}])
let sub1 = pool.sub(relays, [{authors: [pub]}])
let sub2 = pool.sub(relays, [{authors: [pub]}])
let received = []
subs1.forEach(sub =>
sub.on('event', event => {
received.push(event)
})
)
subs2.forEach(sub =>
sub.on('event', event => {
received.push(event)
})
)
sub1.on('event', event => {
received.push(event)
})
sub2.on('event', event => {
received.push(event)
})
let event = {
pubkey: pub,
@ -117,3 +87,37 @@ test('removing duplicates correctly when double querying', async () => {
expect(received).toHaveLength(2)
})
test('get()', async () => {
let event = await pool.get(relays, {
ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027']
})
expect(event).toHaveProperty(
'id',
'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'
)
})
test('list()', async () => {
let events = await pool.list(
[...relays, 'wss://offchain.pub', 'wss://eden.nostr.land'],
[
{
authors: [
'3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'
],
kinds: [1],
limit: 2
}
]
)
// the actual received number will be greater than 2, but there will be no duplicates
expect(events.length).toEqual(
events
.map(evt => evt.id)
.reduce((acc, n) => (acc.indexOf(n) !== -1 ? acc : [...acc, n]), [])
.length
)
})

139
pool.ts
View File

@ -12,7 +12,16 @@ export class SimplePool {
defaultRelays.forEach(this.ensureRelay)
}
ensureRelay(url: string): Relay {
async close(relays: string[]): Promise<void> {
await Promise.all(
relays.map(async url => {
let relay = this._conn[normalizeURL(url)]
if (relay) await relay.close()
})
)
}
async ensureRelay(url: string): Promise<Relay> {
const nm = normalizeURL(url)
const existing = this._conn[nm]
if (existing) return existing
@ -20,21 +29,74 @@ export class SimplePool {
const relay = relayInit(nm)
this._conn[nm] = relay
await relay.connect()
return relay
}
sub(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Sub[] {
sub(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Sub {
let _knownIds: Set<string> = new Set()
let modifiedOpts = opts || {}
modifiedOpts.alreadyHaveEvent = id => _knownIds.has(id)
return relays.map(relay => {
let r = this._conn[relay]
if (!r) return badSub()
let subs: Sub[] = []
let eventListeners: Set<(event: Event) => void> = new Set()
let eoseListeners: Set<() => void> = new Set()
let eosesMissing = relays.length
let eoseSent = false
let eoseTimeout = setTimeout(() => {
eoseSent = true
for (let cb of eoseListeners.values()) {
cb()
}
}, 2400)
relays.forEach(async relay => {
let r = await this.ensureRelay(relay)
if (!r) return
let s = r.sub(filters, modifiedOpts)
s.on('event', (event: Event) => _knownIds.add(event.id as string))
return s
s.on('event', (event: Event) => {
_knownIds.add(event.id as string)
for (let cb of eventListeners.values()) {
cb(event)
}
})
s.on('eose', () => {
if (eoseSent) return
eosesMissing--
if (eosesMissing === 0) {
clearTimeout(eoseTimeout)
for (let cb of eoseListeners.values()) {
cb()
}
}
})
subs.push(s)
})
let greaterSub: Sub = {
sub(filters, opts) {
subs.forEach(sub => sub.sub(filters, opts))
return greaterSub
},
unsub() {
subs.forEach(sub => sub.unsub())
},
on(type, cb) {
if (type === 'event') {
eventListeners.add(cb)
} else if (type === 'eose') eoseListeners.add(cb)
},
off(type, cb) {
if (type === 'event') {
eventListeners.delete(cb)
} else if (type === 'eose') eoseListeners.delete(cb)
}
}
return greaterSub
}
get(
@ -43,19 +105,15 @@ export class SimplePool {
opts?: SubscriptionOptions
): Promise<Event | null> {
return new Promise(resolve => {
let subs = this.sub(relays, [filter], opts)
let sub = this.sub(relays, [filter], opts)
let timeout = setTimeout(() => {
subs.forEach(sub => sub.unsub(), 1500)
sub.unsub()
resolve(null)
})
subs.forEach(sub => {
sub.on('event', (event: Event) => {
resolve(event)
clearTimeout(timeout)
subs.forEach(sub => {
sub.unsub()
})
})
}, 1500)
sub.on('event', (event: Event) => {
resolve(event)
clearTimeout(timeout)
sub.unsub()
})
})
}
@ -66,42 +124,24 @@ export class SimplePool {
opts?: SubscriptionOptions
): Promise<Event[]> {
return new Promise(resolve => {
let _knownIds: Set<string> = new Set()
let modifiedOpts = opts || {}
modifiedOpts.alreadyHaveEvent = id => _knownIds.has(id)
let events: Event[] = []
let sub = this.sub(relays, filters, opts)
let subs = this.sub(relays, filters, modifiedOpts)
let timeout = setTimeout(() => {
subs.forEach(sub => sub.unsub(), 1500)
resolve(events)
sub.on('event', (event: Event) => {
events.push(event)
})
let pendingEoses = relays.length
subs.forEach(sub => {
sub.on('event', (event: Event) => {
events.push(event)
})
sub.on('eose', () => {
pendingEoses--
if (pendingEoses === 0) {
resolve(events)
clearTimeout(timeout)
subs.forEach(sub => {
sub.unsub()
})
}
})
// we can rely on an eose being emitted here because pool.sub() will fake one
sub.on('eose', () => {
sub.unsub()
resolve(events)
})
})
}
publish(relays: string[], event: Event): Pub[] {
return relays.map(relay => {
let r = this._conn[relay]
let r = this._conn[normalizeURL(relay)]
if (!r) return badPub(relay)
let s = r.publish(event)
return s
@ -109,17 +149,6 @@ export class SimplePool {
}
}
function badSub(): Sub {
return {
on() {},
off() {},
sub(): Sub {
return badSub()
},
unsub() {}
}
}
function badPub(relay: string): Pub {
return {
on(typ, cb) {

View File

@ -32,7 +32,7 @@ test('connectivity', () => {
).resolves.toBe(true)
})
test('querying', () => {
test('querying', async () => {
var resolve1
var resolve2
@ -52,16 +52,42 @@ test('querying', () => {
resolve2(true)
})
return expect(
Promise.all([
new Promise(resolve => {
resolve1 = resolve
}),
new Promise(resolve => {
resolve2 = resolve
})
])
).resolves.toEqual([true, true])
let [t1, t2] = await Promise.all([
new Promise(resolve => {
resolve1 = resolve
}),
new Promise(resolve => {
resolve2 = resolve
})
])
expect(t1).toEqual(true)
expect(t2).toEqual(true)
})
test('get()', async () => {
let event = await relay.get({
ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027']
})
expect(event).toHaveProperty(
'id',
'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'
)
})
test('list()', async () => {
let events = await relay.list([
{
authors: [
'3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'
],
kinds: [1],
limit: 2
}
])
expect(events.length).toEqual(2)
})
test('listening (twice) and publishing', async () => {

View File

@ -108,8 +108,12 @@ export function relayInit(url: string): Relay {
let subid = getSubscriptionId(json)
if (subid) {
let {alreadyHaveEvent} = openSubs[subid]
if (alreadyHaveEvent && alreadyHaveEvent(getHex64(json, 'id'))) {
let so = openSubs[subid]
if (
so &&
so.alreadyHaveEvent &&
so.alreadyHaveEvent(getHex64(json, 'id'))
) {
return
}
}
@ -320,6 +324,7 @@ export function relayInit(url: string): Relay {
},
connect,
close(): Promise<void> {
if (ws.readyState > 1) return Promise.resolve()
ws.close()
return new Promise<void>(resolve => {
resolveClose = resolve