fix Queue, tweaks on relay.ts and make relay.test.ts pass.

This commit is contained in:
fiatjaf
2023-12-17 00:27:03 -03:00
parent 7a640092d0
commit 420a6910e9
8 changed files with 207 additions and 202 deletions

BIN
bun.lockb

Binary file not shown.

View File

@@ -1,15 +1,14 @@
import { test, expect } from 'bun:test' import { test, expect } from 'bun:test'
import 'websocket-polyfill'
import { makeAuthEvent } from './nip42.ts' import { makeAuthEvent } from './nip42.ts'
import { relayInit } from './relay.ts' import { relayConnect } from './relay.ts'
test('auth flow', () => { test('auth flow', () => {
const relay = relayInit('wss://nostr.wine') const relay = relayConnect('wss://nostr.wine')
const auth = makeAuthEvent(relay.url, 'chachacha') const auth = makeAuthEvent(relay.url, 'chachacha')
expect(auth.tags).toHaveLength(2) expect(auth.tags).toHaveLength(2)
expect(auth.tags[0]).toEqual(['relay', 'wss://nostr.wine']) expect(auth.tags[0]).toEqual(['relay', 'wss://nostr.wine/'])
expect(auth.tags[1]).toEqual(['challenge', 'chachacha']) expect(auth.tags[1]).toEqual(['challenge', 'chachacha'])
expect(auth.kind).toEqual(22242) expect(auth.kind).toEqual(22242)
}) })

View File

@@ -185,7 +185,6 @@
"node-fetch": "^2.6.9", "node-fetch": "^2.6.9",
"prettier": "^3.0.3", "prettier": "^3.0.3",
"tsd": "^0.22.0", "tsd": "^0.22.0",
"typescript": "^5.0.4", "typescript": "^5.0.4"
"websocket-polyfill": "^0.0.3"
} }
} }

View File

@@ -1,5 +1,4 @@
import { test, expect } from 'bun:test' import { test, expect } from 'bun:test'
import 'websocket-polyfill'
import { finishEvent, type Event } from './event.ts' import { finishEvent, type Event } from './event.ts'
import { generatePrivateKey, getPublicKey } from './keys.ts' import { generatePrivateKey, getPublicKey } from './keys.ts'

View File

