mirror of
https://github.com/nbd-wtf/nostr-tools.git
synced 2025-12-08 16:28:49 +00:00
Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
75df47421f | ||
|
|
1cfe705baf | ||
|
|
566437fe2e | ||
|
|
5d6c2b9e5d | ||
|
|
a43f2a708c | ||
|
|
f727058a3a | ||
|
|
1de54838d3 | ||
|
|
703c29a311 | ||
|
|
ddf1064da9 | ||
|
|
f719d99a11 | ||
|
|
6152238d65 | ||
|
|
9ac1b63994 | ||
|
|
1890c91ae3 | ||
|
|
7067b47cd4 | ||
|
|
397931f847 | ||
|
|
5d795c291f | ||
|
|
7adbd30799 | ||
|
|
83b6dd7ec3 | ||
|
|
d61cc6c9bf | ||
|
|
d7dad8e204 | ||
|
|
daaa2ef0a1 | ||
|
|
7f11c0c618 | ||
|
|
a4ae964ee6 | ||
|
|
1f7378ca49 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -5,3 +5,4 @@ package-lock.json
|
||||
.envrc
|
||||
lib
|
||||
test.html
|
||||
bench.js
|
||||
|
||||
37
README.md
37
README.md
@@ -43,9 +43,9 @@ let isGood = verifyEvent(event)
|
||||
### Interacting with a relay
|
||||
|
||||
```js
|
||||
import { relayConnect, finalizeEvent, generateSecretKey, getPublicKey } from 'nostr-tools'
|
||||
import { Relay, finalizeEvent, generateSecretKey, getPublicKey } from 'nostr-tools'
|
||||
|
||||
const relay = await relayConnect('wss://relay.example.com')
|
||||
const relay = await Relay.connect('wss://relay.example.com')
|
||||
console.log(`connected to ${relay.url}`)
|
||||
|
||||
// let's query for an event that exists
|
||||
@@ -210,6 +210,8 @@ Importing the entirety of `nostr-tools` may bloat your build, so you should prob
|
||||
|
||||
```js
|
||||
import { generateSecretKey, finalizeEvent, verifyEvent } from 'nostr-tools/pure'
|
||||
import { SimplePool } from 'nostr-tools/pool'
|
||||
import { Relay, Subscription } from 'nostr-tools/relay'
|
||||
import { matchFilter } from 'nostr-tools/filter'
|
||||
import { decode, nprofileEncode, neventEncode, npubEncode } from 'nostr-tools/nip19'
|
||||
// and so on and so forth
|
||||
@@ -230,7 +232,36 @@ initNostrWasm().then(setNostrWasm)
|
||||
// see https://www.npmjs.com/package/nostr-wasm for options
|
||||
```
|
||||
|
||||
This may be faster than the pure-JS [noble libraries](https://paulmillr.com/noble/) used by default and in `nostr-tools/pure`.
|
||||
If you're going to use `Relay` and `SimplePool` you must also import `nostr-tools/abstract-relay` and/or `nostr-tools/abstract-pool` instead of the defaults and then instantiate them by passing the `verifyEvent`:
|
||||
|
||||
```js
|
||||
import { setNostrWasm, verifyEvent } from 'nostr-tools/wasm'
|
||||
import { AbstractRelay } from 'nostr-tools/abstract-relay'
|
||||
import { AbstractSimplePool } from 'nostr-tools/abstract-pool'
|
||||
import { initNostrWasm } from 'nostr-wasm'
|
||||
|
||||
initNostrWasm().then(setNostrWasm)
|
||||
|
||||
const relay = AbstractRelay.connect('wss://relayable.org', { verifyEvent })
|
||||
const pool = new AbstractSimplePool({ verifyEvent })
|
||||
```
|
||||
|
||||
This may be faster than the pure-JS [noble libraries](https://paulmillr.com/noble/) used by default and in `nostr-tools/pure`. Benchmarks:
|
||||
|
||||
```
|
||||
benchmark time (avg) (min … max) p75 p99 p995
|
||||
------------------------------------------------- -----------------------------
|
||||
• relay read message and verify event (many events)
|
||||
------------------------------------------------- -----------------------------
|
||||
wasm 34.94 ms/iter (34.61 ms … 35.73 ms) 35.07 ms 35.73 ms 35.73 ms
|
||||
pure js 239.7 ms/iter (235.41 ms … 243.69 ms) 240.51 ms 243.69 ms 243.69 ms
|
||||
trusted 402.71 µs/iter (344.57 µs … 2.98 ms) 407.39 µs 745.62 µs 812.59 µs
|
||||
|
||||
summary for relay read message and verify event
|
||||
wasm
|
||||
86.77x slower than trusted
|
||||
6.86x faster than pure js
|
||||
```
|
||||
|
||||
### Using from the browser (if you don't want to use a bundler)
|
||||
|
||||
|
||||
190
abstract-pool.ts
Normal file
190
abstract-pool.ts
Normal file
@@ -0,0 +1,190 @@
|
||||
import { AbstractRelay as AbstractRelay, SubscriptionParams, Subscription } from './abstract-relay.ts'
|
||||
import { normalizeURL } from './utils.ts'
|
||||
|
||||
import type { Event, Nostr } from './core.ts'
|
||||
import { type Filter } from './filter.ts'
|
||||
import { alwaysTrue } from './helpers.ts'
|
||||
|
||||
export type SubCloser = { close: () => void }
|
||||
|
||||
export type SubscribeManyParams = Omit<SubscriptionParams, 'onclose' | 'id'> & {
|
||||
maxWait?: number
|
||||
onclose?: (reasons: string[]) => void
|
||||
id?: string
|
||||
}
|
||||
|
||||
export class AbstractSimplePool {
|
||||
private relays = new Map<string, AbstractRelay>()
|
||||
public seenOn = new Map<string, Set<AbstractRelay>>()
|
||||
public trackRelays: boolean = false
|
||||
|
||||
public verifyEvent: Nostr['verifyEvent']
|
||||
public trustedRelayURLs = new Set<string>()
|
||||
|
||||
constructor(opts: { verifyEvent: Nostr['verifyEvent'] }) {
|
||||
this.verifyEvent = opts.verifyEvent
|
||||
}
|
||||
|
||||
async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise<AbstractRelay> {
|
||||
url = normalizeURL(url)
|
||||
|
||||
let relay = this.relays.get(url)
|
||||
if (!relay) {
|
||||
relay = new AbstractRelay(url, {
|
||||
verifyEvent: this.trustedRelayURLs.has(url) ? alwaysTrue : this.verifyEvent,
|
||||
})
|
||||
if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout
|
||||
this.relays.set(url, relay)
|
||||
}
|
||||
await relay.connect()
|
||||
|
||||
return relay
|
||||
}
|
||||
|
||||
close(relays: string[]) {
|
||||
relays.map(normalizeURL).forEach(url => {
|
||||
this.relays.get(url)?.close()
|
||||
})
|
||||
}
|
||||
|
||||
subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser {
|
||||
if (this.trackRelays) {
|
||||
params.receivedEvent = (relay: AbstractRelay, id: string) => {
|
||||
let set = this.seenOn.get(id)
|
||||
if (!set) {
|
||||
set = new Set()
|
||||
this.seenOn.set(id, set)
|
||||
}
|
||||
set.add(relay)
|
||||
}
|
||||
}
|
||||
|
||||
const _knownIds = new Set<string>()
|
||||
const subs: Subscription[] = []
|
||||
|
||||
// batch all EOSEs into a single
|
||||
const eosesReceived: boolean[] = []
|
||||
let handleEose = (i: number) => {
|
||||
eosesReceived[i] = true
|
||||
if (eosesReceived.filter(a => a).length === relays.length) {
|
||||
params.oneose?.()
|
||||
handleEose = () => {}
|
||||
}
|
||||
}
|
||||
// batch all closes into a single
|
||||
const closesReceived: string[] = []
|
||||
let handleClose = (i: number, reason: string) => {
|
||||
handleEose(i)
|
||||
closesReceived[i] = reason
|
||||
if (closesReceived.filter(a => a).length === relays.length) {
|
||||
params.onclose?.(closesReceived)
|
||||
handleClose = () => {}
|
||||
}
|
||||
}
|
||||
|
||||
const localAlreadyHaveEventHandler = (id: string) => {
|
||||
if (params.alreadyHaveEvent?.(id)) {
|
||||
return true
|
||||
}
|
||||
const have = _knownIds.has(id)
|
||||
_knownIds.add(id)
|
||||
return have
|
||||
}
|
||||
|
||||
// open a subscription in all given relays
|
||||
const allOpened = Promise.all(
|
||||
relays.map(normalizeURL).map(async (url, i, arr) => {
|
||||
if (arr.indexOf(url) !== i) {
|
||||
// duplicate
|
||||
handleClose(i, 'duplicate url')
|
||||
return
|
||||
}
|
||||
|
||||
let relay: AbstractRelay
|
||||
try {
|
||||
relay = await this.ensureRelay(url, {
|
||||
connectionTimeout: params.maxWait ? Math.max(params.maxWait * 0.8, params.maxWait - 1000) : undefined,
|
||||
})
|
||||
} catch (err) {
|
||||
handleClose(i, (err as any)?.message || String(err))
|
||||
return
|
||||
}
|
||||
|
||||
let subscription = relay.subscribe(filters, {
|
||||
...params,
|
||||
oneose: () => handleEose(i),
|
||||
onclose: reason => handleClose(i, reason),
|
||||
alreadyHaveEvent: localAlreadyHaveEventHandler,
|
||||
eoseTimeout: params.maxWait,
|
||||
})
|
||||
|
||||
subs.push(subscription)
|
||||
}),
|
||||
)
|
||||
|
||||
return {
|
||||
async close() {
|
||||
await allOpened
|
||||
subs.forEach(sub => {
|
||||
sub.close()
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
subscribeManyEose(
|
||||
relays: string[],
|
||||
filters: Filter[],
|
||||
params: Pick<SubscribeManyParams, 'id' | 'onevent' | 'onclose' | 'maxWait'>,
|
||||
): SubCloser {
|
||||
const subcloser = this.subscribeMany(relays, filters, {
|
||||
...params,
|
||||
oneose() {
|
||||
subcloser.close()
|
||||
},
|
||||
})
|
||||
return subcloser
|
||||
}
|
||||
|
||||
async querySync(
|
||||
relays: string[],
|
||||
filter: Filter,
|
||||
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
|
||||
): Promise<Event[]> {
|
||||
return new Promise(async resolve => {
|
||||
const events: Event[] = []
|
||||
this.subscribeManyEose(relays, [filter], {
|
||||
...params,
|
||||
onevent(event: Event) {
|
||||
events.push(event)
|
||||
},
|
||||
onclose(_: string[]) {
|
||||
resolve(events)
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async get(
|
||||
relays: string[],
|
||||
filter: Filter,
|
||||
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
|
||||
): Promise<Event | null> {
|
||||
filter.limit = 1
|
||||
const events = await this.querySync(relays, filter, params)
|
||||
events.sort((a, b) => b.created_at - a.created_at)
|
||||
return events[0] || null
|
||||
}
|
||||
|
||||
publish(relays: string[], event: Event): Promise<string>[] {
|
||||
return relays.map(normalizeURL).map(async (url, i, arr) => {
|
||||
if (arr.indexOf(url) !== i) {
|
||||
// duplicate
|
||||
return Promise.reject('duplicate url')
|
||||
}
|
||||
|
||||
let r = await this.ensureRelay(url)
|
||||
return r.publish(event)
|
||||
})
|
||||
}
|
||||
}
|
||||
359
abstract-relay.ts
Normal file
359
abstract-relay.ts
Normal file
@@ -0,0 +1,359 @@
|
||||
/* global WebSocket */
|
||||
|
||||
import type { Event, EventTemplate, VerifiedEvent, Nostr } from './core.ts'
|
||||
import { matchFilters, type Filter } from './filter.ts'
|
||||
import { getHex64, getSubscriptionId } from './fakejson.ts'
|
||||
import { Queue, normalizeURL } from './utils.ts'
|
||||
import { makeAuthEvent } from './nip42.ts'
|
||||
import { yieldThread } from './helpers.ts'
|
||||
|
||||
export class AbstractRelay {
|
||||
public readonly url: string
|
||||
private _connected: boolean = false
|
||||
|
||||
public onclose: (() => void) | null = null
|
||||
public onnotice: (msg: string) => void = msg => console.debug(`NOTICE from ${this.url}: ${msg}`)
|
||||
|
||||
public baseEoseTimeout: number = 4400
|
||||
public connectionTimeout: number = 4400
|
||||
public openSubs = new Map<string, Subscription>()
|
||||
private connectionTimeoutHandle: ReturnType<typeof setTimeout> | undefined
|
||||
|
||||
private connectionPromise: Promise<void> | undefined
|
||||
private openCountRequests = new Map<string, CountResolver>()
|
||||
private openEventPublishes = new Map<string, EventPublishResolver>()
|
||||
private ws: WebSocket | undefined
|
||||
private incomingMessageQueue = new Queue<string>()
|
||||
private queueRunning = false
|
||||
private challenge: string | undefined
|
||||
private serial: number = 0
|
||||
private verifyEvent: Nostr['verifyEvent']
|
||||
|
||||
constructor(url: string, opts: { verifyEvent: Nostr['verifyEvent'] }) {
|
||||
this.url = normalizeURL(url)
|
||||
this.verifyEvent = opts.verifyEvent
|
||||
}
|
||||
|
||||
static async connect(url: string, opts: { verifyEvent: Nostr['verifyEvent'] }) {
|
||||
const relay = new AbstractRelay(url, opts)
|
||||
await relay.connect()
|
||||
return relay
|
||||
}
|
||||
|
||||
private closeAllSubscriptions(reason: string) {
|
||||
for (let [_, sub] of this.openSubs) {
|
||||
sub.close(reason)
|
||||
}
|
||||
this.openSubs.clear()
|
||||
|
||||
for (let [_, ep] of this.openEventPublishes) {
|
||||
ep.reject(new Error(reason))
|
||||
}
|
||||
this.openEventPublishes.clear()
|
||||
|
||||
for (let [_, cr] of this.openCountRequests) {
|
||||
cr.reject(new Error(reason))
|
||||
}
|
||||
this.openCountRequests.clear()
|
||||
}
|
||||
|
||||
public get connected(): boolean {
|
||||
return this._connected
|
||||
}
|
||||
|
||||
public async connect(): Promise<void> {
|
||||
if (this.connectionPromise) return this.connectionPromise
|
||||
|
||||
this.challenge = undefined
|
||||
this.connectionPromise = new Promise((resolve, reject) => {
|
||||
this.connectionTimeoutHandle = setTimeout(() => {
|
||||
reject('connection timed out')
|
||||
this.connectionPromise = undefined
|
||||
this.onclose?.()
|
||||
this.closeAllSubscriptions('relay connection timed out')
|
||||
}, this.connectionTimeout)
|
||||
|
||||
try {
|
||||
this.ws = new WebSocket(this.url)
|
||||
} catch (err) {
|
||||
reject(err)
|
||||
return
|
||||
}
|
||||
|
||||
this.ws.onopen = () => {
|
||||
clearTimeout(this.connectionTimeoutHandle)
|
||||
this._connected = true
|
||||
resolve()
|
||||
}
|
||||
|
||||
this.ws.onerror = ev => {
|
||||
reject((ev as any).message)
|
||||
if (this._connected) {
|
||||
this.onclose?.()
|
||||
this.closeAllSubscriptions('relay connection errored')
|
||||
this._connected = false
|
||||
}
|
||||
}
|
||||
|
||||
this.ws.onclose = async () => {
|
||||
this.connectionPromise = undefined
|
||||
this.onclose?.()
|
||||
this.closeAllSubscriptions('relay connection closed')
|
||||
this._connected = false
|
||||
}
|
||||
|
||||
this.ws.onmessage = this._onmessage.bind(this)
|
||||
})
|
||||
|
||||
return this.connectionPromise
|
||||
}
|
||||
|
||||
private async runQueue() {
|
||||
this.queueRunning = true
|
||||
while (true) {
|
||||
if (false === this.handleNext()) {
|
||||
break
|
||||
}
|
||||
await yieldThread()
|
||||
}
|
||||
this.queueRunning = false
|
||||
}
|
||||
|
||||
private handleNext(): undefined | false {
|
||||
const json = this.incomingMessageQueue.dequeue()
|
||||
if (!json) {
|
||||
return false
|
||||
}
|
||||
|
||||
const subid = getSubscriptionId(json)
|
||||
if (subid) {
|
||||
const so = this.openSubs.get(subid as string)
|
||||
if (!so) {
|
||||
// this is an EVENT message, but for a subscription we don't have, so just stop here
|
||||
return
|
||||
}
|
||||
|
||||
// this will be called only when this message is a EVENT message for a subscription we have
|
||||
// we do this before parsing the JSON to not have to do that for duplicate events
|
||||
// since JSON parsing is slow
|
||||
const id = getHex64(json, 'id')
|
||||
const alreadyHave = so.alreadyHaveEvent?.(id)
|
||||
|
||||
// notify any interested client that the relay has this event
|
||||
// (do this after alreadyHaveEvent() because the client may rely on this to answer that)
|
||||
so.receivedEvent?.(this, id)
|
||||
|
||||
if (alreadyHave) {
|
||||
// if we had already seen this event we can just stop here
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
let data = JSON.parse(json)
|
||||
// we won't do any checks against the data since all failures (i.e. invalid messages from relays)
|
||||
// will naturally be caught by the encompassing try..catch block
|
||||
|
||||
switch (data[0]) {
|
||||
case 'EVENT': {
|
||||
const so = this.openSubs.get(data[1] as string) as Subscription
|
||||
const event = data[2] as Event
|
||||
if (this.verifyEvent(event) && matchFilters(so.filters, event)) {
|
||||
so.onevent(event)
|
||||
}
|
||||
return
|
||||
}
|
||||
case 'COUNT': {
|
||||
const id: string = data[1]
|
||||
const payload = data[2] as { count: number }
|
||||
const cr = this.openCountRequests.get(id) as CountResolver
|
||||
if (cr) {
|
||||
cr.resolve(payload.count)
|
||||
this.openCountRequests.delete(id)
|
||||
}
|
||||
return
|
||||
}
|
||||
case 'EOSE': {
|
||||
const so = this.openSubs.get(data[1] as string)
|
||||
if (!so) return
|
||||
so.receivedEose()
|
||||
return
|
||||
}
|
||||
case 'OK': {
|
||||
const id: string = data[1]
|
||||
const ok: boolean = data[2]
|
||||
const reason: string = data[3]
|
||||
const ep = this.openEventPublishes.get(id) as EventPublishResolver
|
||||
if (ok) ep.resolve(reason)
|
||||
else ep.reject(new Error(reason))
|
||||
this.openEventPublishes.delete(id)
|
||||
return
|
||||
}
|
||||
case 'CLOSED': {
|
||||
const id: string = data[1]
|
||||
const so = this.openSubs.get(id)
|
||||
if (!so) return
|
||||
so.closed = true
|
||||
so.close(data[2] as string)
|
||||
return
|
||||
}
|
||||
case 'NOTICE':
|
||||
this.onnotice(data[1] as string)
|
||||
return
|
||||
case 'AUTH': {
|
||||
this.challenge = data[1] as string
|
||||
return
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
public async send(message: string) {
|
||||
if (!this.connectionPromise) throw new Error('sending on closed connection')
|
||||
|
||||
this.connectionPromise.then(() => {
|
||||
this.ws?.send(message)
|
||||
})
|
||||
}
|
||||
|
||||
public async auth(signAuthEvent: (evt: EventTemplate) => Promise<VerifiedEvent>) {
|
||||
if (!this.challenge) throw new Error("can't perform auth, no challenge was received")
|
||||
const evt = await signAuthEvent(makeAuthEvent(this.url, this.challenge))
|
||||
const ret = new Promise<string>((resolve, reject) => {
|
||||
this.openEventPublishes.set(evt.id, { resolve, reject })
|
||||
})
|
||||
this.send('["AUTH",' + JSON.stringify(evt) + ']')
|
||||
return ret
|
||||
}
|
||||
|
||||
public async publish(event: Event): Promise<string> {
|
||||
const ret = new Promise<string>((resolve, reject) => {
|
||||
this.openEventPublishes.set(event.id, { resolve, reject })
|
||||
})
|
||||
this.send('["EVENT",' + JSON.stringify(event) + ']')
|
||||
return ret
|
||||
}
|
||||
|
||||
public async count(filters: Filter[], params: { id?: string | null }): Promise<number> {
|
||||
this.serial++
|
||||
const id = params?.id || 'count:' + this.serial
|
||||
const ret = new Promise<number>((resolve, reject) => {
|
||||
this.openCountRequests.set(id, { resolve, reject })
|
||||
})
|
||||
this.send('["COUNT","' + id + '",' + JSON.stringify(filters) + ']')
|
||||
return ret
|
||||
}
|
||||
|
||||
public subscribe(filters: Filter[], params: Partial<SubscriptionParams>): Subscription {
|
||||
const subscription = this.prepareSubscription(filters, params)
|
||||
subscription.fire()
|
||||
return subscription
|
||||
}
|
||||
|
||||
public prepareSubscription(filters: Filter[], params: Partial<SubscriptionParams> & { id?: string }): Subscription {
|
||||
this.serial++
|
||||
const id = params.id || 'sub:' + this.serial
|
||||
const subscription = new Subscription(this, id, filters, params)
|
||||
this.openSubs.set(id, subscription)
|
||||
return subscription
|
||||
}
|
||||
|
||||
public close() {
|
||||
this.closeAllSubscriptions('relay connection closed by us')
|
||||
this._connected = false
|
||||
this.ws?.close()
|
||||
}
|
||||
|
||||
// this is the function assigned to this.ws.onmessage
|
||||
// it's exposed for testing and debugging purposes
|
||||
public _onmessage(ev: MessageEvent<any>) {
|
||||
this.incomingMessageQueue.enqueue(ev.data as string)
|
||||
if (!this.queueRunning) {
|
||||
this.runQueue()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class Subscription {
|
||||
public readonly relay: AbstractRelay
|
||||
public readonly id: string
|
||||
|
||||
public closed: boolean = false
|
||||
public eosed: boolean = false
|
||||
public filters: Filter[]
|
||||
public alreadyHaveEvent: ((id: string) => boolean) | undefined
|
||||
public receivedEvent: ((relay: AbstractRelay, id: string) => void) | undefined
|
||||
|
||||
public onevent: (evt: Event) => void
|
||||
public oneose: (() => void) | undefined
|
||||
public onclose: ((reason: string) => void) | undefined
|
||||
|
||||
public eoseTimeout: number
|
||||
private eoseTimeoutHandle: ReturnType<typeof setTimeout> | undefined
|
||||
|
||||
constructor(relay: AbstractRelay, id: string, filters: Filter[], params: SubscriptionParams) {
|
||||
this.relay = relay
|
||||
this.filters = filters
|
||||
this.id = id
|
||||
this.alreadyHaveEvent = params.alreadyHaveEvent
|
||||
this.receivedEvent = params.receivedEvent
|
||||
this.eoseTimeout = params.eoseTimeout || relay.baseEoseTimeout
|
||||
|
||||
this.oneose = params.oneose
|
||||
this.onclose = params.onclose
|
||||
this.onevent =
|
||||
params.onevent ||
|
||||
(event => {
|
||||
console.warn(
|
||||
`onevent() callback not defined for subscription '${this.id}' in relay ${this.relay.url}. event received:`,
|
||||
event,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
public fire() {
|
||||
this.relay.send('["REQ","' + this.id + '",' + JSON.stringify(this.filters).substring(1))
|
||||
|
||||
// only now we start counting the eoseTimeout
|
||||
this.eoseTimeoutHandle = setTimeout(this.receivedEose.bind(this), this.eoseTimeout)
|
||||
}
|
||||
|
||||
public receivedEose() {
|
||||
if (this.eosed) return
|
||||
clearTimeout(this.eoseTimeoutHandle)
|
||||
this.eosed = true
|
||||
this.oneose?.()
|
||||
}
|
||||
|
||||
public close(reason: string = 'closed by caller') {
|
||||
if (!this.closed) {
|
||||
// if the connection was closed by the user calling .close() we will send a CLOSE message
|
||||
// otherwise this._open will be already set to false so we will skip this
|
||||
this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']')
|
||||
this.closed = true
|
||||
}
|
||||
this.relay.openSubs.delete(this.id)
|
||||
this.onclose?.(reason)
|
||||
}
|
||||
}
|
||||
|
||||
export type SubscriptionParams = {
|
||||
onevent?: (evt: Event) => void
|
||||
oneose?: () => void
|
||||
onclose?: (reason: string) => void
|
||||
alreadyHaveEvent?: (id: string) => boolean
|
||||
receivedEvent?: (relay: AbstractRelay, id: string) => void
|
||||
eoseTimeout?: number
|
||||
}
|
||||
|
||||
export type CountResolver = {
|
||||
resolve: (count: number) => void
|
||||
reject: (err: Error) => void
|
||||
}
|
||||
|
||||
export type EventPublishResolver = {
|
||||
resolve: (reason: string) => void
|
||||
reject: (err: Error) => void
|
||||
}
|
||||
61
benchmarks.ts
Normal file
61
benchmarks.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
import { run, bench, group, baseline } from 'mitata'
|
||||
import { initNostrWasm } from 'nostr-wasm'
|
||||
import { NostrEvent } from './core'
|
||||
import { finalizeEvent, generateSecretKey } from './pure'
|
||||
import { setNostrWasm, verifyEvent } from './wasm'
|
||||
import { AbstractRelay } from './abstract-relay.ts'
|
||||
import { Relay as PureRelay } from './relay.ts'
|
||||
import { alwaysTrue } from './helpers.ts'
|
||||
|
||||
// benchmarking relay reads with verifyEvent
|
||||
const EVENTS = 200
|
||||
let messages: string[] = []
|
||||
let baseContent = ''
|
||||
for (let i = 0; i < EVENTS; i++) {
|
||||
baseContent += 'a'
|
||||
}
|
||||
const secretKey = generateSecretKey()
|
||||
for (let i = 0; i < EVENTS; i++) {
|
||||
const tags = []
|
||||
for (let t = 0; t < i; t++) {
|
||||
tags.push(['t', 'nada'])
|
||||
}
|
||||
const event = { created_at: Math.round(Date.now()) / 1000, kind: 1, content: baseContent.slice(0, EVENTS - i), tags }
|
||||
const signed = finalizeEvent(event, secretKey)
|
||||
messages.push(JSON.stringify(['EVENT', '_', signed]))
|
||||
}
|
||||
|
||||
setNostrWasm(await initNostrWasm())
|
||||
|
||||
const pureRelay = new PureRelay('wss://pure.com/')
|
||||
const trustedRelay = new AbstractRelay('wss://trusted.com/', { verifyEvent: alwaysTrue })
|
||||
const wasmRelay = new AbstractRelay('wss://wasm.com/', { verifyEvent })
|
||||
|
||||
const runWith = (relay: AbstractRelay) => async () => {
|
||||
return new Promise<void>(resolve => {
|
||||
let received = 0
|
||||
let sub = relay.prepareSubscription([{}], {
|
||||
id: '_',
|
||||
onevent(_: NostrEvent) {
|
||||
received++
|
||||
if (received === messages.length - 1) {
|
||||
resolve()
|
||||
sub.closed = true
|
||||
sub.close()
|
||||
}
|
||||
},
|
||||
})
|
||||
for (let e = 0; e < messages.length; e++) {
|
||||
relay._onmessage({ data: messages[e] } as any)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
group(`relay read ${EVENTS} messages and verify its events`, () => {
|
||||
baseline('wasm', runWith(wasmRelay))
|
||||
bench('pure js', runWith(pureRelay))
|
||||
bench('trusted', runWith(trustedRelay))
|
||||
})
|
||||
|
||||
// actually running the thing
|
||||
await run()
|
||||
15
build.js
15
build.js
@@ -10,6 +10,7 @@ const entryPoints = fs
|
||||
file !== 'core.ts' &&
|
||||
file !== 'test-helpers.ts' &&
|
||||
file !== 'helpers.ts' &&
|
||||
file !== 'benchmarks.ts' &&
|
||||
!file.endsWith('.test.ts') &&
|
||||
fs.statSync(join(process.cwd(), file)).isFile(),
|
||||
)
|
||||
@@ -27,12 +28,7 @@ esbuild
|
||||
format: 'esm',
|
||||
packages: 'external',
|
||||
})
|
||||
.then(() => {
|
||||
const packageJson = JSON.stringify({ type: 'module' })
|
||||
fs.writeFileSync(`${__dirname}/lib/esm/package.json`, packageJson, 'utf8')
|
||||
|
||||
console.log('esm build success.')
|
||||
})
|
||||
.then(() => console.log('esm build success.'))
|
||||
|
||||
esbuild
|
||||
.build({
|
||||
@@ -41,7 +37,12 @@ esbuild
|
||||
format: 'cjs',
|
||||
packages: 'external',
|
||||
})
|
||||
.then(() => console.log('cjs build success.'))
|
||||
.then(() => {
|
||||
const packageJson = JSON.stringify({ type: 'commonjs' })
|
||||
fs.writeFileSync(`${__dirname}/lib/cjs/package.json`, packageJson, 'utf8')
|
||||
|
||||
console.log('cjs build success.')
|
||||
})
|
||||
|
||||
esbuild
|
||||
.build({
|
||||
|
||||
1
core.ts
1
core.ts
@@ -19,6 +19,7 @@ export interface Event {
|
||||
[verifiedSymbol]?: boolean
|
||||
}
|
||||
|
||||
export type NostrEvent = Event
|
||||
export type EventTemplate = Pick<Event, 'kind' | 'tags' | 'content' | 'created_at'>
|
||||
export type UnsignedEvent = Pick<Event, 'kind' | 'tags' | 'content' | 'created_at' | 'pubkey'>
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Event } from './pure.ts'
|
||||
import { Event } from './core.ts'
|
||||
|
||||
export type Filter = {
|
||||
ids?: string[]
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { verifiedSymbol, type Event, type Nostr, VerifiedEvent } from './core.ts'
|
||||
|
||||
export async function yieldThread() {
|
||||
return new Promise(resolve => {
|
||||
const ch = new MessageChannel()
|
||||
@@ -7,3 +9,8 @@ export async function yieldThread() {
|
||||
ch.port1.start()
|
||||
})
|
||||
}
|
||||
|
||||
export const alwaysTrue: Nostr['verifyEvent'] = (t: Event): t is VerifiedEvent => {
|
||||
t[verifiedSymbol] = true
|
||||
return true
|
||||
}
|
||||
|
||||
1
index.ts
1
index.ts
@@ -1,6 +1,5 @@
|
||||
export * from './pure.ts'
|
||||
export * from './relay.ts'
|
||||
export * from './pure.ts'
|
||||
export * from './filter.ts'
|
||||
export * from './pool.ts'
|
||||
export * from './references.ts'
|
||||
|
||||
27
justfile
27
justfile
@@ -1,25 +1,32 @@
|
||||
export PATH := "./node_modules/.bin:" + env_var('PATH')
|
||||
|
||||
build:
|
||||
rm -rf lib
|
||||
bun run build.js
|
||||
rm -rf lib
|
||||
bun run build.js
|
||||
|
||||
test:
|
||||
bun test --timeout 20000
|
||||
bun test --timeout 20000
|
||||
|
||||
test-only file:
|
||||
bun test {{file}}
|
||||
bun test {{file}}
|
||||
|
||||
emit-types:
|
||||
tsc # see tsconfig.json
|
||||
tsc # see tsconfig.json
|
||||
|
||||
publish: build emit-types
|
||||
npm publish
|
||||
npm publish
|
||||
|
||||
format:
|
||||
eslint --ext .ts --fix *.ts
|
||||
prettier --write *.ts
|
||||
eslint --ext .ts --fix *.ts
|
||||
prettier --write *.ts
|
||||
|
||||
lint:
|
||||
eslint --ext .ts *.ts
|
||||
prettier --check *.ts
|
||||
eslint --ext .ts *.ts
|
||||
prettier --check *.ts
|
||||
|
||||
benchmark:
|
||||
bun build --target=node --outfile=bench.js benchmarks.ts
|
||||
timeout 60s deno run --allow-read bench.js || true
|
||||
timeout 60s node bench.js || true
|
||||
timeout 60s bun run benchmarks.ts || true
|
||||
rm bench.js
|
||||
|
||||
2
nip10.ts
2
nip10.ts
@@ -1,4 +1,4 @@
|
||||
import type { Event } from './pure.ts'
|
||||
import type { Event } from './core.ts'
|
||||
import type { EventPointer, ProfilePointer } from './nip19.ts'
|
||||
|
||||
export type NIP10Result = {
|
||||
|
||||
@@ -78,13 +78,13 @@ test('encode and decode naddr', () => {
|
||||
test('encode and decode nevent', () => {
|
||||
let pk = getPublicKey(generateSecretKey())
|
||||
let relays = ['wss://relay.nostr.example.mydomain.example.com', 'wss://nostr.banana.com']
|
||||
let naddr = neventEncode({
|
||||
let nevent = neventEncode({
|
||||
id: pk,
|
||||
relays,
|
||||
kind: 30023,
|
||||
})
|
||||
expect(naddr).toMatch(/nevent1\w+/)
|
||||
let { type, data } = decode(naddr)
|
||||
expect(nevent).toMatch(/nevent1\w+/)
|
||||
let { type, data } = decode(nevent)
|
||||
expect(type).toEqual('nevent')
|
||||
const pointer = data as EventPointer
|
||||
expect(pointer.id).toEqual(pk)
|
||||
@@ -95,13 +95,13 @@ test('encode and decode nevent', () => {
|
||||
test('encode and decode nevent with kind 0', () => {
|
||||
let pk = getPublicKey(generateSecretKey())
|
||||
let relays = ['wss://relay.nostr.example.mydomain.example.com', 'wss://nostr.banana.com']
|
||||
let naddr = neventEncode({
|
||||
let nevent = neventEncode({
|
||||
id: pk,
|
||||
relays,
|
||||
kind: 0,
|
||||
})
|
||||
expect(naddr).toMatch(/nevent1\w+/)
|
||||
let { type, data } = decode(naddr)
|
||||
expect(nevent).toMatch(/nevent1\w+/)
|
||||
let { type, data } = decode(nevent)
|
||||
expect(type).toEqual('nevent')
|
||||
const pointer = data as EventPointer
|
||||
expect(pointer.id).toEqual(pk)
|
||||
@@ -109,6 +109,25 @@ test('encode and decode nevent with kind 0', () => {
|
||||
expect(pointer.kind).toEqual(0)
|
||||
})
|
||||
|
||||
test('encode and decode naddr with empty "d"', () => {
|
||||
let pk = getPublicKey(generateSecretKey())
|
||||
let relays = ['wss://relay.nostr.example.mydomain.example.com', 'wss://nostr.banana.com']
|
||||
let naddr = naddrEncode({
|
||||
identifier: '',
|
||||
pubkey: pk,
|
||||
relays,
|
||||
kind: 3,
|
||||
})
|
||||
expect(naddr).toMatch(/naddr\w+/)
|
||||
let { type, data } = decode(naddr)
|
||||
expect(type).toEqual('naddr')
|
||||
const pointer = data as AddressPointer
|
||||
expect(pointer.identifier).toEqual('')
|
||||
expect(pointer.relays).toContain(relays[0])
|
||||
expect(pointer.kind).toEqual(3)
|
||||
expect(pointer.pubkey).toEqual(pk)
|
||||
})
|
||||
|
||||
test('decode naddr from habla.news', () => {
|
||||
let { type, data } = decode(
|
||||
'naddr1qq98yetxv4ex2mnrv4esygrl54h466tz4v0re4pyuavvxqptsejl0vxcmnhfl60z3rth2xkpjspsgqqqw4rsf34vl5',
|
||||
|
||||
19
nip19.ts
19
nip19.ts
@@ -149,7 +149,6 @@ function parseTLV(data: Uint8Array): TLV {
|
||||
while (rest.length > 0) {
|
||||
let t = rest[0]
|
||||
let l = rest[1]
|
||||
if (!l) throw new Error(`malformed TLV ${t}`)
|
||||
let v = rest.slice(2, 2 + l)
|
||||
rest = rest.slice(2 + l)
|
||||
if (v.length < l) throw new Error(`not enough data to read on TLV ${t}`)
|
||||
@@ -227,15 +226,17 @@ export function nrelayEncode(url: string): `nrelay1${string}` {
|
||||
function encodeTLV(tlv: TLV): Uint8Array {
|
||||
let entries: Uint8Array[] = []
|
||||
|
||||
Object.entries(tlv).forEach(([t, vs]) => {
|
||||
vs.forEach(v => {
|
||||
let entry = new Uint8Array(v.length + 2)
|
||||
entry.set([parseInt(t)], 0)
|
||||
entry.set([v.length], 1)
|
||||
entry.set(v, 2)
|
||||
entries.push(entry)
|
||||
Object.entries(tlv)
|
||||
.reverse()
|
||||
.forEach(([t, vs]) => {
|
||||
vs.forEach(v => {
|
||||
let entry = new Uint8Array(v.length + 2)
|
||||
entry.set([parseInt(t)], 0)
|
||||
entry.set([v.length], 1)
|
||||
entry.set(v, 2)
|
||||
entries.push(entry)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
return concatBytes(...entries)
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import { test, expect } from 'bun:test'
|
||||
|
||||
import { makeAuthEvent } from './nip42.ts'
|
||||
import { relayConnect } from './relay.ts'
|
||||
import { Relay } from './relay.ts'
|
||||
|
||||
test('auth flow', async () => {
|
||||
const relay = await relayConnect('wss://nostr.wine')
|
||||
const relay = await Relay.connect('wss://nostr.wine')
|
||||
|
||||
const auth = makeAuthEvent(relay.url, 'chachacha')
|
||||
expect(auth.tags).toHaveLength(2)
|
||||
|
||||
2
nip42.ts
2
nip42.ts
@@ -1,4 +1,4 @@
|
||||
import { EventTemplate } from './pure.ts'
|
||||
import { EventTemplate } from './core.ts'
|
||||
import { ClientAuth } from './kinds.ts'
|
||||
|
||||
/**
|
||||
|
||||
16
package.json
16
package.json
@@ -1,6 +1,7 @@
|
||||
{
|
||||
"type": "module",
|
||||
"name": "nostr-tools",
|
||||
"version": "2.0.3",
|
||||
"version": "2.1.1",
|
||||
"description": "Tools for making a Nostr client.",
|
||||
"repository": {
|
||||
"type": "git",
|
||||
@@ -39,11 +40,21 @@
|
||||
"require": "./lib/cjs/filter.js",
|
||||
"types": "./lib/types/filter.d.ts"
|
||||
},
|
||||
"./abstract-relay": {
|
||||
"import": "./lib/esm/abstract-relay.js",
|
||||
"require": "./lib/cjs/abstract-relay.js",
|
||||
"types": "./lib/types/abstract-relay.d.ts"
|
||||
},
|
||||
"./relay": {
|
||||
"import": "./lib/esm/relay.js",
|
||||
"require": "./lib/cjs/relay.js",
|
||||
"types": "./lib/types/relay.d.ts"
|
||||
},
|
||||
"./abstract-pool": {
|
||||
"import": "./lib/esm/abstract-pool.js",
|
||||
"require": "./lib/cjs/abstract-pool.js",
|
||||
"types": "./lib/types/abstract-pool.d.ts"
|
||||
},
|
||||
"./pool": {
|
||||
"import": "./lib/esm/pool.js",
|
||||
"require": "./lib/cjs/pool.js",
|
||||
@@ -163,7 +174,8 @@
|
||||
"@scure/base": "1.1.1",
|
||||
"@scure/bip32": "1.3.1",
|
||||
"@scure/bip39": "1.2.1",
|
||||
"nostr-wasm": "v0.0.3"
|
||||
"mitata": "^0.1.6",
|
||||
"nostr-wasm": "v0.1.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"typescript": ">=5.0.0"
|
||||
|
||||
21
pool.test.ts
21
pool.test.ts
@@ -12,7 +12,7 @@ afterAll(() => {
|
||||
pool.close([...relays, 'wss://offchain.pub', 'wss://eden.nostr.land'])
|
||||
})
|
||||
|
||||
test('removing duplicates when querying', async () => {
|
||||
test('removing duplicates when subscribing', async () => {
|
||||
let priv = generateSecretKey()
|
||||
let pub = getPublicKey(priv)
|
||||
|
||||
@@ -43,7 +43,7 @@ test('removing duplicates when querying', async () => {
|
||||
expect(received[0]).toEqual(event)
|
||||
})
|
||||
|
||||
test('same with double querying', async () => {
|
||||
test('same with double subs', async () => {
|
||||
let priv = generateSecretKey()
|
||||
let pub = getPublicKey(priv)
|
||||
|
||||
@@ -76,6 +76,23 @@ test('same with double querying', async () => {
|
||||
expect(received).toHaveLength(2)
|
||||
})
|
||||
|
||||
test('query a bunch of events and cancel on eose', async () => {
|
||||
let events = new Set<string>()
|
||||
await new Promise<void>(resolve => {
|
||||
pool.subscribeManyEose(
|
||||
[...relays, 'wss://relayable.org', 'wss://relay.noswhere.com', 'wss://nothing.com'],
|
||||
[{ kinds: [0, 1], limit: 40 }],
|
||||
{
|
||||
onevent(event) {
|
||||
events.add(event.id)
|
||||
},
|
||||
onclose: resolve as any,
|
||||
},
|
||||
)
|
||||
})
|
||||
expect(events.size).toBeGreaterThan(50)
|
||||
})
|
||||
|
||||
test('querySync()', async () => {
|
||||
let events = await pool.querySync([...relays.slice(2), 'wss://offchain.pub', 'wss://eden.nostr.land'], {
|
||||
authors: ['3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'],
|
||||
|
||||
187
pool.ts
187
pool.ts
@@ -1,183 +1,10 @@
|
||||
import { Relay, SubscriptionParams, Subscription } from './relay.ts'
|
||||
import { normalizeURL } from './utils.ts'
|
||||
import { verifyEvent } from './pure.ts'
|
||||
import { AbstractSimplePool } from './abstract-pool.ts'
|
||||
|
||||
import type { Event } from './pure.ts'
|
||||
import { type Filter } from './filter.ts'
|
||||
|
||||
export type SubCloser = { close: () => void }
|
||||
|
||||
export type SubscribeManyParams = Omit<SubscriptionParams, 'onclose' | 'id'> & {
|
||||
maxWait?: number
|
||||
onclose?: (reasons: string[]) => void
|
||||
id?: string
|
||||
}
|
||||
|
||||
export class SimplePool {
|
||||
private relays = new Map<string, Relay>()
|
||||
public seenOn = new Map<string, Set<Relay>>()
|
||||
public trackRelays: boolean = false
|
||||
|
||||
public trustedRelayURLs = new Set<string>()
|
||||
|
||||
async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise<Relay> {
|
||||
url = normalizeURL(url)
|
||||
|
||||
let relay = this.relays.get(url)
|
||||
if (!relay) {
|
||||
relay = new Relay(url)
|
||||
if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout
|
||||
if (this.trustedRelayURLs.has(relay.url)) relay.trusted = true
|
||||
this.relays.set(url, relay)
|
||||
}
|
||||
await relay.connect()
|
||||
|
||||
return relay
|
||||
}
|
||||
|
||||
close(relays: string[]) {
|
||||
relays.map(normalizeURL).forEach(url => {
|
||||
this.relays.get(url)?.close()
|
||||
})
|
||||
}
|
||||
|
||||
subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser {
|
||||
if (this.trackRelays) {
|
||||
params.receivedEvent = (relay: Relay, id: string) => {
|
||||
let set = this.seenOn.get(id)
|
||||
if (!set) {
|
||||
set = new Set()
|
||||
this.seenOn.set(id, set)
|
||||
}
|
||||
set.add(relay)
|
||||
}
|
||||
}
|
||||
|
||||
const _knownIds = new Set<string>()
|
||||
const subs: Subscription[] = []
|
||||
|
||||
// batch all EOSEs into a single
|
||||
const eosesReceived: boolean[] = []
|
||||
let handleEose = (i: number) => {
|
||||
eosesReceived[i] = true
|
||||
if (eosesReceived.filter(a => a).length === relays.length) {
|
||||
params.oneose?.()
|
||||
handleEose = () => {}
|
||||
}
|
||||
}
|
||||
// batch all closes into a single
|
||||
const closesReceived: string[] = []
|
||||
let handleClose = (i: number, reason: string) => {
|
||||
handleEose(i)
|
||||
closesReceived[i] = reason
|
||||
if (closesReceived.filter(a => a).length === relays.length) {
|
||||
params.onclose?.(closesReceived)
|
||||
handleClose = () => {}
|
||||
}
|
||||
}
|
||||
|
||||
const localAlreadyHaveEventHandler = (id: string) => {
|
||||
if (params.alreadyHaveEvent?.(id)) {
|
||||
return true
|
||||
}
|
||||
const have = _knownIds.has(id)
|
||||
_knownIds.add(id)
|
||||
return have
|
||||
}
|
||||
|
||||
// open a subscription in all given relays
|
||||
const allOpened = Promise.all(
|
||||
relays.map(normalizeURL).map(async (url, i, arr) => {
|
||||
if (arr.indexOf(url) !== i) {
|
||||
// duplicate
|
||||
handleClose(i, 'duplicate url')
|
||||
return
|
||||
}
|
||||
|
||||
let relay: Relay
|
||||
try {
|
||||
relay = await this.ensureRelay(url, {
|
||||
connectionTimeout: params.maxWait ? Math.max(params.maxWait * 0.8, params.maxWait - 1000) : undefined,
|
||||
})
|
||||
} catch (err) {
|
||||
handleClose(i, (err as any)?.message || String(err))
|
||||
return
|
||||
}
|
||||
|
||||
let subscription = relay.subscribe(filters, {
|
||||
...params,
|
||||
oneose: () => handleEose(i),
|
||||
onclose: reason => handleClose(i, reason),
|
||||
alreadyHaveEvent: localAlreadyHaveEventHandler,
|
||||
eoseTimeout: params.maxWait,
|
||||
})
|
||||
|
||||
subs.push(subscription)
|
||||
}),
|
||||
)
|
||||
|
||||
return {
|
||||
async close() {
|
||||
await allOpened
|
||||
subs.forEach(sub => {
|
||||
sub.close()
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
subscribeManyEose(
|
||||
relays: string[],
|
||||
filters: Filter[],
|
||||
params: Pick<SubscribeManyParams, 'id' | 'onevent' | 'onclose' | 'maxWait'>,
|
||||
): SubCloser {
|
||||
const subcloser = this.subscribeMany(relays, filters, {
|
||||
...params,
|
||||
oneose() {
|
||||
subcloser.close()
|
||||
},
|
||||
})
|
||||
return subcloser
|
||||
}
|
||||
|
||||
async querySync(
|
||||
relays: string[],
|
||||
filter: Filter,
|
||||
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
|
||||
): Promise<Event[]> {
|
||||
return new Promise(async resolve => {
|
||||
const events: Event[] = []
|
||||
this.subscribeManyEose(relays, [filter], {
|
||||
...params,
|
||||
onevent(event: Event) {
|
||||
events.push(event)
|
||||
},
|
||||
onclose(_: string[]) {
|
||||
resolve(events)
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async get(
|
||||
relays: string[],
|
||||
filter: Filter,
|
||||
params?: Pick<SubscribeManyParams, 'id' | 'maxWait'>,
|
||||
): Promise<Event | null> {
|
||||
filter.limit = 1
|
||||
const events = await this.querySync(relays, filter, params)
|
||||
events.sort((a, b) => b.created_at - a.created_at)
|
||||
return events[0] || null
|
||||
}
|
||||
|
||||
publish(relays: string[], event: Event): Promise<string>[] {
|
||||
return relays.map(normalizeURL).map(async (url, i, arr) => {
|
||||
if (arr.indexOf(url) !== i) {
|
||||
// duplicate
|
||||
return Promise.reject('duplicate url')
|
||||
}
|
||||
|
||||
let r = await this.ensureRelay(url)
|
||||
return r.publish(event)
|
||||
})
|
||||
export class SimplePool extends AbstractSimplePool {
|
||||
constructor() {
|
||||
super({ verifyEvent })
|
||||
}
|
||||
}
|
||||
|
||||
export * from './abstract-pool.ts'
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { decode, type AddressPointer, type ProfilePointer, type EventPointer } from './nip19.ts'
|
||||
|
||||
import type { Event } from './pure.ts'
|
||||
import type { Event } from './core.ts'
|
||||
|
||||
type Reference = {
|
||||
text: string
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { afterEach, expect, test } from 'bun:test'
|
||||
|
||||
import { finalizeEvent, generateSecretKey, getPublicKey } from './pure.ts'
|
||||
import { Relay, relayConnect } from './relay.ts'
|
||||
import { NostrEvent, finalizeEvent, generateSecretKey, getPublicKey } from './pure.ts'
|
||||
import { Relay } from './relay.ts'
|
||||
|
||||
let relay = new Relay('wss://public.relaying.io')
|
||||
let relay = new Relay('wss://relay.nostr.bg')
|
||||
|
||||
afterEach(() => {
|
||||
relay.close()
|
||||
@@ -14,48 +14,46 @@ test('connectivity', async () => {
|
||||
expect(relay.connected).toBeTrue()
|
||||
})
|
||||
|
||||
test('connectivity, with relayConnect()', async () => {
|
||||
const relay = await relayConnect('wss://public.relaying.io')
|
||||
test('connectivity, with Relay.connect()', async () => {
|
||||
const relay = await Relay.connect('wss://public.relaying.io')
|
||||
expect(relay.connected).toBeTrue()
|
||||
relay.close()
|
||||
})
|
||||
|
||||
test('querying', async () => {
|
||||
await relay.connect()
|
||||
|
||||
let resolve1: () => void
|
||||
let resolve2: () => void
|
||||
let resolveEvent: () => void
|
||||
let resolveEose: () => void
|
||||
|
||||
let waiting = Promise.all([
|
||||
new Promise<void>(resolve => {
|
||||
resolve1 = resolve
|
||||
}),
|
||||
new Promise<void>(resolve => {
|
||||
resolve2 = resolve
|
||||
}),
|
||||
])
|
||||
const evented = new Promise<void>(resolve => {
|
||||
resolveEvent = resolve
|
||||
})
|
||||
const eosed = new Promise<void>(resolve => {
|
||||
resolveEose = resolve
|
||||
})
|
||||
|
||||
relay.subscribe(
|
||||
[
|
||||
{
|
||||
ids: ['3abc6cbb215af0412ab2c9c8895d96a084297890fd0b4018f8427453350ca2e4'],
|
||||
authors: ['9bbe185a20f50607b6e021c68a2c7275649770d3f8277c120d2b801a2b9a64fc'],
|
||||
kinds: [0],
|
||||
},
|
||||
],
|
||||
{
|
||||
onevent(event) {
|
||||
expect(event).toHaveProperty('id', '3abc6cbb215af0412ab2c9c8895d96a084297890fd0b4018f8427453350ca2e4')
|
||||
expect(event).toHaveProperty('content', '+')
|
||||
expect(event).toHaveProperty('kind', 7)
|
||||
resolve1()
|
||||
expect(event).toHaveProperty('pubkey', '9bbe185a20f50607b6e021c68a2c7275649770d3f8277c120d2b801a2b9a64fc')
|
||||
expect(event).toHaveProperty('kind', 0)
|
||||
resolveEvent()
|
||||
},
|
||||
oneose() {
|
||||
resolve2()
|
||||
resolveEose()
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
let [t1, t2] = await waiting
|
||||
expect(t1).toBeUndefined()
|
||||
expect(t2).toBeUndefined()
|
||||
await eosed
|
||||
await evented
|
||||
}, 10000)
|
||||
|
||||
test('listening and publishing and closing', async () => {
|
||||
@@ -63,17 +61,20 @@ test('listening and publishing and closing', async () => {
|
||||
|
||||
let sk = generateSecretKey()
|
||||
let pk = getPublicKey(sk)
|
||||
var resolve1: (_: void) => void
|
||||
var resolve2: (_: void) => void
|
||||
let resolveEose: (_: void) => void
|
||||
let resolveEvent: (_: void) => void
|
||||
let resolveClose: (_: void) => void
|
||||
let eventReceived: NostrEvent | undefined
|
||||
|
||||
let waiting = Promise.all([
|
||||
new Promise(resolve => {
|
||||
resolve1 = resolve
|
||||
}),
|
||||
new Promise(resolve => {
|
||||
resolve2 = resolve
|
||||
}),
|
||||
])
|
||||
const eosed = new Promise(resolve => {
|
||||
resolveEose = resolve
|
||||
})
|
||||
const evented = new Promise(resolve => {
|
||||
resolveEvent = resolve
|
||||
})
|
||||
const closed = new Promise(resolve => {
|
||||
resolveClose = resolve
|
||||
})
|
||||
|
||||
let sub = relay.subscribe(
|
||||
[
|
||||
@@ -84,17 +85,20 @@ test('listening and publishing and closing', async () => {
|
||||
],
|
||||
{
|
||||
onevent(event) {
|
||||
expect(event).toHaveProperty('pubkey', pk)
|
||||
expect(event).toHaveProperty('kind', 23571)
|
||||
expect(event).toHaveProperty('content', 'nostr-tools test suite')
|
||||
resolve1()
|
||||
eventReceived = event
|
||||
resolveEvent()
|
||||
},
|
||||
oneose() {
|
||||
resolveEose()
|
||||
},
|
||||
onclose() {
|
||||
resolve2()
|
||||
resolveClose()
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
await eosed
|
||||
|
||||
let event = finalizeEvent(
|
||||
{
|
||||
kind: 23571,
|
||||
@@ -106,9 +110,12 @@ test('listening and publishing and closing', async () => {
|
||||
)
|
||||
|
||||
await relay.publish(event)
|
||||
await evented
|
||||
sub.close()
|
||||
await closed
|
||||
|
||||
let [t1, t2] = await waiting
|
||||
expect(t1).toBeUndefined()
|
||||
expect(t2).toBeUndefined()
|
||||
expect(eventReceived).toBeDefined()
|
||||
expect(eventReceived).toHaveProperty('pubkey', pk)
|
||||
expect(eventReceived).toHaveProperty('kind', 23571)
|
||||
expect(eventReceived).toHaveProperty('content', 'nostr-tools test suite')
|
||||
})
|
||||
|
||||
356
relay.ts
356
relay.ts
@@ -1,351 +1,23 @@
|
||||
/* global WebSocket */
|
||||
import { verifyEvent } from './pure.ts'
|
||||
import { AbstractRelay } from './abstract-relay.ts'
|
||||
|
||||
import { verifyEvent, validateEvent, type Event, EventTemplate } from './pure.ts'
|
||||
import { matchFilters, type Filter } from './filter.ts'
|
||||
import { getHex64, getSubscriptionId } from './fakejson.ts'
|
||||
import { Queue, normalizeURL } from './utils.ts'
|
||||
import { nip42 } from './index.ts'
|
||||
import { yieldThread } from './helpers.ts'
|
||||
|
||||
export async function relayConnect(url: string) {
|
||||
const relay = new Relay(url)
|
||||
await relay.connect()
|
||||
return relay
|
||||
/**
|
||||
* @deprecated use Relay.connect() instead.
|
||||
*/
|
||||
export function relayConnect(url: string): Promise<Relay> {
|
||||
return Relay.connect(url)
|
||||
}
|
||||
|
||||
export class Relay {
|
||||
public readonly url: string
|
||||
private _connected: boolean = false
|
||||
|
||||
public trusted: boolean = false
|
||||
public onclose: (() => void) | null = null
|
||||
public onnotice: (msg: string) => void = msg => console.debug(`NOTICE from ${this.url}: ${msg}`)
|
||||
|
||||
public baseEoseTimeout: number = 4400
|
||||
public connectionTimeout: number = 4400
|
||||
private connectionTimeoutHandle: ReturnType<typeof setTimeout> | undefined
|
||||
|
||||
private connectionPromise: Promise<void> | undefined
|
||||
private openSubs = new Map<string, Subscription>()
|
||||
private openCountRequests = new Map<string, CountResolver>()
|
||||
private openEventPublishes = new Map<string, EventPublishResolver>()
|
||||
private ws: WebSocket | undefined
|
||||
private incomingMessageQueue = new Queue<string>()
|
||||
private queueRunning = false
|
||||
private challenge: string | undefined
|
||||
private serial: number = 0
|
||||
|
||||
export class Relay extends AbstractRelay {
|
||||
constructor(url: string) {
|
||||
this.url = normalizeURL(url)
|
||||
super(url, { verifyEvent })
|
||||
}
|
||||
|
||||
private closeAllSubscriptions(reason: string) {
|
||||
for (let [_, sub] of this.openSubs) {
|
||||
sub.close(reason)
|
||||
}
|
||||
this.openSubs.clear()
|
||||
|
||||
for (let [_, ep] of this.openEventPublishes) {
|
||||
ep.reject(new Error(reason))
|
||||
}
|
||||
this.openEventPublishes.clear()
|
||||
|
||||
for (let [_, cr] of this.openCountRequests) {
|
||||
cr.reject(new Error(reason))
|
||||
}
|
||||
this.openCountRequests.clear()
|
||||
}
|
||||
|
||||
public get connected(): boolean {
|
||||
return this._connected
|
||||
}
|
||||
|
||||
public async connect(): Promise<void> {
|
||||
if (this.connectionPromise) return this.connectionPromise
|
||||
|
||||
this.challenge = undefined
|
||||
this.connectionPromise = new Promise((resolve, reject) => {
|
||||
this.connectionTimeoutHandle = setTimeout(() => {
|
||||
reject('connection timed out')
|
||||
this.connectionPromise = undefined
|
||||
this.onclose?.()
|
||||
this.closeAllSubscriptions('relay connection timed out')
|
||||
}, this.connectionTimeout)
|
||||
|
||||
try {
|
||||
this.ws = new WebSocket(this.url)
|
||||
} catch (err) {
|
||||
reject(err)
|
||||
return
|
||||
}
|
||||
|
||||
this.ws.onopen = () => {
|
||||
clearTimeout(this.connectionTimeoutHandle)
|
||||
this._connected = true
|
||||
resolve()
|
||||
}
|
||||
|
||||
this.ws.onerror = ev => {
|
||||
reject((ev as any).message)
|
||||
if (this._connected) {
|
||||
this.onclose?.()
|
||||
this.closeAllSubscriptions('relay connection errored')
|
||||
this._connected = false
|
||||
}
|
||||
}
|
||||
|
||||
this.ws.onclose = async () => {
|
||||
this.connectionPromise = undefined
|
||||
this.onclose?.()
|
||||
this.closeAllSubscriptions('relay connection closed')
|
||||
this._connected = false
|
||||
}
|
||||
|
||||
this.ws.onmessage = ev => {
|
||||
this.incomingMessageQueue.enqueue(ev.data as string)
|
||||
if (!this.queueRunning) {
|
||||
this.runQueue()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return this.connectionPromise
|
||||
}
|
||||
|
||||
private async runQueue() {
|
||||
this.queueRunning = true
|
||||
while (true) {
|
||||
if (false === this.handleNext()) {
|
||||
break
|
||||
}
|
||||
await yieldThread()
|
||||
}
|
||||
this.queueRunning = false
|
||||
}
|
||||
|
||||
private handleNext(): undefined | false {
|
||||
const json = this.incomingMessageQueue.dequeue()
|
||||
if (!json) {
|
||||
return false
|
||||
}
|
||||
|
||||
const subid = getSubscriptionId(json)
|
||||
if (subid) {
|
||||
const so = this.openSubs.get(subid as string)
|
||||
if (!so) {
|
||||
// this is an EVENT message, but for a subscription we don't have, so just stop here
|
||||
return
|
||||
}
|
||||
|
||||
// this will be called only when this message is a EVENT message for a subscription we have
|
||||
// we do this before parsing the JSON to not have to do that for duplicate events
|
||||
// since JSON parsing is slow
|
||||
const id = getHex64(json, 'id')
|
||||
const alreadyHave = so.alreadyHaveEvent?.(id)
|
||||
|
||||
// notify any interested client that the relay has this event
|
||||
// (do this after alreadyHaveEvent() because the client may rely on this to answer that)
|
||||
so.receivedEvent?.(this, id)
|
||||
|
||||
if (alreadyHave) {
|
||||
// if we had already seen this event we can just stop here
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
let data = JSON.parse(json)
|
||||
// we won't do any checks against the data since all failures (i.e. invalid messages from relays)
|
||||
// will naturally be caught by the encompassing try..catch block
|
||||
|
||||
switch (data[0]) {
|
||||
case 'EVENT': {
|
||||
const so = this.openSubs.get(data[1] as string) as Subscription
|
||||
const event = data[2] as Event
|
||||
if ((this.trusted || (validateEvent(event) && verifyEvent(event))) && matchFilters(so.filters, event)) {
|
||||
so.onevent(event)
|
||||
}
|
||||
return
|
||||
}
|
||||
case 'COUNT': {
|
||||
const id: string = data[1]
|
||||
const payload = data[2] as { count: number }
|
||||
const cr = this.openCountRequests.get(id) as CountResolver
|
||||
if (cr) {
|
||||
cr.resolve(payload.count)
|
||||
this.openCountRequests.delete(id)
|
||||
}
|
||||
return
|
||||
}
|
||||
case 'EOSE': {
|
||||
const so = this.openSubs.get(data[1] as string)
|
||||
if (!so) return
|
||||
so.receivedEose()
|
||||
return
|
||||
}
|
||||
case 'OK': {
|
||||
const id: string = data[1]
|
||||
const ok: boolean = data[2]
|
||||
const reason: string = data[3]
|
||||
const ep = this.openEventPublishes.get(id) as EventPublishResolver
|
||||
if (ok) ep.resolve(reason)
|
||||
else ep.reject(new Error(reason))
|
||||
this.openEventPublishes.delete(id)
|
||||
return
|
||||
}
|
||||
case 'CLOSED': {
|
||||
const id: string = data[1]
|
||||
const so = this.openSubs.get(id)
|
||||
if (!so) return
|
||||
so.closed = true
|
||||
so.close(data[2] as string)
|
||||
this.openSubs.delete(id)
|
||||
return
|
||||
}
|
||||
case 'NOTICE':
|
||||
this.onnotice(data[1] as string)
|
||||
return
|
||||
case 'AUTH': {
|
||||
this.challenge = data[1] as string
|
||||
return
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
public async send(message: string) {
|
||||
if (!this.connectionPromise) throw new Error('sending on closed connection')
|
||||
|
||||
this.connectionPromise.then(() => {
|
||||
this.ws?.send(message)
|
||||
})
|
||||
}
|
||||
|
||||
public async auth(signAuthEvent: (authEvent: EventTemplate) => Promise<void>) {
|
||||
if (!this.challenge) throw new Error("can't perform auth, no challenge was received")
|
||||
const evt = nip42.makeAuthEvent(this.url, this.challenge)
|
||||
await signAuthEvent(evt)
|
||||
this.send('["AUTH",' + JSON.stringify(evt) + ']')
|
||||
}
|
||||
|
||||
public async publish(event: Event): Promise<string> {
|
||||
const ret = new Promise<string>((resolve, reject) => {
|
||||
this.openEventPublishes.set(event.id, { resolve, reject })
|
||||
})
|
||||
this.send('["EVENT",' + JSON.stringify(event) + ']')
|
||||
return ret
|
||||
}
|
||||
|
||||
public async count(filters: Filter[], params: { id?: string | null }): Promise<number> {
|
||||
this.serial++
|
||||
const id = params?.id || 'count:' + this.serial
|
||||
const ret = new Promise<number>((resolve, reject) => {
|
||||
this.openCountRequests.set(id, { resolve, reject })
|
||||
})
|
||||
this.send('["COUNT","' + id + '",' + JSON.stringify(filters) + ']')
|
||||
return ret
|
||||
}
|
||||
|
||||
public subscribe(filters: Filter[], params: Partial<SubscriptionParams>): Subscription {
|
||||
const subscription = this.prepareSubscription(filters, params)
|
||||
subscription.fire()
|
||||
return subscription
|
||||
}
|
||||
|
||||
public prepareSubscription(filters: Filter[], params: Partial<SubscriptionParams> & { id?: string }): Subscription {
|
||||
this.serial++
|
||||
const id = params.id || 'sub:' + this.serial
|
||||
const subscription = new Subscription(this, id, filters, params)
|
||||
this.openSubs.set(id, subscription)
|
||||
return subscription
|
||||
}
|
||||
|
||||
public close() {
|
||||
this.closeAllSubscriptions('relay connection closed by us')
|
||||
this._connected = false
|
||||
this.ws?.close()
|
||||
static async connect(url: string) {
|
||||
const relay = new Relay(url)
|
||||
await relay.connect()
|
||||
return relay
|
||||
}
|
||||
}
|
||||
|
||||
export class Subscription {
|
||||
public readonly relay: Relay
|
||||
public readonly id: string
|
||||
|
||||
public closed: boolean = false
|
||||
public eosed: boolean = false
|
||||
public filters: Filter[]
|
||||
public alreadyHaveEvent: ((id: string) => boolean) | undefined
|
||||
public receivedEvent: ((relay: Relay, id: string) => void) | undefined
|
||||
|
||||
public onevent: (evt: Event) => void
|
||||
public oneose: (() => void) | undefined
|
||||
public onclose: ((reason: string) => void) | undefined
|
||||
|
||||
public eoseTimeout: number
|
||||
private eoseTimeoutHandle: ReturnType<typeof setTimeout> | undefined
|
||||
|
||||
constructor(relay: Relay, id: string, filters: Filter[], params: SubscriptionParams) {
|
||||
this.relay = relay
|
||||
this.filters = filters
|
||||
this.id = id
|
||||
this.alreadyHaveEvent = params.alreadyHaveEvent
|
||||
this.receivedEvent = params.receivedEvent
|
||||
this.eoseTimeout = params.eoseTimeout || relay.baseEoseTimeout
|
||||
|
||||
this.oneose = params.oneose
|
||||
this.onclose = params.onclose
|
||||
this.onevent =
|
||||
params.onevent ||
|
||||
(event => {
|
||||
console.warn(
|
||||
`onevent() callback not defined for subscription '${this.id}' in relay ${this.relay.url}. event received:`,
|
||||
event,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
public fire() {
|
||||
this.relay.send('["REQ","' + this.id + '",' + JSON.stringify(this.filters).substring(1))
|
||||
|
||||
// only now we start counting the eoseTimeout
|
||||
this.eoseTimeoutHandle = setTimeout(this.receivedEose.bind(this), this.eoseTimeout)
|
||||
}
|
||||
|
||||
public receivedEose() {
|
||||
if (this.eosed) return
|
||||
clearTimeout(this.eoseTimeoutHandle)
|
||||
this.eosed = true
|
||||
this.oneose?.()
|
||||
}
|
||||
|
||||
public close(reason: string = 'closed by caller') {
|
||||
if (!this.closed) {
|
||||
// if the connection was closed by the user calling .close() we will send a CLOSE message
|
||||
// otherwise this._open will be already set to false so we will skip this
|
||||
this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']')
|
||||
this.closed = true
|
||||
}
|
||||
this.onclose?.(reason)
|
||||
}
|
||||
}
|
||||
|
||||
export type SubscriptionParams = {
|
||||
onevent?: (evt: Event) => void
|
||||
oneose?: () => void
|
||||
onclose?: (reason: string) => void
|
||||
alreadyHaveEvent?: (id: string) => boolean
|
||||
receivedEvent?: (relay: Relay, id: string) => void
|
||||
eoseTimeout?: number
|
||||
}
|
||||
|
||||
export type CountResolver = {
|
||||
resolve: (count: number) => void
|
||||
reject: (err: Error) => void
|
||||
}
|
||||
|
||||
export type EventPublishResolver = {
|
||||
resolve: (reason: string) => void
|
||||
reject: (err: Error) => void
|
||||
}
|
||||
export * from './abstract-relay.ts'
|
||||
|
||||
@@ -9,10 +9,10 @@
|
||||
"skipLibCheck": true,
|
||||
"esModuleInterop": true,
|
||||
"emitDeclarationOnly": true,
|
||||
"allowImportingTsExtensions": true,
|
||||
"outDir": "lib/types",
|
||||
"resolveJsonModule": true,
|
||||
"rootDir": ".",
|
||||
"allowImportingTsExtensions": true,
|
||||
"types": ["bun-types"]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ import { describe, test, expect } from 'bun:test'
|
||||
import { buildEvent } from './test-helpers.ts'
|
||||
import { Queue, insertEventIntoAscendingList, insertEventIntoDescendingList, binarySearch } from './utils.ts'
|
||||
|
||||
import type { Event } from './pure.ts'
|
||||
import type { Event } from './core.ts'
|
||||
|
||||
describe('inserting into a desc sorted list of events', () => {
|
||||
test('insert into an empty list', async () => {
|
||||
|
||||
Reference in New Issue
Block a user