Compare commits

..

22 Commits

Author SHA1 Message Date
fiatjaf
f727058a3a rename benchmark.ts -> benchmarks.ts 2023-12-22 11:48:07 -03:00
fiatjaf
1de54838d3 changed the relay in the test, must also change the event queried for. 2023-12-22 11:46:02 -03:00
fiatjaf
703c29a311 fix things so relays tests work. 2023-12-22 11:38:35 -03:00
fiatjaf
ddf1064da9 adjust benchmarks to be done in a more realistic scenario. 2023-12-22 11:11:04 -03:00
fiatjaf
f719d99a11 rename this._push to this._onmessage and use it internally. 2023-12-22 10:54:03 -03:00
fiatjaf
6152238d65 update nostr-wasm to fix memory leak bug. 2023-12-22 10:53:08 -03:00
fiatjaf
9ac1b63994 a test on pool subscribing to many relays, getting many events then closing on eose. 2023-12-22 08:21:58 -03:00
fiatjaf
1890c91ae3 comments. 2023-12-22 08:02:39 -03:00
fiatjaf
7067b47cd4 remove last remains of pool-pure.ts 2023-12-22 07:51:17 -03:00
fiatjaf
397931f847 mention benchmark results in readme. 2023-12-22 06:59:32 -03:00
fiatjaf
5d795c291f fix relay.ts imports after 7f11c0c618. 2023-12-22 06:58:01 -03:00
fiatjaf
7adbd30799 streamline and improve benchmarks. 2023-12-22 06:57:23 -03:00
fiatjaf
83b6dd7ec3 remove pool-wasm.ts that I had forgotten. 2023-12-21 21:04:27 -03:00
fiatjaf
d61cc6c9bf just benchmark 2023-12-21 20:59:45 -03:00
fiatjaf
d7dad8e204 reduce spaces on justfile. 2023-12-21 20:50:42 -03:00
fiatjaf
daaa2ef0a1 bring back relayConnect() as deprecated. 2023-12-21 19:59:12 -03:00
fiatjaf
7f11c0c618 unsplit, backwards-compatibility, wasm relay and pool must be configured manually from the abstract classes. 2023-12-21 19:57:28 -03:00
fiatjaf
a4ae964ee6 split relay and pool into pure and wasm modules. 2023-12-21 17:27:42 -03:00
fiatjaf
1f7378ca49 import from core.ts instead of pure.ts whenever possible. 2023-12-21 17:27:32 -03:00
fiatjaf
d155bcdcda tag v2.0.3 2023-12-21 17:27:25 -03:00
Shusui MOYATANI
919d29363e export kinds 2023-12-21 16:42:30 -03:00
Shusui MOYATANI
ef12a451be fix ensureRelay 2023-12-21 16:42:00 -03:00
24 changed files with 786 additions and 591 deletions

1
.gitignore vendored
View File

@@ -5,3 +5,4 @@ package-lock.json
.envrc
lib
test.html
bench.js

View File

