From 7a640092d0c1df28ef57611ae5bde952ae59842b Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 16 Dec 2023 18:56:15 -0300 Subject: [PATCH] rewrite relay.ts to be much simpler. --- event.ts | 4 +- relay.ts | 592 ++++++++++++++++++++++--------------------------------- utils.ts | 82 +++----- 3 files changed, 265 insertions(+), 413 deletions(-) diff --git a/event.ts b/event.ts index 1632548..2878462 100644 --- a/event.ts +++ b/event.ts @@ -49,7 +49,7 @@ export function getEventHash(event: UnsignedEvent): string { const isRecord = (obj: unknown): obj is Record => obj instanceof Object -export function validateEvent(event: T): event is T & UnsignedEvent { +export function validateEvent(event: UnsignedEvent): boolean { if (!isRecord(event)) return false if (typeof event.kind !== 'number') return false if (typeof event.content !== 'string') return false @@ -70,7 +70,7 @@ export function validateEvent(event: T): event is T & UnsignedEvent { } /** Verify the event's signature. This function mutates the event with a `verified` symbol, making it idempotent. */ -export function verifySignature(event: Event): event is VerifiedEvent { +export function verifySignature(event: Event): boolean { if (typeof event[verifiedSymbol] === 'boolean') return event[verifiedSymbol] const hash = getEventHash(event) diff --git a/relay.ts b/relay.ts index 9d06a46..0011a98 100644 --- a/relay.ts +++ b/relay.ts @@ -1,398 +1,284 @@ /* global WebSocket */ -import { verifySignature, validateEvent, type Event } from './event.ts' +import { verifySignature, validateEvent, type Event, EventTemplate } from './event.ts' import { matchFilters, type Filter } from './filter.ts' import { getHex64, getSubscriptionId } from './fakejson.ts' -import { MessageQueue } from './utils.ts' +import { Queue, normalizeURL } from './utils.ts' +import { nip42 } from './index.ts' -type RelayEvent = { - connect: () => void | Promise - disconnect: () => void | Promise - error: () => void | Promise - notice: (msg: string) => void | Promise - auth: (challenge: string) => void | Promise -} -export type CountPayload = { - count: number -} -export type SubEvent = { - event: (event: Event) => void | Promise - count: (payload: CountPayload) => void | Promise - eose: () => void | Promise -} -export type Relay = { - url: string - status: number - connect: () => Promise - close: () => void - sub: (filters: Filter[], opts?: SubscriptionOptions) => Sub - list: (filters: Filter[], opts?: SubscriptionOptions) => Promise - get: (filter: Filter, opts?: SubscriptionOptions) => Promise - count: (filters: Filter[], opts?: SubscriptionOptions) => Promise - publish: (event: Event) => Promise - auth: (event: Event) => Promise - off: (event: T, listener: U) => void - on: (event: T, listener: U) => void -} -export type Sub = { - sub: (filters: Filter[], opts: SubscriptionOptions) => Sub - unsub: () => void - on: (event: T, listener: U) => void - off: (event: T, listener: U) => void - events: AsyncGenerator +export function relayConnect(url: string) { + const relay = new Relay(url) + relay.connect() + return relay } -export type SubscriptionOptions = { - id?: string - verb?: 'REQ' | 'COUNT' - skipVerification?: boolean - alreadyHaveEvent?: null | ((id: string, relay: string) => boolean) - eoseSubTimeout?: number -} +class Subscription { + public readonly relay: Relay + public readonly id: string + public closed: boolean = false + public eosed: boolean = false -const newListeners = (): { [TK in keyof RelayEvent]: RelayEvent[TK][] } => ({ - connect: [], - disconnect: [], - error: [], - notice: [], - auth: [], -}) + public alreadyHaveEvent: ((id: string) => boolean) | null = null + public receivedEvent: ((id: string) => boolean) | null = null + public readonly filters: Filter[] -export function relayInit( - url: string, - options: { - getTimeout?: number - listTimeout?: number - countTimeout?: number - } = {}, -): Relay { - let { listTimeout = 3000, getTimeout = 3000, countTimeout = 3000 } = options + public onevent: (evt: Event) => void + public oneose: (() => void) | null = null + public onclose: ((reason: string) => void) | null = null - var ws: WebSocket - var openSubs: { [id: string]: { filters: Filter[] } & SubscriptionOptions } = {} - var listeners = newListeners() - var subListeners: { - [subid: string]: { [TK in keyof SubEvent]: SubEvent[TK][] } - } = {} - var pubListeners: { - [eventid: string]: { - resolve: (_: unknown) => void - reject: (err: Error) => void + constructor(relay: Relay, filters: Filter[], params: SubscriptionParams) { + this.relay = relay + this.filters = filters + this.id = params.id + this.onevent = params.onevent + this.oneose = params.oneose || null + this.onclose = params.onclose || null + this.alreadyHaveEvent = params.alreadyHaveEvent || null + this.receivedEvent = params.receivedEvent || null + } + + public close(reason: string) { + if (!this.closed) { + // if the connection was closed by the user calling .close() we will send a CLOSE message + // otherwise this._open will be already set to false so we will skip this + this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']') + this.closed = true } - } = {} + this.onclose?.(reason) + } +} - var connectionPromise: Promise | undefined - async function connectRelay(): Promise { - if (connectionPromise) return connectionPromise - connectionPromise = new Promise((resolve, reject) => { +type SubscriptionParams = { + id: string + onevent: (evt: Event) => void + oneose?: () => void + onclose?: (reason: string) => void + alreadyHaveEvent: ((id: string) => boolean) | null + receivedEvent: ((id: string) => boolean) | null +} + +type CountResolver = { + resolve: (count: number) => void + reject: (err: Error) => void +} + +type EventPublishResolver = { + resolve: (reason: string) => void + reject: (err: Error) => void +} + +class Relay { + public readonly url: string + private _connected: boolean = false + + public trusted: boolean = false + public onclose: (() => void) | null = null + public onnotice: (msg: string) => void = console.log + + private connectionPromise: Promise | undefined + private openSubs = new Map() + private openCountRequests = new Map() + private openEventPublishes = new Map() + private ws: WebSocket | undefined + private incomingMessageQueue = new Queue() + private handleNextInterval: ReturnType | null = null + private challenge: string | undefined + private serial: number = 0 + + constructor(url: string) { + this.url = normalizeURL(url) + } + + private closeAllSubscriptions(reason: string) { + for (let [_, sub] of this.openSubs) { + sub.close(reason) + } + this.openSubs.clear() + + for (let [_, ep] of this.openEventPublishes) { + ep.reject(new Error(reason)) + } + this.openEventPublishes.clear() + + for (let [_, cr] of this.openCountRequests) { + cr.reject(new Error(reason)) + } + this.openCountRequests.clear() + } + + public get connected(): boolean { + return this._connected + } + + public async connect(): Promise { + if (this.connectionPromise) return this.connectionPromise + this.connectionPromise = new Promise((resolve, reject) => { try { - ws = new WebSocket(url) + this.ws = new WebSocket(this.url) } catch (err) { reject(err) + return } - ws.onopen = () => { - listeners.connect.forEach(cb => cb()) + this.ws.onopen = () => { + this._connected = true resolve() } - ws.onerror = () => { - connectionPromise = undefined - listeners.error.forEach(cb => cb()) + + this.ws.onerror = () => { reject() - } - ws.onclose = async () => { - connectionPromise = undefined - listeners.disconnect.forEach(cb => cb()) - } - - let incomingMessageQueue: MessageQueue = new MessageQueue() - let handleNextInterval: any - - ws.onmessage = e => { - incomingMessageQueue.enqueue(e.data) - if (!handleNextInterval) { - handleNextInterval = setInterval(handleNext, 0) + if (this._connected) { + this.onclose?.() + this.closeAllSubscriptions('relay connection errored') + this._connected = false } } - function handleNext() { - if (incomingMessageQueue.size === 0) { - clearInterval(handleNextInterval) - handleNextInterval = null - return - } + this.ws.onclose = async () => { + this.connectionPromise = undefined + this.onclose?.() + this.closeAllSubscriptions('relay connection closed') + this._connected = false + } - var json = incomingMessageQueue.dequeue() - if (!json) return - - let subid = getSubscriptionId(json) - if (subid) { - let so = openSubs[subid] - if (so && so.alreadyHaveEvent && so.alreadyHaveEvent(getHex64(json, 'id'), url)) { - return - } - } - - try { - let data = JSON.parse(json) - - // we won't do any checks against the data since all failures (i.e. invalid messages from relays) - // will naturally be caught by the encompassing try..catch block - - switch (data[0]) { - case 'EVENT': { - let id = data[1] - let event = data[2] - if ( - validateEvent(event) && - openSubs[id] && - (openSubs[id].skipVerification || verifySignature(event)) && - matchFilters(openSubs[id].filters, event) - ) { - openSubs[id] - ;(subListeners[id]?.event || []).forEach(cb => cb(event)) - } - return - } - case 'COUNT': - let id = data[1] - let payload = data[2] - if (openSubs[id]) { - ;(subListeners[id]?.count || []).forEach(cb => cb(payload)) - } - return - case 'EOSE': { - let id = data[1] - if (id in subListeners) { - subListeners[id].eose.forEach(cb => cb()) - subListeners[id].eose = [] // 'eose' only happens once per sub, so stop listeners here - } - return - } - case 'OK': { - let id: string = data[1] - let ok: boolean = data[2] - let reason: string = data[3] || '' - if (id in pubListeners) { - let { resolve, reject } = pubListeners[id] - if (ok) resolve(null) - else reject(new Error(reason)) - } - return - } - case 'NOTICE': - let notice = data[1] - listeners.notice.forEach(cb => cb(notice)) - return - case 'AUTH': { - let challenge = data[1] - listeners.auth?.forEach(cb => cb(challenge)) - return - } - } - } catch (err) { - return + this.ws.onmessage = ev => { + this.incomingMessageQueue.enqueue(ev.data as string) + if (!this.handleNextInterval) { + this.handleNextInterval = setInterval(this.handleNext.bind(this), 0) } } }) - - return connectionPromise } - function connected() { - return ws?.readyState === 1 - } + private handleNext() { + const json = this.incomingMessageQueue.dequeue() + if (!json) { + clearInterval(this.handleNextInterval as ReturnType) + this.handleNextInterval = null + return + } - async function connect(): Promise { - if (connected()) return // ws already open - await connectRelay() - } + const subid = getSubscriptionId(json) + if (subid) { + const so = this.openSubs.get(subid as string) + if (!so) { + // this is an EVENT message, but for a subscription we don't have, so just stop here + return + } - async function trySend(params: [string, ...any]) { - let msg = JSON.stringify(params) - if (!connected()) { - await new Promise(resolve => setTimeout(resolve, 1000)) - if (!connected()) { + // this will be called only when this message is a EVENT message for a subscription we have + // we do this before parsing the JSON to not have to do that for duplicate events + // since JSON parsing is slow + const id = getHex64(json, 'id') + so.receivedEvent?.(id) // this is so the client knows this relay had this event + if (so.alreadyHaveEvent?.(id)) { + // if we had already seen this event we can just stop here return } } + try { - ws.send(msg) + let data = JSON.parse(json) + // we won't do any checks against the data since all failures (i.e. invalid messages from relays) + // will naturally be caught by the encompassing try..catch block + + switch (data[0]) { + case 'EVENT': { + const so = this.openSubs.get(data[1] as string) as Subscription + const event = data[2] as Event + if ((this.trusted || (validateEvent(event) && verifySignature(event))) && matchFilters(so.filters, event)) { + so.onevent(event) + } + return + } + case 'COUNT': { + const id: string = data[1] + const payload = data[2] as { count: number } + const cr = this.openCountRequests.get(id) as CountResolver + if (cr) { + cr.resolve(payload.count) + this.openCountRequests.delete(id) + } + return + } + case 'EOSE': { + const so = this.openSubs.get(data[1] as string) + if (!so || so.eosed) return + so.eosed = true + so.oneose?.() + return + } + case 'OK': { + const id: string = data[1] + const ok: boolean = data[2] + const reason: string = data[3] + const ep = this.openEventPublishes.get(id) as EventPublishResolver + if (ok) ep.resolve(reason) + else ep.reject(new Error(reason)) + this.openEventPublishes.delete(id) + return + } + case 'CLOSED': { + const id: string = data[1] + const so = this.openSubs.get(id) + if (!so) return + so.closed = true + so.close(data[2] as string) + this.openSubs.delete(id) + return + } + case 'NOTICE': + this.onnotice(data[1] as string) + return + case 'AUTH': { + this.challenge = data[1] as string + return + } + } } catch (err) { - console.log(err) + return } } - const sub = ( - filters: Filter[], - { - verb = 'REQ', - skipVerification = false, - alreadyHaveEvent = null, - id = Math.random().toString().slice(2), - }: SubscriptionOptions = {}, - ): Sub => { - let subid = id + public async send(message: string) { + await this.connect() + this.ws?.send(message) + } - openSubs[subid] = { - id: subid, - filters, - skipVerification, - alreadyHaveEvent, - } - trySend([verb, subid, ...filters]) + public async auth(signAuthEvent: (authEvent: EventTemplate) => Promise) { + if (!this.challenge) throw new Error("can't perform auth, no challenge was received") + const evt = nip42.makeAuthEvent(this.url, this.challenge) + await Promise.all([signAuthEvent(evt), this.connect()]) + this.ws?.send('["AUTH",' + JSON.stringify(evt) + ']') + } - let subscription: Sub = { - sub: (newFilters, newOpts = {}) => - sub(newFilters || filters, { - skipVerification: newOpts.skipVerification || skipVerification, - alreadyHaveEvent: newOpts.alreadyHaveEvent || alreadyHaveEvent, - id: subid, - }), - unsub: () => { - delete openSubs[subid] - delete subListeners[subid] - trySend(['CLOSE', subid]) - }, - on: (type, cb) => { - subListeners[subid] = subListeners[subid] || { - event: [], - count: [], - eose: [], - } - subListeners[subid][type].push(cb) - }, - off: (type, cb): void => { - let listeners = subListeners[subid] - let idx = listeners[type].indexOf(cb) - if (idx >= 0) listeners[type].splice(idx, 1) - }, - get events() { - return eventsGenerator(subscription) - }, - } + public async publish(event: Event) { + await this.connect() + const ret = new Promise((resolve, reject) => { + this.openEventPublishes.set(event.id, { resolve, reject }) + }) + this.ws?.send('["EVENT",' + JSON.stringify(event) + ']') + return ret + } + public async count(filters: Filter[], params: { id?: string | null }) { + await this.connect() + this.serial++ + const id = params?.id || 'count:' + this.serial + const ret = new Promise((resolve, reject) => { + this.openCountRequests.set(id, { resolve, reject }) + }) + this.ws?.send('["COUNT","' + id + '"' + JSON.stringify(filters) + ']') + return ret + } + + public async subscribe(filters: Filter[], params: SubscriptionParams & { id: string | undefined }) { + await this.connect() + this.serial++ + params.id = params.id || 'sub:' + this.serial + const subscription = new Subscription(this, filters, params) + this.openSubs.set(params.id, subscription) + this.ws?.send('["REQ","' + params.id + '"' + JSON.stringify(filters) + ']') return subscription } - - function _publishEvent(event: Event, type: string) { - return new Promise((resolve, reject) => { - if (!event.id) { - reject(new Error(`event ${event} has no id`)) - return - } - - let id = event.id - trySend([type, event]) - pubListeners[id] = { resolve, reject } - }) - } - - return { - url, - sub, - on: (type: T, cb: U): void => { - listeners[type].push(cb) - if (type === 'connect' && ws?.readyState === 1) { - // i would love to know why we need this - ;(cb as () => void)() - } - }, - off: (type: T, cb: U): void => { - let index = listeners[type].indexOf(cb) - if (index !== -1) listeners[type].splice(index, 1) - }, - list: (filters, opts?: SubscriptionOptions) => - new Promise(resolve => { - let s = sub(filters, opts) - let events: Event[] = [] - let timeout = setTimeout(() => { - s.unsub() - resolve(events) - }, listTimeout) - s.on('eose', () => { - s.unsub() - clearTimeout(timeout) - resolve(events) - }) - s.on('event', event => { - events.push(event) - }) - }), - get: (filter, opts?: SubscriptionOptions) => - new Promise(resolve => { - let s = sub([filter], opts) - let timeout = setTimeout(() => { - s.unsub() - resolve(null) - }, getTimeout) - s.on('event', event => { - s.unsub() - clearTimeout(timeout) - resolve(event) - }) - }), - count: (filters: Filter[]): Promise => - new Promise(resolve => { - let s = sub(filters, { ...sub, verb: 'COUNT' }) - let timeout = setTimeout(() => { - s.unsub() - resolve(null) - }, countTimeout) - s.on('count', (event: CountPayload) => { - s.unsub() - clearTimeout(timeout) - resolve(event) - }) - }), - async publish(event): Promise { - await _publishEvent(event, 'EVENT') - }, - async auth(event): Promise { - await _publishEvent(event, 'AUTH') - }, - connect, - close(): void { - listeners = newListeners() - subListeners = {} - pubListeners = {} - if (ws?.readyState === WebSocket.OPEN) { - ws.close() - } - }, - get status() { - return ws?.readyState ?? 3 - }, - } -} - -export async function* eventsGenerator(sub: Sub): AsyncGenerator { - let nextResolve: ((event: Event) => void) | undefined - const eventQueue: Event[] = [] - - const pushToQueue = (event: Event) => { - if (nextResolve) { - nextResolve(event) - nextResolve = undefined - } else { - eventQueue.push(event) - } - } - - sub.on('event', pushToQueue) - - try { - while (true) { - if (eventQueue.length > 0) { - yield eventQueue.shift()! - } else { - const event = await new Promise(resolve => { - nextResolve = resolve - }) - yield event - } - } - } finally { - sub.off('event', pushToQueue) - } } diff --git a/utils.ts b/utils.ts index affe6e8..cce4f6b 100644 --- a/utils.ts +++ b/utils.ts @@ -92,78 +92,44 @@ export function insertEventIntoAscendingList(sortedArray: Event[], event: Event) return sortedArray } -export class MessageNode { - private _value: string - private _next: MessageNode | null +export class QueueNode { + public value: V + public next: QueueNode | null - public get value(): string { - return this._value - } - public set value(message: string) { - this._value = message - } - public get next(): MessageNode | null { - return this._next - } - public set next(node: MessageNode | null) { - this._next = node - } - - constructor(message: string) { - this._value = message - this._next = null + constructor(message: V) { + this.value = message + this.next = null } } -export class MessageQueue { - private _first: MessageNode | null - private _last: MessageNode | null - - public get first(): MessageNode | null { - return this._first - } - public set first(messageNode: MessageNode | null) { - this._first = messageNode - } - public get last(): MessageNode | null { - return this._last - } - public set last(messageNode: MessageNode | null) { - this._last = messageNode - } - private _size: number - public get size(): number { - return this._size - } - public set size(v: number) { - this._size = v - } +export class Queue { + public first: QueueNode | null + public last: QueueNode | null constructor() { - this._first = null - this._last = null - this._size = 0 + this.first = null + this.last = null } - enqueue(message: string): boolean { - const newNode = new MessageNode(message) - if (this._size === 0 || !this._last) { - this._first = newNode - this._last = newNode + + enqueue(value: V): boolean { + const newNode = new QueueNode(value) + if (!this.last) { + this.first = newNode + this.last = newNode } else { - this._last.next = newNode - this._last = newNode + this.last.next = newNode + this.last = newNode } - this._size++ return true } - dequeue(): string | null { - if (this._size === 0 || !this._first) return null - let prev = this._first - this._first = prev.next + dequeue(): V | null { + if (!this.first) return null + + let prev = this.first + this.first = prev.next prev.next = null - this._size-- return prev.value } }