@@ -1,125 +1,99 @@
import { test, expect } from 'bun:test' import { test, expect, afterEach, beforeEach } from 'bun:test'
import 'websocket-polyfill'
import { finishEvent } from './event.ts' import { finishEvent } from './event.ts'
import { generatePrivateKey, getPublicKey } from './keys.ts' import { generatePrivateKey, getPublicKey } from './keys.ts'
import { relayInit } from './relay.ts' import { Relay } from './relay.ts'
let relay = relayInit('wss://relay.damus.io/') let relay = new Relay('wss://public.relaying.io')
beforeAll(() => { beforeEach(() => {
relay.connect() relay.connect()
}) })
afterAll(() => { afterEach(() => {
relay.close() relay.close()
}) })
test('connectivity', () => { test('connectivity', async () => {
return expect( await relay.connect()
new Promise(resolve => { expect(relay.connected).toBeTrue()
relay.on('connect', () => {
resolve(true)
})
relay.on('error', () => {
resolve(false)
})
}),
).resolves.toBe(true)
}) })
test('querying', async () => { test('querying', async () => {
var resolve1: (value: boolean) => void let resolve1: () => void
var resolve2: (value: boolean) => void let resolve2: () => void
let sub = relay.sub([ let waiting = Promise.all([
{ new Promise<void>(resolve => {
ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'],
},
])
sub.on('event', event => {
expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027')
resolve1(true)
})
sub.on('eose', () => {
resolve2(true)
})
let [t1, t2] = await Promise.all([
new Promise<boolean>(resolve => {
resolve1 = resolve resolve1 = resolve
}), }),
new Promise<boolean>(resolve => { new Promise<void>(resolve => {
resolve2 = resolve resolve2 = resolve
}), }),
]) ])
expect(t1).toEqual(true) relay.subscribe(
expect(t2).toEqual(true) [
{
ids: ['3abc6cbb215af0412ab2c9c8895d96a084297890fd0b4018f8427453350ca2e4'],
},
],
{
onevent(event) {
expect(event).toHaveProperty('id', '3abc6cbb215af0412ab2c9c8895d96a084297890fd0b4018f8427453350ca2e4')
expect(event).toHaveProperty('content', '+')
expect(event).toHaveProperty('kind', 7)
resolve1()
},
oneose() {
resolve2()
},
},
)
let [t1, t2] = await waiting
expect(t1).toBeUndefined()
expect(t2).toBeUndefined()
}, 10000) }, 10000)
test('async iterator', async () => { test('listening and publishing and closing', async () => {
let sub = relay.sub([
{
ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'],
},
])
for await (const event of sub.events) {
expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027')
break
}
})
test('get()', async () => {
let event = await relay.get({
ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'],
})
expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027')
})
test('list()', async () => {
let events = await relay.list([
{
authors: ['3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'],
kinds: [1],
limit: 2,
},
])
expect(events.length).toEqual(2)
})
test('listening (twice) and publishing', async () => {
let sk = generatePrivateKey() let sk = generatePrivateKey()
let pk = getPublicKey(sk) let pk = getPublicKey(sk)
var resolve1: (value: boolean) => void var resolve1: (_: void) => void
var resolve2: (value: boolean) => void var resolve2: (_: void) => void
let sub = relay.sub([ let waiting = Promise.all([
{ new Promise(resolve => {
kinds: [27572], resolve1 = resolve
authors: [pk], }),
}, new Promise(resolve => {
resolve2 = resolve
}),
]) ])
sub.on('event', event => { let sub = await relay.subscribe(
expect(event).toHaveProperty('pubkey', pk) [
expect(event).toHaveProperty('kind', 27572) {
expect(event).toHaveProperty('content', 'nostr-tools test suite') kinds: [23571],
resolve1(true) authors: [pk],
}) },
sub.on('event', event => { ],
expect(event).toHaveProperty('pubkey', pk) {
expect(event).toHaveProperty('kind', 27572) onevent(event) {
expect(event).toHaveProperty('content', 'nostr-tools test suite') expect(event).toHaveProperty('pubkey', pk)
resolve2(true) expect(event).toHaveProperty('kind', 23571)
}) expect(event).toHaveProperty('content', 'nostr-tools test suite')
resolve1()
},
onclose() {
resolve2()
},
},
)
let event = finishEvent( let event = finishEvent(
{ {
kind: 27572, kind: 23571,
created_at: Math.floor(Date.now() / 1000), created_at: Math.floor(Date.now() / 1000),
tags: [], tags: [],
content: 'nostr-tools test suite', content: 'nostr-tools test suite',
@@ -127,15 +101,10 @@ test('listening (twice) and publishing', async () => {
sk, sk,
) )
relay.publish(event) await relay.publish(event)
return expect( sub.close()
Promise.all([
new Promise(resolve => { let [t1, t2] = await waiting
resolve1 = resolve expect(t1).toBeUndefined()
}), expect(t2).toBeUndefined()
new Promise(resolve => {
resolve2 = resolve
}),
]),
).resolves.toEqual([true, true])
}) })

184
relay.ts
View File

@@ -12,68 +12,13 @@ export function relayConnect(url: string) {
return relay return relay
} }
class Subscription { export class Relay {
public readonly relay: Relay
public readonly id: string
public closed: boolean = false
public eosed: boolean = false
public alreadyHaveEvent: ((id: string) => boolean) | null = null
public receivedEvent: ((id: string) => boolean) | null = null
public readonly filters: Filter[]
public onevent: (evt: Event) => void
public oneose: (() => void) | null = null
public onclose: ((reason: string) => void) | null = null
constructor(relay: Relay, filters: Filter[], params: SubscriptionParams) {
this.relay = relay
this.filters = filters
this.id = params.id
this.onevent = params.onevent
this.oneose = params.oneose || null
this.onclose = params.onclose || null
this.alreadyHaveEvent = params.alreadyHaveEvent || null
this.receivedEvent = params.receivedEvent || null
}
public close(reason: string) {
if (!this.closed) {
// if the connection was closed by the user calling .close() we will send a CLOSE message
// otherwise this._open will be already set to false so we will skip this
this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']')
this.closed = true
}
this.onclose?.(reason)
}
}
type SubscriptionParams = {
id: string
onevent: (evt: Event) => void
oneose?: () => void
onclose?: (reason: string) => void
alreadyHaveEvent: ((id: string) => boolean) | null
receivedEvent: ((id: string) => boolean) | null
}
type CountResolver = {
resolve: (count: number) => void
reject: (err: Error) => void
}
type EventPublishResolver = {
resolve: (reason: string) => void
reject: (err: Error) => void
}
class Relay {
public readonly url: string public readonly url: string
private _connected: boolean = false private _connected: boolean = false
public trusted: boolean = false public trusted: boolean = false
public onclose: (() => void) | null = null public onclose: (() => void) | null = null
public onnotice: (msg: string) => void = console.log public onnotice: (msg: string) => void = msg => console.log(`NOTICE from ${this.url}: ${msg}`)
private connectionPromise: Promise<void> | undefined private connectionPromise: Promise<void> | undefined
private openSubs = new Map<string, Subscription>() private openSubs = new Map<string, Subscription>()
@@ -81,7 +26,7 @@ class Relay {
private openEventPublishes = new Map<string, EventPublishResolver>() private openEventPublishes = new Map<string, EventPublishResolver>()
private ws: WebSocket | undefined private ws: WebSocket | undefined
private incomingMessageQueue = new Queue<string>() private incomingMessageQueue = new Queue<string>()
private handleNextInterval: ReturnType<typeof setInterval> | null = null private queueRunning = false
private challenge: string | undefined private challenge: string | undefined
private serial: number = 0 private serial: number = 0
@@ -112,6 +57,8 @@ class Relay {
public async connect(): Promise<void> { public async connect(): Promise<void> {
if (this.connectionPromise) return this.connectionPromise if (this.connectionPromise) return this.connectionPromise
this.challenge = undefined
this.connectionPromise = new Promise((resolve, reject) => { this.connectionPromise = new Promise((resolve, reject) => {
try { try {
this.ws = new WebSocket(this.url) this.ws = new WebSocket(this.url)
@@ -125,8 +72,8 @@ class Relay {
resolve() resolve()
} }
this.ws.onerror = () => { this.ws.onerror = ev => {
reject() reject((ev as any).message)
if (this._connected) { if (this._connected) {
this.onclose?.() this.onclose?.()
this.closeAllSubscriptions('relay connection errored') this.closeAllSubscriptions('relay connection errored')
@@ -143,19 +90,30 @@ class Relay {
this.ws.onmessage = ev => { this.ws.onmessage = ev => {
this.incomingMessageQueue.enqueue(ev.data as string) this.incomingMessageQueue.enqueue(ev.data as string)
if (!this.handleNextInterval) { if (!this.queueRunning) {
this.handleNextInterval = setInterval(this.handleNext.bind(this), 0) this.runQueue()
} }
} }
}) })
return this.connectionPromise
} }
private handleNext() { private async runQueue() {
this.queueRunning = true
while (true) {
if (false === this.handleNext()) {
break
}
await Promise.resolve()
}
this.queueRunning = false
}
private handleNext(): undefined | false {
const json = this.incomingMessageQueue.dequeue() const json = this.incomingMessageQueue.dequeue()
if (!json) { if (!json) {
clearInterval(this.handleNextInterval as ReturnType<typeof setInterval>) return false
this.handleNextInterval = null
return
} }
const subid = getSubscriptionId(json) const subid = getSubscriptionId(json)
@@ -249,36 +207,106 @@ class Relay {
if (!this.challenge) throw new Error("can't perform auth, no challenge was received") if (!this.challenge) throw new Error("can't perform auth, no challenge was received")
const evt = nip42.makeAuthEvent(this.url, this.challenge) const evt = nip42.makeAuthEvent(this.url, this.challenge)
await Promise.all([signAuthEvent(evt), this.connect()]) await Promise.all([signAuthEvent(evt), this.connect()])
this.ws?.send('["AUTH",' + JSON.stringify(evt) + ']') this.send('["AUTH",' + JSON.stringify(evt) + ']')
} }
public async publish(event: Event) { public async publish(event: Event): Promise<string> {
await this.connect() await this.connect()
const ret = new Promise((resolve, reject) => { const ret = new Promise<string>((resolve, reject) => {
this.openEventPublishes.set(event.id, { resolve, reject }) this.openEventPublishes.set(event.id, { resolve, reject })
}) })
this.ws?.send('["EVENT",' + JSON.stringify(event) + ']') this.send('["EVENT",' + JSON.stringify(event) + ']')
return ret return ret
} }
public async count(filters: Filter[], params: { id?: string | null }) { public async count(filters: Filter[], params: { id?: string | null }): Promise<number> {
await this.connect() await this.connect()
this.serial++ this.serial++
const id = params?.id || 'count:' + this.serial const id = params?.id || 'count:' + this.serial
const ret = new Promise((resolve, reject) => { const ret = new Promise<number>((resolve, reject) => {
this.openCountRequests.set(id, { resolve, reject }) this.openCountRequests.set(id, { resolve, reject })
}) })
this.ws?.send('["COUNT","' + id + '"' + JSON.stringify(filters) + ']') this.send('["COUNT","' + id + '",' + JSON.stringify(filters) + ']')
return ret return ret
} }
public async subscribe(filters: Filter[], params: SubscriptionParams & { id: string | undefined }) { public async subscribe(filters: Filter[], params: Partial<SubscriptionParams>) {
await this.connect() await this.connect()
this.serial++ this.serial++
params.id = params.id || 'sub:' + this.serial const id = params.id || 'sub:' + this.serial
const subscription = new Subscription(this, filters, params) const subscription = new Subscription(this, filters, {
this.openSubs.set(params.id, subscription) onevent: event => {
this.ws?.send('["REQ","' + params.id + '"' + JSON.stringify(filters) + ']') console.warn(
`onevent() callback not defined for subscription '${id}' in relay ${this.url}. event received:`,
event,
)
},
...params,
id,
})
this.openSubs.set(id, subscription)
this.send('["REQ","' + id + '",' + JSON.stringify(filters).substring(1))
return subscription return subscription
} }
public close() {
this.closeAllSubscriptions('relay connection closed by us')
this._connected = false
this.ws?.close()
}
}
export class Subscription {
public readonly relay: Relay
public readonly id: string
public closed: boolean = false
public eosed: boolean = false
public alreadyHaveEvent: ((id: string) => boolean) | undefined
public receivedEvent: ((id: string) => boolean) | undefined
public readonly filters: Filter[]
public onevent: (evt: Event) => void
public oneose: (() => void) | undefined
public onclose: ((reason: string) => void) | undefined
constructor(relay: Relay, filters: Filter[], params: SubscriptionParams) {
this.relay = relay
this.filters = filters
this.id = params.id
this.onevent = params.onevent
this.oneose = params.oneose
this.onclose = params.onclose
this.alreadyHaveEvent = params.alreadyHaveEvent
this.receivedEvent = params.receivedEvent
}
public close(reason: string = 'closed by caller') {
if (!this.closed) {
// if the connection was closed by the user calling .close() we will send a CLOSE message
// otherwise this._open will be already set to false so we will skip this
this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']')
this.closed = true
}
this.onclose?.(reason)
}
}
export type SubscriptionParams = {
id: string
onevent: (evt: Event) => void
oneose?: () => void
onclose?: (reason: string) => void
alreadyHaveEvent?: (id: string) => boolean
receivedEvent?: (id: string) => boolean
}
export type CountResolver = {
resolve: (count: number) => void
reject: (err: Error) => void
}
export type EventPublishResolver = {
resolve: (reason: string) => void
reject: (err: Error) => void
} }

View File

@@ -1,6 +1,6 @@
import { describe, test, expect } from 'bun:test' import { describe, test, expect } from 'bun:test'
import { buildEvent } from './test-helpers.ts' import { buildEvent } from './test-helpers.ts'
import { MessageQueue, insertEventIntoAscendingList, insertEventIntoDescendingList } from './utils.ts' import { Queue, insertEventIntoAscendingList, insertEventIntoDescendingList } from './utils.ts'
import type { Event } from './event.ts' import type { Event } from './event.ts'
@@ -216,27 +216,25 @@ describe('inserting into a asc sorted list of events', () => {
describe('enque a message into MessageQueue', () => { describe('enque a message into MessageQueue', () => {
test('enque into an empty queue', () => { test('enque into an empty queue', () => {
const queue = new MessageQueue() const queue = new Queue()
queue.enqueue('node1') queue.enqueue('node1')
expect(queue.first!.value).toBe('node1') expect(queue.first!.value).toBe('node1')
}) })
test('enque into a non-empty queue', () => { test('enque into a non-empty queue', () => {
const queue = new MessageQueue() const queue = new Queue()
queue.enqueue('node1') queue.enqueue('node1')
queue.enqueue('node3') queue.enqueue('node3')
queue.enqueue('node2') queue.enqueue('node2')
expect(queue.first!.value).toBe('node1') expect(queue.first!.value).toBe('node1')
expect(queue.last!.value).toBe('node2') expect(queue.last!.value).toBe('node2')
expect(queue.size).toBe(3)
}) })
test('dequeue from an empty queue', () => { test('dequeue from an empty queue', () => {
const queue = new MessageQueue() const queue = new Queue()
const item1 = queue.dequeue() const item1 = queue.dequeue()
expect(item1).toBe(null) expect(item1).toBe(null)
expect(queue.size).toBe(0)
}) })
test('dequeue from a non-empty queue', () => { test('dequeue from a non-empty queue', () => {
const queue = new MessageQueue() const queue = new Queue()
queue.enqueue('node1') queue.enqueue('node1')
queue.enqueue('node3') queue.enqueue('node3')
queue.enqueue('node2') queue.enqueue('node2')
@@ -246,14 +244,13 @@ describe('enque a message into MessageQueue', () => {
expect(item2).toBe('node3') expect(item2).toBe('node3')
}) })
test('dequeue more than in queue', () => { test('dequeue more than in queue', () => {
const queue = new MessageQueue() const queue = new Queue()
queue.enqueue('node1') queue.enqueue('node1')
queue.enqueue('node3') queue.enqueue('node3')
const item1 = queue.dequeue() const item1 = queue.dequeue()
expect(item1).toBe('node1') expect(item1).toBe('node1')
const item2 = queue.dequeue() const item2 = queue.dequeue()
expect(item2).toBe('node3') expect(item2).toBe('node3')
expect(queue.size).toBe(0)
const item3 = queue.dequeue() const item3 = queue.dequeue()
expect(item3).toBe(null) expect(item3).toBe(null)
}) })

View File

@@ -94,11 +94,11 @@ export function insertEventIntoAscendingList(sortedArray: Event[], event: Event)
export class QueueNode<V> { export class QueueNode<V> {
public value: V public value: V
public next: QueueNode<V> | null public next: QueueNode<V> | null = null
public prev: QueueNode<V> | null = null
constructor(message: V) { constructor(message: V) {
this.value = message this.value = message
this.next = null
} }
} }
@@ -114,9 +114,17 @@ export class Queue<V> {
enqueue(value: V): boolean { enqueue(value: V): boolean {
const newNode = new QueueNode(value) const newNode = new QueueNode(value)
if (!this.last) { if (!this.last) {
// list is empty
this.first = newNode this.first = newNode
this.last = newNode this.last = newNode
} else if (this.last === this.first) {
// list has a single element
this.last = newNode
this.last.prev = this.first
this.first.next = newNode
} else { } else {
// list has elements, add as last
newNode.prev = this.last
this.last.next = newNode this.last.next = newNode
this.last = newNode this.last = newNode
} }
@@ -126,10 +134,16 @@ export class Queue<V> {
dequeue(): V | null { dequeue(): V | null {
if (!this.first) return null if (!this.first) return null
let prev = this.first if (this.first === this.last) {
this.first = prev.next const target = this.first
prev.next = null this.first = null
this.last = null
return target.value
}
return prev.value const target = this.first
this.first = target.next
return target.value
} }
} }