@@ -43,9 +43,9 @@ let isGood = verifyEvent(event)
### Interacting with a relay
```js
import { relayConnect, finalizeEvent, generateSecretKey, getPublicKey } from 'nostr-tools'
import { Relay, finalizeEvent, generateSecretKey, getPublicKey } from 'nostr-tools'
const relay = await relayConnect('wss://relay.example.com')
const relay = await Relay.connect('wss://relay.example.com')
console.log(`connected to ${relay.url}`)
// let's query for an event that exists
@@ -210,6 +210,8 @@ Importing the entirety of `nostr-tools` may bloat your build, so you should prob
```js
import { generateSecretKey, finalizeEvent, verifyEvent } from 'nostr-tools/pure'
import { SimplePool } from 'nostr-tools/pool'
import { Relay, Subscription } from 'nostr-tools/relay'
import { matchFilter } from 'nostr-tools/filter'
import { decode, nprofileEncode, neventEncode, npubEncode } from 'nostr-tools/nip19'
// and so on and so forth
@@ -230,7 +232,36 @@ initNostrWasm().then(setNostrWasm)
// see https://www.npmjs.com/package/nostr-wasm for options
```
This may be faster than the pure-JS [noble libraries](https://paulmillr.com/noble/) used by default and in `nostr-tools/pure`.
If you're going to use `Relay` and `SimplePool` you must also import `nostr-tools/abstract-relay` and/or `nostr-tools/abstract-pool` instead of the defaults and then instantiate them by passing the `verifyEvent`:
```js
import { setNostrWasm, verifyEvent } from 'nostr-tools/wasm'
import { AbstractRelay } from 'nostr-tools/abstract-relay'
import { AbstractSimplePool } from 'nostr-tools/abstract-pool'
import { initNostrWasm } from 'nostr-wasm'
initNostrWasm().then(setNostrWasm)
const relay = AbstractRelay.connect('wss://relayable.org', { verifyEvent })
const pool = new AbstractSimplePool({ verifyEvent })
```
This may be faster than the pure-JS [noble libraries](https://paulmillr.com/noble/) used by default and in `nostr-tools/pure`. Benchmarks:
```
benchmark time (avg) (min … max) p75 p99 p995
------------------------------------------------- -----------------------------
• relay read message and verify event (many events)
------------------------------------------------- -----------------------------
wasm 34.94 ms/iter (34.61 ms … 35.73 ms) 35.07 ms 35.73 ms 35.73 ms
pure js 239.7 ms/iter (235.41 ms … 243.69 ms) 240.51 ms 243.69 ms 243.69 ms
trusted 402.71 µs/iter (344.57 µs … 2.98 ms) 407.39 µs 745.62 µs 812.59 µs
summary for relay read message and verify event
wasm
86.77x slower than trusted
6.86x faster than pure js
```
### Using from the browser (if you don't want to use a bundler)

190
abstract-pool.ts Normal file
View File

@@ -0,0 +1,190 @@
import { AbstractRelay as AbstractRelay, SubscriptionParams, Subscription } from './abstract-relay.ts'
import { normalizeURL } from './utils.ts'
import type { Event, Nostr } from './core.ts'
import { type Filter } from './filter.ts'
import { alwaysTrue } from './helpers.ts'
export type SubCloser = { close: () => void }
export type SubscribeManyParams = Omit<SubscriptionParams, 'onclose' | 'id'> & {
maxWait?: number
onclose?: (reasons: string[]) => void
id?: string
}
export class AbstractSimplePool {
private relays = new Map<string, AbstractRelay>()
public seenOn = new Map<string, Set<AbstractRelay>>()
public trackRelays: boolean = false
public verifyEvent: Nostr['verifyEvent']
public trustedRelayURLs = new Set<string>()
constructor(opts: { verifyEvent: Nostr['verifyEvent'] }) {
this.verifyEvent = opts.verifyEvent
}
async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise<AbstractRelay> {
url = normalizeURL(url)
let relay = this.relays.get(url)
if (!relay) {
relay = new AbstractRelay(url, {
verifyEvent: this.trustedRelayURLs.has(url) ? alwaysTrue : this.verifyEvent,
})
if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout
this.relays.set(url, relay)
}
await relay.connect()
return relay
}
close(relays: string[]) {
relays.map(normalizeURL).forEach(url => {
this.relays.get(url)?.close()
})
}
subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser {
if (this.trackRelays) {
params.receivedEvent = (relay: AbstractRelay, id: string) => {
let set = this.seenOn.get(id)
if (!set) {
set = new Set()
this.seenOn.set(id, set)
}
set.add(relay)
}
}
const _knownIds = new Set<string>()
const subs: Subscription[] = []
// batch all EOSEs into a single
const eosesReceived: boolean[] = []
let handleEose = (i: number) => {
eosesReceived[i] = true
if (eosesReceived.filter(a => a).length === relays.length) {
params.oneose?.()
handleEose = () => {}
}
}
// batch all closes into a single
const closesReceived: string[] = []
let handleClose = (i: number, reason: string) => {
handleEose(i)
closesReceived[i] = reason
if (closesReceived.filter(a => a).length === relays.length) {
params.onclose?.(closesReceived)
handleClose = () => {}
}
}
const localAlreadyHaveEventHandler = (id: string) => {
if (params.alreadyHaveEvent?.(id)) {
return true
}
const have = _knownIds.has(id)
_knownIds.add(id)
return have
}
// open a subscription in all given relays
const allOpened = Promise.all(
relays.map(normalizeURL).map(async (url, i, arr) => {
if (arr.indexOf(url) !== i) {
// duplicate
handleClose(i, 'duplicate url')
return
}
let relay: AbstractRelay
try {
relay = await this.ensureRelay(url, {
connectionTimeout: params.maxWait ? Math.max(params.maxWait * 0.8, params.maxWait - 1000) : undefined,
})
} catch (err) {
handleClose(i, (err as any)?.message || String(err))
return
}
let subscription = relay.subscribe(filters, {
...params,
oneose: () => handleEose(i),
onclose: reason => handleClose(i, reason),
alreadyHaveEvent: localAlreadyHaveEventHandler,
eoseTimeout: params.maxWait,
})
subs.push(subscription)
}),
)
return {
async close() {
await allOpened
subs.forEach(sub => {
sub.close()
})
},
}
}
subscribeManyEose(
relays: string[],
filters: Filter[],
params: Pick<SubscribeManyParams, 'id' | 'onevent' | 'onclose' | 'maxWait'>,
): SubCloser {
const subcloser = this.subscribeMany(relays, filters, {
...params,
oneose() {
subcloser.close()
},
})
return subcloser
}
async querySync(
relays: string[],
filter: Filter,
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
): Promise<Event[]> {
return new Promise(async resolve => {
const events: Event[] = []
this.subscribeManyEose(relays, [filter], {
...params,
onevent(event: Event) {
events.push(event)
},
onclose(_: string[]) {
resolve(events)
},
})
})
}
async get(
relays: string[],
filter: Filter,
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
): Promise<Event | null> {
filter.limit = 1
const events = await this.querySync(relays, filter, params)
events.sort((a, b) => b.created_at - a.created_at)
return events[0] || null
}
publish(relays: string[], event: Event): Promise<string>[] {
return relays.map(normalizeURL).map(async (url, i, arr) => {
if (arr.indexOf(url) !== i) {
// duplicate
return Promise.reject('duplicate url')
}
let r = await this.ensureRelay(url)
return r.publish(event)
})
}
}

356
abstract-relay.ts Normal file
View File

@@ -0,0 +1,356 @@
/* global WebSocket */
import type { Event, EventTemplate, Nostr } from './core.ts'
import { matchFilters, type Filter } from './filter.ts'
import { getHex64, getSubscriptionId } from './fakejson.ts'
import { Queue, normalizeURL } from './utils.ts'
import { makeAuthEvent } from './nip42.ts'
import { yieldThread } from './helpers.ts'
export class AbstractRelay {
public readonly url: string
private _connected: boolean = false
public onclose: (() => void) | null = null
public onnotice: (msg: string) => void = msg => console.debug(`NOTICE from ${this.url}: ${msg}`)
public baseEoseTimeout: number = 4400
public connectionTimeout: number = 4400
public openSubs = new Map<string, Subscription>()
private connectionTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private connectionPromise: Promise<void> | undefined
private openCountRequests = new Map<string, CountResolver>()
private openEventPublishes = new Map<string, EventPublishResolver>()
private ws: WebSocket | undefined
private incomingMessageQueue = new Queue<string>()
private queueRunning = false
private challenge: string | undefined
private serial: number = 0
private verifyEvent: Nostr['verifyEvent']
constructor(url: string, opts: { verifyEvent: Nostr['verifyEvent'] }) {
this.url = normalizeURL(url)
this.verifyEvent = opts.verifyEvent
}
static async connect(url: string, opts: { verifyEvent: Nostr['verifyEvent'] }) {
const relay = new AbstractRelay(url, opts)
await relay.connect()
return relay
}
private closeAllSubscriptions(reason: string) {
for (let [_, sub] of this.openSubs) {
sub.close(reason)
}
this.openSubs.clear()
for (let [_, ep] of this.openEventPublishes) {
ep.reject(new Error(reason))
}
this.openEventPublishes.clear()
for (let [_, cr] of this.openCountRequests) {
cr.reject(new Error(reason))
}
this.openCountRequests.clear()
}
public get connected(): boolean {
return this._connected
}
public async connect(): Promise<void> {
if (this.connectionPromise) return this.connectionPromise
this.challenge = undefined
this.connectionPromise = new Promise((resolve, reject) => {
this.connectionTimeoutHandle = setTimeout(() => {
reject('connection timed out')
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection timed out')
}, this.connectionTimeout)
try {
this.ws = new WebSocket(this.url)
} catch (err) {
reject(err)
return
}
this.ws.onopen = () => {
clearTimeout(this.connectionTimeoutHandle)
this._connected = true
resolve()
}
this.ws.onerror = ev => {
reject((ev as any).message)
if (this._connected) {
this.onclose?.()
this.closeAllSubscriptions('relay connection errored')
this._connected = false
}
}
this.ws.onclose = async () => {
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection closed')
this._connected = false
}
this.ws.onmessage = this._onmessage.bind(this)
})
return this.connectionPromise
}
private async runQueue() {
this.queueRunning = true
while (true) {
if (false === this.handleNext()) {
break
}
await yieldThread()
}
this.queueRunning = false
}
private handleNext(): undefined | false {
const json = this.incomingMessageQueue.dequeue()
if (!json) {
return false
}
const subid = getSubscriptionId(json)
if (subid) {
const so = this.openSubs.get(subid as string)
if (!so) {
// this is an EVENT message, but for a subscription we don't have, so just stop here
return
}
// this will be called only when this message is a EVENT message for a subscription we have
// we do this before parsing the JSON to not have to do that for duplicate events
// since JSON parsing is slow
const id = getHex64(json, 'id')
const alreadyHave = so.alreadyHaveEvent?.(id)
// notify any interested client that the relay has this event
// (do this after alreadyHaveEvent() because the client may rely on this to answer that)
so.receivedEvent?.(this, id)
if (alreadyHave) {
// if we had already seen this event we can just stop here
return
}
}
try {
let data = JSON.parse(json)
// we won't do any checks against the data since all failures (i.e. invalid messages from relays)
// will naturally be caught by the encompassing try..catch block
switch (data[0]) {
case 'EVENT': {
const so = this.openSubs.get(data[1] as string) as Subscription
const event = data[2] as Event
if (this.verifyEvent(event) && matchFilters(so.filters, event)) {
so.onevent(event)
}
return
}
case 'COUNT': {
const id: string = data[1]
const payload = data[2] as { count: number }
const cr = this.openCountRequests.get(id) as CountResolver
if (cr) {
cr.resolve(payload.count)
this.openCountRequests.delete(id)
}
return
}
case 'EOSE': {
const so = this.openSubs.get(data[1] as string)
if (!so) return
so.receivedEose()
return
}
case 'OK': {
const id: string = data[1]
const ok: boolean = data[2]
const reason: string = data[3]
const ep = this.openEventPublishes.get(id) as EventPublishResolver
if (ok) ep.resolve(reason)
else ep.reject(new Error(reason))
this.openEventPublishes.delete(id)
return
}
case 'CLOSED': {
const id: string = data[1]
const so = this.openSubs.get(id)
if (!so) return
so.closed = true
so.close(data[2] as string)
return
}
case 'NOTICE':
this.onnotice(data[1] as string)
return
case 'AUTH': {
this.challenge = data[1] as string
return
}
}
} catch (err) {
return
}
}
public async send(message: string) {
if (!this.connectionPromise) throw new Error('sending on closed connection')
this.connectionPromise.then(() => {
this.ws?.send(message)
})
}
public async auth(signAuthEvent: (authEvent: EventTemplate) => Promise<void>) {
if (!this.challenge) throw new Error("can't perform auth, no challenge was received")
const evt = makeAuthEvent(this.url, this.challenge)
await signAuthEvent(evt)
this.send('["AUTH",' + JSON.stringify(evt) + ']')
}
public async publish(event: Event): Promise<string> {
const ret = new Promise<string>((resolve, reject) => {
this.openEventPublishes.set(event.id, { resolve, reject })
})
this.send('["EVENT",' + JSON.stringify(event) + ']')
return ret
}
public async count(filters: Filter[], params: { id?: string | null }): Promise<number> {
this.serial++
const id = params?.id || 'count:' + this.serial
const ret = new Promise<number>((resolve, reject) => {
this.openCountRequests.set(id, { resolve, reject })
})
this.send('["COUNT","' + id + '",' + JSON.stringify(filters) + ']')
return ret
}
public subscribe(filters: Filter[], params: Partial<SubscriptionParams>): Subscription {
const subscription = this.prepareSubscription(filters, params)
subscription.fire()
return subscription
}
public prepareSubscription(filters: Filter[], params: Partial<SubscriptionParams> & { id?: string }): Subscription {
this.serial++
const id = params.id || 'sub:' + this.serial
const subscription = new Subscription(this, id, filters, params)
this.openSubs.set(id, subscription)
return subscription
}
public close() {
this.closeAllSubscriptions('relay connection closed by us')
this._connected = false
this.ws?.close()
}
// this is the function assigned to this.ws.onmessage
// it's exposed for testing and debugging purposes
public _onmessage(ev: MessageEvent<any>) {
this.incomingMessageQueue.enqueue(ev.data as string)
if (!this.queueRunning) {
this.runQueue()
}
}
}
export class Subscription {
public readonly relay: AbstractRelay
public readonly id: string
public closed: boolean = false
public eosed: boolean = false
public filters: Filter[]
public alreadyHaveEvent: ((id: string) => boolean) | undefined
public receivedEvent: ((relay: AbstractRelay, id: string) => void) | undefined
public onevent: (evt: Event) => void
public oneose: (() => void) | undefined
public onclose: ((reason: string) => void) | undefined
public eoseTimeout: number
private eoseTimeoutHandle: ReturnType<typeof setTimeout> | undefined
constructor(relay: AbstractRelay, id: string, filters: Filter[], params: SubscriptionParams) {
this.relay = relay
this.filters = filters
this.id = id
this.alreadyHaveEvent = params.alreadyHaveEvent
this.receivedEvent = params.receivedEvent
this.eoseTimeout = params.eoseTimeout || relay.baseEoseTimeout
this.oneose = params.oneose
this.onclose = params.onclose
this.onevent =
params.onevent ||
(event => {
console.warn(
`onevent() callback not defined for subscription '${this.id}' in relay ${this.relay.url}. event received:`,
event,
)
})
}
public fire() {
this.relay.send('["REQ","' + this.id + '",' + JSON.stringify(this.filters).substring(1))
// only now we start counting the eoseTimeout
this.eoseTimeoutHandle = setTimeout(this.receivedEose.bind(this), this.eoseTimeout)
}
public receivedEose() {
if (this.eosed) return
clearTimeout(this.eoseTimeoutHandle)
this.eosed = true
this.oneose?.()
}
public close(reason: string = 'closed by caller') {
if (!this.closed) {
// if the connection was closed by the user calling .close() we will send a CLOSE message
// otherwise this._open will be already set to false so we will skip this
this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']')
this.closed = true
}
this.relay.openSubs.delete(this.id)
this.onclose?.(reason)
}
}
export type SubscriptionParams = {
onevent?: (evt: Event) => void
oneose?: () => void
onclose?: (reason: string) => void
alreadyHaveEvent?: (id: string) => boolean
receivedEvent?: (relay: AbstractRelay, id: string) => void
eoseTimeout?: number
}
export type CountResolver = {
resolve: (count: number) => void
reject: (err: Error) => void
}
export type EventPublishResolver = {
resolve: (reason: string) => void
reject: (err: Error) => void
}

61
benchmarks.ts Normal file
View File

@@ -0,0 +1,61 @@
import { run, bench, group, baseline } from 'mitata'
import { initNostrWasm } from 'nostr-wasm'
import { NostrEvent } from './core'
import { finalizeEvent, generateSecretKey } from './pure'
import { setNostrWasm, verifyEvent } from './wasm'
import { AbstractRelay } from './abstract-relay.ts'
import { Relay as PureRelay } from './relay.ts'
import { alwaysTrue } from './helpers.ts'
// benchmarking relay reads with verifyEvent
const EVENTS = 200
let messages: string[] = []
let baseContent = ''
for (let i = 0; i < EVENTS; i++) {
baseContent += 'a'
}
const secretKey = generateSecretKey()
for (let i = 0; i < EVENTS; i++) {
const tags = []
for (let t = 0; t < i; t++) {
tags.push(['t', 'nada'])
}
const event = { created_at: Math.round(Date.now()) / 1000, kind: 1, content: baseContent.slice(0, EVENTS - i), tags }
const signed = finalizeEvent(event, secretKey)
messages.push(JSON.stringify(['EVENT', '_', signed]))
}
setNostrWasm(await initNostrWasm())
const pureRelay = new PureRelay('wss://pure.com/')
const trustedRelay = new AbstractRelay('wss://trusted.com/', { verifyEvent: alwaysTrue })
const wasmRelay = new AbstractRelay('wss://wasm.com/', { verifyEvent })
const runWith = (relay: AbstractRelay) => async () => {
return new Promise<void>(resolve => {
let received = 0
let sub = relay.prepareSubscription([{}], {
id: '_',
onevent(_: NostrEvent) {
received++
if (received === messages.length - 1) {
resolve()
sub.closed = true
sub.close()
}
},
})
for (let e = 0; e < messages.length; e++) {
relay._onmessage({ data: messages[e] } as any)
}
})
}
group(`relay read ${EVENTS} messages and verify its events`, () => {
baseline('wasm', runWith(wasmRelay))
bench('pure js', runWith(pureRelay))
bench('trusted', runWith(trustedRelay))
})
// actually running the thing
await run()

View File

@@ -10,6 +10,7 @@ const entryPoints = fs
file !== 'core.ts' &&
file !== 'test-helpers.ts' &&
file !== 'helpers.ts' &&
file !== 'benchmarks.ts' &&
!file.endsWith('.test.ts') &&
fs.statSync(join(process.cwd(), file)).isFile(),
)

BIN
bun.lockb

Binary file not shown.

View File

@@ -19,6 +19,7 @@ export interface Event {
[verifiedSymbol]?: boolean
}
export type NostrEvent = Event
export type EventTemplate = Pick<Event, 'kind' | 'tags' | 'content' | 'created_at'>
export type UnsignedEvent = Pick<Event, 'kind' | 'tags' | 'content' | 'created_at' | 'pubkey'>

View File

@@ -1,4 +1,4 @@
import { Event } from './pure.ts'
import { Event } from './core.ts'
export type Filter = {
ids?: string[]

View File

@@ -1,3 +1,5 @@
import { verifiedSymbol, type Event, type Nostr, VerifiedEvent } from './core.ts'
export async function yieldThread() {
return new Promise(resolve => {
const ch = new MessageChannel()
@@ -7,3 +9,8 @@ export async function yieldThread() {
ch.port1.start()
})
}
export const alwaysTrue: Nostr['verifyEvent'] = (t: Event): t is VerifiedEvent => {
t[verifiedSymbol] = true
return true
}

View File

@@ -1,6 +1,5 @@
export * from './pure.ts'
export * from './relay.ts'
export * from './pure.ts'
export * from './filter.ts'
export * from './pool.ts'
export * from './references.ts'
@@ -24,5 +23,6 @@ export * as nip47 from './nip47.ts'
export * as nip57 from './nip57.ts'
export * as nip98 from './nip98.ts'
export * as kinds from './kinds.ts'
export * as fj from './fakejson.ts'
export * as utils from './utils.ts'

View File

@@ -1,25 +1,32 @@
export PATH := "./node_modules/.bin:" + env_var('PATH')
build:
rm -rf lib
bun run build.js
rm -rf lib
bun run build.js
test:
bun test --timeout 20000
bun test --timeout 20000
test-only file:
bun test {{file}}
bun test {{file}}
emit-types:
tsc # see tsconfig.json
tsc # see tsconfig.json
publish: build emit-types
npm publish
npm publish
format:
eslint --ext .ts --fix *.ts
prettier --write *.ts
eslint --ext .ts --fix *.ts
prettier --write *.ts
lint:
eslint --ext .ts *.ts
prettier --check *.ts
eslint --ext .ts *.ts
prettier --check *.ts
benchmark:
bun build --target=node --outfile=bench.js benchmarks.ts
timeout 60s deno run --allow-read bench.js || true
timeout 60s node bench.js || true
timeout 60s bun run benchmarks.ts || true
rm bench.js

View File

@@ -1,4 +1,4 @@
import type { Event } from './pure.ts'
import type { Event } from './core.ts'
import type { EventPointer, ProfilePointer } from './nip19.ts'
export type NIP10Result = {

View File

@@ -1,10 +1,10 @@
import { test, expect } from 'bun:test'
import { makeAuthEvent } from './nip42.ts'
import { relayConnect } from './relay.ts'
import { Relay } from './relay.ts'
test('auth flow', async () => {
const relay = await relayConnect('wss://nostr.wine')
const relay = await Relay.connect('wss://nostr.wine')
const auth = makeAuthEvent(relay.url, 'chachacha')
expect(auth.tags).toHaveLength(2)

View File

@@ -1,4 +1,4 @@
import { EventTemplate } from './pure.ts'
import { EventTemplate } from './core.ts'
import { ClientAuth } from './kinds.ts'
/**

View File

@@ -1,6 +1,7 @@
{
"type": "module",
"name": "nostr-tools",
"version": "2.0.2",
"version": "2.1.0",
"description": "Tools for making a Nostr client.",
"repository": {
"type": "git",
@@ -29,16 +30,31 @@
"require": "./lib/cjs/wasm.js",
"types": "./lib/types/wasm.d.ts"
},
"./kinds": {
"import": "./lib/esm/kinds.js",
"require": "./lib/cjs/kinds.js",
"types": "./lib/types/kinds.d.ts"
},
"./filter": {
"import": "./lib/esm/filter.js",
"require": "./lib/cjs/filter.js",
"types": "./lib/types/filter.d.ts"
},
"./abstract-relay": {
"import": "./lib/esm/abstract-relay.js",
"require": "./lib/cjs/abstract-relay.js",
"types": "./lib/types/abstract-relay.d.ts"
},
"./relay": {
"import": "./lib/esm/relay.js",
"require": "./lib/cjs/relay.js",
"types": "./lib/types/relay.d.ts"
},
"./abstract-pool": {
"import": "./lib/esm/abstract-pool.js",
"require": "./lib/cjs/abstract-pool.js",
"types": "./lib/types/abstract-pool.d.ts"
},
"./pool": {
"import": "./lib/esm/pool.js",
"require": "./lib/cjs/pool.js",
@@ -158,7 +174,8 @@
"@scure/base": "1.1.1",
"@scure/bip32": "1.3.1",
"@scure/bip39": "1.2.1",
"nostr-wasm": "v0.0.3"
"mitata": "^0.1.6",
"nostr-wasm": "v0.1.0"
},
"peerDependencies": {
"typescript": ">=5.0.0"

View File

@@ -12,7 +12,7 @@ afterAll(() => {
pool.close([...relays, 'wss://offchain.pub', 'wss://eden.nostr.land'])
})
test('removing duplicates when querying', async () => {
test('removing duplicates when subscribing', async () => {
let priv = generateSecretKey()
let pub = getPublicKey(priv)
@@ -43,7 +43,7 @@ test('removing duplicates when querying', async () => {
expect(received[0]).toEqual(event)
})
test('same with double querying', async () => {
test('same with double subs', async () => {
let priv = generateSecretKey()
let pub = getPublicKey(priv)
@@ -76,6 +76,23 @@ test('same with double querying', async () => {
expect(received).toHaveLength(2)
})
test('query a bunch of events and cancel on eose', async () => {
let events = new Set<string>()
await new Promise<void>(resolve => {
pool.subscribeManyEose(
[...relays, 'wss://relayable.org', 'wss://relay.noswhere.com', 'wss://nothing.com'],
[{ kinds: [0, 1], limit: 40 }],
{
onevent(event) {
events.add(event.id)
},
onclose: resolve as any,
},
)
})
expect(events.size).toBeGreaterThan(50)
})
test('querySync()', async () => {
let events = await pool.querySync([...relays.slice(2), 'wss://offchain.pub', 'wss://eden.nostr.land'], {
authors: ['3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'],

187
pool.ts
View File

@@ -1,183 +1,10 @@
import { Relay, SubscriptionParams, Subscription } from './relay.ts'
import { normalizeURL } from './utils.ts'
import { verifyEvent } from './pure.ts'
import { AbstractSimplePool } from './abstract-pool.ts'
import type { Event } from './pure.ts'
import { type Filter } from './filter.ts'
export type SubCloser = { close: () => void }
export type SubscribeManyParams = Omit<SubscriptionParams, 'onclose' | 'id'> & {
maxWait?: number
onclose?: (reasons: string[]) => void
id?: string
}
export class SimplePool {
private relays = new Map<string, Relay>()
public seenOn = new Map<string, Set<Relay>>()
public trackRelays: boolean = false
public trustedRelayURLs = new Set<string>()
async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise<Relay> {
url = normalizeURL(url)
let relay = this.relays.get(url)
if (!relay) {
relay = new Relay(url)
if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout
if (this.trustedRelayURLs.has(relay.url)) relay.trusted = true
this.relays.set(url, relay)
await relay.connect()
}
return relay
}
close(relays: string[]) {
relays.map(normalizeURL).forEach(url => {
this.relays.get(url)?.close()
})
}
subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser {
if (this.trackRelays) {
params.receivedEvent = (relay: Relay, id: string) => {
let set = this.seenOn.get(id)
if (!set) {
set = new Set()
this.seenOn.set(id, set)
}
set.add(relay)
}
}
const _knownIds = new Set<string>()
const subs: Subscription[] = []
// batch all EOSEs into a single
const eosesReceived: boolean[] = []
let handleEose = (i: number) => {
eosesReceived[i] = true
if (eosesReceived.filter(a => a).length === relays.length) {
params.oneose?.()
handleEose = () => {}
}
}
// batch all closes into a single
const closesReceived: string[] = []
let handleClose = (i: number, reason: string) => {
handleEose(i)
closesReceived[i] = reason
if (closesReceived.filter(a => a).length === relays.length) {
params.onclose?.(closesReceived)
handleClose = () => {}
}
}
const localAlreadyHaveEventHandler = (id: string) => {
if (params.alreadyHaveEvent?.(id)) {
return true
}
const have = _knownIds.has(id)
_knownIds.add(id)
return have
}
// open a subscription in all given relays
const allOpened = Promise.all(
relays.map(normalizeURL).map(async (url, i, arr) => {
if (arr.indexOf(url) !== i) {
// duplicate
handleClose(i, 'duplicate url')
return
}
let relay: Relay
try {
relay = await this.ensureRelay(url, {
connectionTimeout: params.maxWait ? Math.max(params.maxWait * 0.8, params.maxWait - 1000) : undefined,
})
} catch (err) {
handleClose(i, (err as any)?.message || String(err))
return
}
let subscription = relay.subscribe(filters, {
...params,
oneose: () => handleEose(i),
onclose: reason => handleClose(i, reason),
alreadyHaveEvent: localAlreadyHaveEventHandler,
eoseTimeout: params.maxWait,
})
subs.push(subscription)
}),
)
return {
async close() {
await allOpened
subs.forEach(sub => {
sub.close()
})
},
}
}
subscribeManyEose(
relays: string[],
filters: Filter[],
params: Pick<SubscribeManyParams, 'id' | 'onevent' | 'onclose' | 'maxWait'>,
): SubCloser {
const subcloser = this.subscribeMany(relays, filters, {
...params,
oneose() {
subcloser.close()
},
})
return subcloser
}
async querySync(
relays: string[],
filter: Filter,
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
): Promise<Event[]> {
return new Promise(async resolve => {
const events: Event[] = []
this.subscribeManyEose(relays, [filter], {
...params,
onevent(event: Event) {
events.push(event)
},
onclose(_: string[]) {
resolve(events)
},
})
})
}
async get(
relays: string[],
filter: Filter,
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
): Promise<Event | null> {
filter.limit = 1
const events = await this.querySync(relays, filter, params)
events.sort((a, b) => b.created_at - a.created_at)
return events[0] || null
}
publish(relays: string[], event: Event): Promise<string>[] {
return relays.map(normalizeURL).map(async (url, i, arr) => {
if (arr.indexOf(url) !== i) {
// duplicate
return Promise.reject('duplicate url')
}
let r = await this.ensureRelay(url)
return r.publish(event)
})
export class SimplePool extends AbstractSimplePool {
constructor() {
super({ verifyEvent })
}
}
export * from './abstract-pool.ts'

View File

@@ -1,6 +1,6 @@
import { decode, type AddressPointer, type ProfilePointer, type EventPointer } from './nip19.ts'
import type { Event } from './pure.ts'
import type { Event } from './core.ts'
type Reference = {
text: string

View File

@@ -1,9 +1,9 @@
import { afterEach, expect, test } from 'bun:test'
import { finalizeEvent, generateSecretKey, getPublicKey } from './pure.ts'
import { Relay, relayConnect } from './relay.ts'
import { NostrEvent, finalizeEvent, generateSecretKey, getPublicKey } from './pure.ts'
import { Relay } from './relay.ts'
let relay = new Relay('wss://public.relaying.io')
let relay = new Relay('wss://relay.nostr.bg')
afterEach(() => {
relay.close()
@@ -14,48 +14,46 @@ test('connectivity', async () => {
expect(relay.connected).toBeTrue()
})
test('connectivity, with relayConnect()', async () => {
const relay = await relayConnect('wss://public.relaying.io')
test('connectivity, with Relay.connect()', async () => {
const relay = await Relay.connect('wss://public.relaying.io')
expect(relay.connected).toBeTrue()
relay.close()
})
test('querying', async () => {
await relay.connect()
let resolve1: () => void
let resolve2: () => void
let resolveEvent: () => void
let resolveEose: () => void
let waiting = Promise.all([
new Promise<void>(resolve => {
resolve1 = resolve
}),
new Promise<void>(resolve => {
resolve2 = resolve
}),
])
const evented = new Promise<void>(resolve => {
resolveEvent = resolve
})
const eosed = new Promise<void>(resolve => {
resolveEose = resolve
})
relay.subscribe(
[
{
ids: ['3abc6cbb215af0412ab2c9c8895d96a084297890fd0b4018f8427453350ca2e4'],
authors: ['9bbe185a20f50607b6e021c68a2c7275649770d3f8277c120d2b801a2b9a64fc'],
kinds: [0],
},
],
{
onevent(event) {
expect(event).toHaveProperty('id', '3abc6cbb215af0412ab2c9c8895d96a084297890fd0b4018f8427453350ca2e4')
expect(event).toHaveProperty('content', '+')
expect(event).toHaveProperty('kind', 7)
resolve1()
expect(event).toHaveProperty('pubkey', '9bbe185a20f50607b6e021c68a2c7275649770d3f8277c120d2b801a2b9a64fc')
expect(event).toHaveProperty('kind', 0)
resolveEvent()
},
oneose() {
resolve2()
resolveEose()
},
},
)
let [t1, t2] = await waiting
expect(t1).toBeUndefined()
expect(t2).toBeUndefined()
await eosed
await evented
}, 10000)
test('listening and publishing and closing', async () => {
@@ -63,17 +61,20 @@ test('listening and publishing and closing', async () => {
let sk = generateSecretKey()
let pk = getPublicKey(sk)
var resolve1: (_: void) => void
var resolve2: (_: void) => void
let resolveEose: (_: void) => void
let resolveEvent: (_: void) => void
let resolveClose: (_: void) => void
let eventReceived: NostrEvent | undefined
let waiting = Promise.all([
new Promise(resolve => {
resolve1 = resolve
}),
new Promise(resolve => {
resolve2 = resolve
}),
])
const eosed = new Promise(resolve => {
resolveEose = resolve
})
const evented = new Promise(resolve => {
resolveEvent = resolve
})
const closed = new Promise(resolve => {
resolveClose = resolve
})
let sub = relay.subscribe(
[
@@ -84,17 +85,20 @@ test('listening and publishing and closing', async () => {
],
{
onevent(event) {
expect(event).toHaveProperty('pubkey', pk)
expect(event).toHaveProperty('kind', 23571)
expect(event).toHaveProperty('content', 'nostr-tools test suite')
resolve1()
eventReceived = event
resolveEvent()
},
oneose() {
resolveEose()
},
onclose() {
resolve2()
resolveClose()
},
},
)
await eosed
let event = finalizeEvent(
{
kind: 23571,
@@ -106,9 +110,12 @@ test('listening and publishing and closing', async () => {
)
await relay.publish(event)
await evented
sub.close()
await closed
let [t1, t2] = await waiting
expect(t1).toBeUndefined()
expect(t2).toBeUndefined()
expect(eventReceived).toBeDefined()
expect(eventReceived).toHaveProperty('pubkey', pk)
expect(eventReceived).toHaveProperty('kind', 23571)
expect(eventReceived).toHaveProperty('content', 'nostr-tools test suite')
})

356
relay.ts
View File

@@ -1,351 +1,23 @@
/* global WebSocket */
import { verifyEvent } from './pure.ts'
import { AbstractRelay } from './abstract-relay.ts'
import { verifyEvent, validateEvent, type Event, EventTemplate } from './pure.ts'
import { matchFilters, type Filter } from './filter.ts'
import { getHex64, getSubscriptionId } from './fakejson.ts'
import { Queue, normalizeURL } from './utils.ts'
import { nip42 } from './index.ts'
import { yieldThread } from './helpers.ts'
export async function relayConnect(url: string) {
const relay = new Relay(url)
await relay.connect()
return relay
/**
* @deprecated use Relay.connect() instead.
*/
export function relayConnect(url: string): Promise<Relay> {
return Relay.connect(url)
}
export class Relay {
public readonly url: string
private _connected: boolean = false
public trusted: boolean = false
public onclose: (() => void) | null = null
public onnotice: (msg: string) => void = msg => console.debug(`NOTICE from ${this.url}: ${msg}`)
public baseEoseTimeout: number = 4400
public connectionTimeout: number = 4400
private connectionTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private connectionPromise: Promise<void> | undefined
private openSubs = new Map<string, Subscription>()
private openCountRequests = new Map<string, CountResolver>()
private openEventPublishes = new Map<string, EventPublishResolver>()
private ws: WebSocket | undefined
private incomingMessageQueue = new Queue<string>()
private queueRunning = false
private challenge: string | undefined
private serial: number = 0
export class Relay extends AbstractRelay {
constructor(url: string) {
this.url = normalizeURL(url)
super(url, { verifyEvent })
}
private closeAllSubscriptions(reason: string) {
for (let [_, sub] of this.openSubs) {
sub.close(reason)
}
this.openSubs.clear()
for (let [_, ep] of this.openEventPublishes) {
ep.reject(new Error(reason))
}
this.openEventPublishes.clear()
for (let [_, cr] of this.openCountRequests) {
cr.reject(new Error(reason))
}
this.openCountRequests.clear()
}
public get connected(): boolean {
return this._connected
}
public async connect(): Promise<void> {
if (this.connectionPromise) return this.connectionPromise
this.challenge = undefined
this.connectionPromise = new Promise((resolve, reject) => {
this.connectionTimeoutHandle = setTimeout(() => {
reject('connection timed out')
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection timed out')
}, this.connectionTimeout)
try {
this.ws = new WebSocket(this.url)
} catch (err) {
reject(err)
return
}
this.ws.onopen = () => {
clearTimeout(this.connectionTimeoutHandle)
this._connected = true
resolve()
}
this.ws.onerror = ev => {
reject((ev as any).message)
if (this._connected) {
this.onclose?.()
this.closeAllSubscriptions('relay connection errored')
this._connected = false
}
}
this.ws.onclose = async () => {
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection closed')
this._connected = false
}
this.ws.onmessage = ev => {
this.incomingMessageQueue.enqueue(ev.data as string)
if (!this.queueRunning) {
this.runQueue()
}
}
})
return this.connectionPromise
}
private async runQueue() {
this.queueRunning = true
while (true) {
if (false === this.handleNext()) {
break
}
await yieldThread()
}
this.queueRunning = false
}
private handleNext(): undefined | false {
const json = this.incomingMessageQueue.dequeue()
if (!json) {
return false
}
const subid = getSubscriptionId(json)
if (subid) {
const so = this.openSubs.get(subid as string)
if (!so) {
// this is an EVENT message, but for a subscription we don't have, so just stop here
return
}
// this will be called only when this message is a EVENT message for a subscription we have
// we do this before parsing the JSON to not have to do that for duplicate events
// since JSON parsing is slow
const id = getHex64(json, 'id')
const alreadyHave = so.alreadyHaveEvent?.(id)
// notify any interested client that the relay has this event
// (do this after alreadyHaveEvent() because the client may rely on this to answer that)
so.receivedEvent?.(this, id)
if (alreadyHave) {
// if we had already seen this event we can just stop here
return
}
}
try {
let data = JSON.parse(json)
// we won't do any checks against the data since all failures (i.e. invalid messages from relays)
// will naturally be caught by the encompassing try..catch block
switch (data[0]) {
case 'EVENT': {
const so = this.openSubs.get(data[1] as string) as Subscription
const event = data[2] as Event
if ((this.trusted || (validateEvent(event) && verifyEvent(event))) && matchFilters(so.filters, event)) {
so.onevent(event)
}
return
}
case 'COUNT': {
const id: string = data[1]
const payload = data[2] as { count: number }
const cr = this.openCountRequests.get(id) as CountResolver
if (cr) {
cr.resolve(payload.count)
this.openCountRequests.delete(id)
}
return
}
case 'EOSE': {
const so = this.openSubs.get(data[1] as string)
if (!so) return
so.receivedEose()
return
}
case 'OK': {
const id: string = data[1]
const ok: boolean = data[2]
const reason: string = data[3]
const ep = this.openEventPublishes.get(id) as EventPublishResolver
if (ok) ep.resolve(reason)
else ep.reject(new Error(reason))
this.openEventPublishes.delete(id)
return
}
case 'CLOSED': {
const id: string = data[1]
const so = this.openSubs.get(id)
if (!so) return
so.closed = true
so.close(data[2] as string)
this.openSubs.delete(id)
return
}
case 'NOTICE':
this.onnotice(data[1] as string)
return
case 'AUTH': {
this.challenge = data[1] as string
return
}
}
} catch (err) {
return
}
}
public async send(message: string) {
if (!this.connectionPromise) throw new Error('sending on closed connection')
this.connectionPromise.then(() => {
this.ws?.send(message)
})
}
public async auth(signAuthEvent: (authEvent: EventTemplate) => Promise<void>) {
if (!this.challenge) throw new Error("can't perform auth, no challenge was received")
const evt = nip42.makeAuthEvent(this.url, this.challenge)
await signAuthEvent(evt)
this.send('["AUTH",' + JSON.stringify(evt) + ']')
}
public async publish(event: Event): Promise<string> {
const ret = new Promise<string>((resolve, reject) => {
this.openEventPublishes.set(event.id, { resolve, reject })
})
this.send('["EVENT",' + JSON.stringify(event) + ']')
return ret
}
public async count(filters: Filter[], params: { id?: string | null }): Promise<number> {
this.serial++
const id = params?.id || 'count:' + this.serial
const ret = new Promise<number>((resolve, reject) => {
this.openCountRequests.set(id, { resolve, reject })
})
this.send('["COUNT","' + id + '",' + JSON.stringify(filters) + ']')
return ret
}
public subscribe(filters: Filter[], params: Partial<SubscriptionParams>): Subscription {
const subscription = this.prepareSubscription(filters, params)
subscription.fire()
return subscription
}
public prepareSubscription(filters: Filter[], params: Partial<SubscriptionParams> & { id?: string }): Subscription {
this.serial++
const id = params.id || 'sub:' + this.serial
const subscription = new Subscription(this, id, filters, params)
this.openSubs.set(id, subscription)
return subscription
}
public close() {
this.closeAllSubscriptions('relay connection closed by us')
this._connected = false
this.ws?.close()
static async connect(url: string) {
const relay = new Relay(url)
await relay.connect()
return relay
}
}
export class Subscription {
public readonly relay: Relay
public readonly id: string
public closed: boolean = false
public eosed: boolean = false
public filters: Filter[]
public alreadyHaveEvent: ((id: string) => boolean) | undefined
public receivedEvent: ((relay: Relay, id: string) => void) | undefined
public onevent: (evt: Event) => void
public oneose: (() => void) | undefined
public onclose: ((reason: string) => void) | undefined
public eoseTimeout: number
private eoseTimeoutHandle: ReturnType<typeof setTimeout> | undefined
constructor(relay: Relay, id: string, filters: Filter[], params: SubscriptionParams) {
this.relay = relay
this.filters = filters
this.id = id
this.alreadyHaveEvent = params.alreadyHaveEvent
this.receivedEvent = params.receivedEvent
this.eoseTimeout = params.eoseTimeout || relay.baseEoseTimeout
this.oneose = params.oneose
this.onclose = params.onclose
this.onevent =
params.onevent ||
(event => {
console.warn(
`onevent() callback not defined for subscription '${this.id}' in relay ${this.relay.url}. event received:`,
event,
)
})
}
public fire() {
this.relay.send('["REQ","' + this.id + '",' + JSON.stringify(this.filters).substring(1))
// only now we start counting the eoseTimeout
this.eoseTimeoutHandle = setTimeout(this.receivedEose.bind(this), this.eoseTimeout)
}
public receivedEose() {
if (this.eosed) return
clearTimeout(this.eoseTimeoutHandle)
this.eosed = true
this.oneose?.()
}
public close(reason: string = 'closed by caller') {
if (!this.closed) {
// if the connection was closed by the user calling .close() we will send a CLOSE message
// otherwise this._open will be already set to false so we will skip this
this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']')
this.closed = true
}
this.onclose?.(reason)
}
}
export type SubscriptionParams = {
onevent?: (evt: Event) => void
oneose?: () => void
onclose?: (reason: string) => void
alreadyHaveEvent?: (id: string) => boolean
receivedEvent?: (relay: Relay, id: string) => void
eoseTimeout?: number
}
export type CountResolver = {
resolve: (count: number) => void
reject: (err: Error) => void
}
export type EventPublishResolver = {
resolve: (reason: string) => void
reject: (err: Error) => void
}
export * from './abstract-relay.ts'

View File

@@ -9,10 +9,10 @@
"skipLibCheck": true,
"esModuleInterop": true,
"emitDeclarationOnly": true,
"allowImportingTsExtensions": true,
"outDir": "lib/types",
"resolveJsonModule": true,
"rootDir": ".",
"allowImportingTsExtensions": true,
"types": ["bun-types"]
}
}

View File

@@ -2,7 +2,7 @@ import { describe, test, expect } from 'bun:test'
import { buildEvent } from './test-helpers.ts'
import { Queue, insertEventIntoAscendingList, insertEventIntoDescendingList, binarySearch } from './utils.ts'
import type { Event } from './pure.ts'
import type { Event } from './core.ts'
describe('inserting into a desc sorted list of events', () => {
test('insert into an empty list', async () => {

View File

@@ -1,4 +1,4 @@
import type { Event } from './pure.ts'
import type { Event } from './core.ts'
export const utf8Decoder = new TextDecoder('utf-8')
export const utf8Encoder = new TextEncoder()