rewrite relay.ts to be much simpler.

This commit is contained in:
fiatjaf
2023-12-16 18:56:15 -03:00
parent 3d541e537e
commit 7a640092d0
3 changed files with 265 additions and 413 deletions

View File

@@ -49,7 +49,7 @@ export function getEventHash(event: UnsignedEvent): string {
const isRecord = (obj: unknown): obj is Record<string, unknown> => obj instanceof Object
export function validateEvent<T>(event: T): event is T & UnsignedEvent {
export function validateEvent(event: UnsignedEvent): boolean {
if (!isRecord(event)) return false
if (typeof event.kind !== 'number') return false
if (typeof event.content !== 'string') return false
@@ -70,7 +70,7 @@ export function validateEvent<T>(event: T): event is T & UnsignedEvent {
}
/** Verify the event's signature. This function mutates the event with a `verified` symbol, making it idempotent. */
export function verifySignature(event: Event): event is VerifiedEvent {
export function verifySignature(event: Event): boolean {
if (typeof event[verifiedSymbol] === 'boolean') return event[verifiedSymbol]
const hash = getEventHash(event)

592
relay.ts
View File

@@ -1,398 +1,284 @@
/* global WebSocket */
import { verifySignature, validateEvent, type Event } from './event.ts'
import { verifySignature, validateEvent, type Event, EventTemplate } from './event.ts'
import { matchFilters, type Filter } from './filter.ts'
import { getHex64, getSubscriptionId } from './fakejson.ts'
import { MessageQueue } from './utils.ts'
import { Queue, normalizeURL } from './utils.ts'
import { nip42 } from './index.ts'
type RelayEvent = {
connect: () => void | Promise<void>
disconnect: () => void | Promise<void>
error: () => void | Promise<void>
notice: (msg: string) => void | Promise<void>
auth: (challenge: string) => void | Promise<void>
}
export type CountPayload = {
count: number
}
export type SubEvent = {
event: (event: Event) => void | Promise<void>
count: (payload: CountPayload) => void | Promise<void>
eose: () => void | Promise<void>
}
export type Relay = {
url: string
status: number
connect: () => Promise<void>
close: () => void
sub: (filters: Filter[], opts?: SubscriptionOptions) => Sub
list: (filters: Filter[], opts?: SubscriptionOptions) => Promise<Event[]>
get: (filter: Filter, opts?: SubscriptionOptions) => Promise<Event | null>
count: (filters: Filter[], opts?: SubscriptionOptions) => Promise<CountPayload | null>
publish: (event: Event) => Promise<void>
auth: (event: Event) => Promise<void>
off: <T extends keyof RelayEvent, U extends RelayEvent[T]>(event: T, listener: U) => void
on: <T extends keyof RelayEvent, U extends RelayEvent[T]>(event: T, listener: U) => void
}
export type Sub = {
sub: (filters: Filter[], opts: SubscriptionOptions) => Sub
unsub: () => void
on: <T extends keyof SubEvent, U extends SubEvent[T]>(event: T, listener: U) => void
off: <T extends keyof SubEvent, U extends SubEvent[T]>(event: T, listener: U) => void
events: AsyncGenerator<Event, void, unknown>
export function relayConnect(url: string) {
const relay = new Relay(url)
relay.connect()
return relay
}
export type SubscriptionOptions = {
id?: string
verb?: 'REQ' | 'COUNT'
skipVerification?: boolean
alreadyHaveEvent?: null | ((id: string, relay: string) => boolean)
eoseSubTimeout?: number
}
class Subscription {
public readonly relay: Relay
public readonly id: string
public closed: boolean = false
public eosed: boolean = false
const newListeners = (): { [TK in keyof RelayEvent]: RelayEvent[TK][] } => ({
connect: [],
disconnect: [],
error: [],
notice: [],
auth: [],
})
public alreadyHaveEvent: ((id: string) => boolean) | null = null
public receivedEvent: ((id: string) => boolean) | null = null
public readonly filters: Filter[]
export function relayInit(
url: string,
options: {
getTimeout?: number
listTimeout?: number
countTimeout?: number
} = {},
): Relay {
let { listTimeout = 3000, getTimeout = 3000, countTimeout = 3000 } = options
public onevent: (evt: Event) => void
public oneose: (() => void) | null = null
public onclose: ((reason: string) => void) | null = null
var ws: WebSocket
var openSubs: { [id: string]: { filters: Filter[] } & SubscriptionOptions } = {}
var listeners = newListeners()
var subListeners: {
[subid: string]: { [TK in keyof SubEvent]: SubEvent[TK][] }
} = {}
var pubListeners: {
[eventid: string]: {
resolve: (_: unknown) => void
reject: (err: Error) => void
constructor(relay: Relay, filters: Filter[], params: SubscriptionParams) {
this.relay = relay
this.filters = filters
this.id = params.id
this.onevent = params.onevent
this.oneose = params.oneose || null
this.onclose = params.onclose || null
this.alreadyHaveEvent = params.alreadyHaveEvent || null
this.receivedEvent = params.receivedEvent || null
}
public close(reason: string) {
if (!this.closed) {
// if the connection was closed by the user calling .close() we will send a CLOSE message
// otherwise this._open will be already set to false so we will skip this
this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']')
this.closed = true
}
} = {}
this.onclose?.(reason)
}
}
var connectionPromise: Promise<void> | undefined
async function connectRelay(): Promise<void> {
if (connectionPromise) return connectionPromise
connectionPromise = new Promise((resolve, reject) => {
type SubscriptionParams = {
id: string
onevent: (evt: Event) => void
oneose?: () => void
onclose?: (reason: string) => void
alreadyHaveEvent: ((id: string) => boolean) | null
receivedEvent: ((id: string) => boolean) | null
}
type CountResolver = {
resolve: (count: number) => void
reject: (err: Error) => void
}
type EventPublishResolver = {
resolve: (reason: string) => void
reject: (err: Error) => void
}
class Relay {
public readonly url: string
private _connected: boolean = false
public trusted: boolean = false
public onclose: (() => void) | null = null
public onnotice: (msg: string) => void = console.log
private connectionPromise: Promise<void> | undefined
private openSubs = new Map<string, Subscription>()
private openCountRequests = new Map<string, CountResolver>()
private openEventPublishes = new Map<string, EventPublishResolver>()
private ws: WebSocket | undefined
private incomingMessageQueue = new Queue<string>()
private handleNextInterval: ReturnType<typeof setInterval> | null = null
private challenge: string | undefined
private serial: number = 0
constructor(url: string) {
this.url = normalizeURL(url)
}
private closeAllSubscriptions(reason: string) {
for (let [_, sub] of this.openSubs) {
sub.close(reason)
}
this.openSubs.clear()
for (let [_, ep] of this.openEventPublishes) {
ep.reject(new Error(reason))
}
this.openEventPublishes.clear()
for (let [_, cr] of this.openCountRequests) {
cr.reject(new Error(reason))
}
this.openCountRequests.clear()
}
public get connected(): boolean {
return this._connected
}
public async connect(): Promise<void> {
if (this.connectionPromise) return this.connectionPromise
this.connectionPromise = new Promise((resolve, reject) => {
try {
ws = new WebSocket(url)
this.ws = new WebSocket(this.url)
} catch (err) {
reject(err)
return
}
ws.onopen = () => {
listeners.connect.forEach(cb => cb())
this.ws.onopen = () => {
this._connected = true
resolve()
}
ws.onerror = () => {
connectionPromise = undefined
listeners.error.forEach(cb => cb())
this.ws.onerror = () => {
reject()
}
ws.onclose = async () => {
connectionPromise = undefined
listeners.disconnect.forEach(cb => cb())
}
let incomingMessageQueue: MessageQueue = new MessageQueue()
let handleNextInterval: any
ws.onmessage = e => {
incomingMessageQueue.enqueue(e.data)
if (!handleNextInterval) {
handleNextInterval = setInterval(handleNext, 0)
if (this._connected) {
this.onclose?.()
this.closeAllSubscriptions('relay connection errored')
this._connected = false
}
}
function handleNext() {
if (incomingMessageQueue.size === 0) {
clearInterval(handleNextInterval)
handleNextInterval = null
return
}
this.ws.onclose = async () => {
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection closed')
this._connected = false
}
var json = incomingMessageQueue.dequeue()
if (!json) return
let subid = getSubscriptionId(json)
if (subid) {
let so = openSubs[subid]
if (so && so.alreadyHaveEvent && so.alreadyHaveEvent(getHex64(json, 'id'), url)) {
return
}
}
try {
let data = JSON.parse(json)
// we won't do any checks against the data since all failures (i.e. invalid messages from relays)
// will naturally be caught by the encompassing try..catch block
switch (data[0]) {
case 'EVENT': {
let id = data[1]
let event = data[2]
if (
validateEvent(event) &&
openSubs[id] &&
(openSubs[id].skipVerification || verifySignature(event)) &&
matchFilters(openSubs[id].filters, event)
) {
openSubs[id]
;(subListeners[id]?.event || []).forEach(cb => cb(event))
}
return
}
case 'COUNT':
let id = data[1]
let payload = data[2]
if (openSubs[id]) {
;(subListeners[id]?.count || []).forEach(cb => cb(payload))
}
return
case 'EOSE': {
let id = data[1]
if (id in subListeners) {
subListeners[id].eose.forEach(cb => cb())
subListeners[id].eose = [] // 'eose' only happens once per sub, so stop listeners here
}
return
}
case 'OK': {
let id: string = data[1]
let ok: boolean = data[2]
let reason: string = data[3] || ''
if (id in pubListeners) {
let { resolve, reject } = pubListeners[id]
if (ok) resolve(null)
else reject(new Error(reason))
}
return
}
case 'NOTICE':
let notice = data[1]
listeners.notice.forEach(cb => cb(notice))
return
case 'AUTH': {
let challenge = data[1]
listeners.auth?.forEach(cb => cb(challenge))
return
}
}
} catch (err) {
return
this.ws.onmessage = ev => {
this.incomingMessageQueue.enqueue(ev.data as string)
if (!this.handleNextInterval) {
this.handleNextInterval = setInterval(this.handleNext.bind(this), 0)
}
}
})
return connectionPromise
}
function connected() {
return ws?.readyState === 1
}
private handleNext() {
const json = this.incomingMessageQueue.dequeue()
if (!json) {
clearInterval(this.handleNextInterval as ReturnType<typeof setInterval>)
this.handleNextInterval = null
return
}
async function connect(): Promise<void> {
if (connected()) return // ws already open
await connectRelay()
}
const subid = getSubscriptionId(json)
if (subid) {
const so = this.openSubs.get(subid as string)
if (!so) {
// this is an EVENT message, but for a subscription we don't have, so just stop here
return
}
async function trySend(params: [string, ...any]) {
let msg = JSON.stringify(params)
if (!connected()) {
await new Promise(resolve => setTimeout(resolve, 1000))
if (!connected()) {
// this will be called only when this message is a EVENT message for a subscription we have
// we do this before parsing the JSON to not have to do that for duplicate events
// since JSON parsing is slow
const id = getHex64(json, 'id')
so.receivedEvent?.(id) // this is so the client knows this relay had this event
if (so.alreadyHaveEvent?.(id)) {
// if we had already seen this event we can just stop here
return
}
}
try {
ws.send(msg)
let data = JSON.parse(json)
// we won't do any checks against the data since all failures (i.e. invalid messages from relays)
// will naturally be caught by the encompassing try..catch block
switch (data[0]) {
case 'EVENT': {
const so = this.openSubs.get(data[1] as string) as Subscription
const event = data[2] as Event
if ((this.trusted || (validateEvent(event) && verifySignature(event))) && matchFilters(so.filters, event)) {
so.onevent(event)
}
return
}
case 'COUNT': {
const id: string = data[1]
const payload = data[2] as { count: number }
const cr = this.openCountRequests.get(id) as CountResolver
if (cr) {
cr.resolve(payload.count)
this.openCountRequests.delete(id)
}
return
}
case 'EOSE': {
const so = this.openSubs.get(data[1] as string)
if (!so || so.eosed) return
so.eosed = true
so.oneose?.()
return
}
case 'OK': {
const id: string = data[1]
const ok: boolean = data[2]
const reason: string = data[3]
const ep = this.openEventPublishes.get(id) as EventPublishResolver
if (ok) ep.resolve(reason)
else ep.reject(new Error(reason))
this.openEventPublishes.delete(id)
return
}
case 'CLOSED': {
const id: string = data[1]
const so = this.openSubs.get(id)
if (!so) return
so.closed = true
so.close(data[2] as string)
this.openSubs.delete(id)
return
}
case 'NOTICE':
this.onnotice(data[1] as string)
return
case 'AUTH': {
this.challenge = data[1] as string
return
}
}
} catch (err) {
console.log(err)
return
}
}
const sub = (
filters: Filter[],
{
verb = 'REQ',
skipVerification = false,
alreadyHaveEvent = null,
id = Math.random().toString().slice(2),
}: SubscriptionOptions = {},
): Sub => {
let subid = id
public async send(message: string) {
await this.connect()
this.ws?.send(message)
}
openSubs[subid] = {
id: subid,
filters,
skipVerification,
alreadyHaveEvent,
}
trySend([verb, subid, ...filters])
public async auth(signAuthEvent: (authEvent: EventTemplate) => Promise<void>) {
if (!this.challenge) throw new Error("can't perform auth, no challenge was received")
const evt = nip42.makeAuthEvent(this.url, this.challenge)
await Promise.all([signAuthEvent(evt), this.connect()])
this.ws?.send('["AUTH",' + JSON.stringify(evt) + ']')
}
let subscription: Sub = {
sub: (newFilters, newOpts = {}) =>
sub(newFilters || filters, {
skipVerification: newOpts.skipVerification || skipVerification,
alreadyHaveEvent: newOpts.alreadyHaveEvent || alreadyHaveEvent,
id: subid,
}),
unsub: () => {
delete openSubs[subid]
delete subListeners[subid]
trySend(['CLOSE', subid])
},
on: (type, cb) => {
subListeners[subid] = subListeners[subid] || {
event: [],
count: [],
eose: [],
}
subListeners[subid][type].push(cb)
},
off: (type, cb): void => {
let listeners = subListeners[subid]
let idx = listeners[type].indexOf(cb)
if (idx >= 0) listeners[type].splice(idx, 1)
},
get events() {
return eventsGenerator(subscription)
},
}
public async publish(event: Event) {
await this.connect()
const ret = new Promise((resolve, reject) => {
this.openEventPublishes.set(event.id, { resolve, reject })
})
this.ws?.send('["EVENT",' + JSON.stringify(event) + ']')
return ret
}
public async count(filters: Filter[], params: { id?: string | null }) {
await this.connect()
this.serial++
const id = params?.id || 'count:' + this.serial
const ret = new Promise((resolve, reject) => {
this.openCountRequests.set(id, { resolve, reject })
})
this.ws?.send('["COUNT","' + id + '"' + JSON.stringify(filters) + ']')
return ret
}
public async subscribe(filters: Filter[], params: SubscriptionParams & { id: string | undefined }) {
await this.connect()
this.serial++
params.id = params.id || 'sub:' + this.serial
const subscription = new Subscription(this, filters, params)
this.openSubs.set(params.id, subscription)
this.ws?.send('["REQ","' + params.id + '"' + JSON.stringify(filters) + ']')
return subscription
}
function _publishEvent(event: Event, type: string) {
return new Promise((resolve, reject) => {
if (!event.id) {
reject(new Error(`event ${event} has no id`))
return
}
let id = event.id
trySend([type, event])
pubListeners[id] = { resolve, reject }
})
}
return {
url,
sub,
on: <T extends keyof RelayEvent, U extends RelayEvent[T]>(type: T, cb: U): void => {
listeners[type].push(cb)
if (type === 'connect' && ws?.readyState === 1) {
// i would love to know why we need this
;(cb as () => void)()
}
},
off: <T extends keyof RelayEvent, U extends RelayEvent[T]>(type: T, cb: U): void => {
let index = listeners[type].indexOf(cb)
if (index !== -1) listeners[type].splice(index, 1)
},
list: (filters, opts?: SubscriptionOptions) =>
new Promise(resolve => {
let s = sub(filters, opts)
let events: Event[] = []
let timeout = setTimeout(() => {
s.unsub()
resolve(events)
}, listTimeout)
s.on('eose', () => {
s.unsub()
clearTimeout(timeout)
resolve(events)
})
s.on('event', event => {
events.push(event)
})
}),
get: (filter, opts?: SubscriptionOptions) =>
new Promise(resolve => {
let s = sub([filter], opts)
let timeout = setTimeout(() => {
s.unsub()
resolve(null)
}, getTimeout)
s.on('event', event => {
s.unsub()
clearTimeout(timeout)
resolve(event)
})
}),
count: (filters: Filter[]): Promise<CountPayload | null> =>
new Promise(resolve => {
let s = sub(filters, { ...sub, verb: 'COUNT' })
let timeout = setTimeout(() => {
s.unsub()
resolve(null)
}, countTimeout)
s.on('count', (event: CountPayload) => {
s.unsub()
clearTimeout(timeout)
resolve(event)
})
}),
async publish(event): Promise<void> {
await _publishEvent(event, 'EVENT')
},
async auth(event): Promise<void> {
await _publishEvent(event, 'AUTH')
},
connect,
close(): void {
listeners = newListeners()
subListeners = {}
pubListeners = {}
if (ws?.readyState === WebSocket.OPEN) {
ws.close()
}
},
get status() {
return ws?.readyState ?? 3
},
}
}
export async function* eventsGenerator(sub: Sub): AsyncGenerator<Event, void, unknown> {
let nextResolve: ((event: Event) => void) | undefined
const eventQueue: Event[] = []
const pushToQueue = (event: Event) => {
if (nextResolve) {
nextResolve(event)
nextResolve = undefined
} else {
eventQueue.push(event)
}
}
sub.on('event', pushToQueue)
try {
while (true) {
if (eventQueue.length > 0) {
yield eventQueue.shift()!
} else {
const event = await new Promise<Event>(resolve => {
nextResolve = resolve
})
yield event
}
}
} finally {
sub.off('event', pushToQueue)
}
}

View File

@@ -92,78 +92,44 @@ export function insertEventIntoAscendingList(sortedArray: Event[], event: Event)
return sortedArray
}
export class MessageNode {
private _value: string
private _next: MessageNode | null
export class QueueNode<V> {
public value: V
public next: QueueNode<V> | null
public get value(): string {
return this._value
}
public set value(message: string) {
this._value = message
}
public get next(): MessageNode | null {
return this._next
}
public set next(node: MessageNode | null) {
this._next = node
}
constructor(message: string) {
this._value = message
this._next = null
constructor(message: V) {
this.value = message
this.next = null
}
}
export class MessageQueue {
private _first: MessageNode | null
private _last: MessageNode | null
public get first(): MessageNode | null {
return this._first
}
public set first(messageNode: MessageNode | null) {
this._first = messageNode
}
public get last(): MessageNode | null {
return this._last
}
public set last(messageNode: MessageNode | null) {
this._last = messageNode
}
private _size: number
public get size(): number {
return this._size
}
public set size(v: number) {
this._size = v
}
export class Queue<V> {
public first: QueueNode<V> | null
public last: QueueNode<V> | null
constructor() {
this._first = null
this._last = null
this._size = 0
this.first = null
this.last = null
}
enqueue(message: string): boolean {
const newNode = new MessageNode(message)
if (this._size === 0 || !this._last) {
this._first = newNode
this._last = newNode
enqueue(value: V): boolean {
const newNode = new QueueNode(value)
if (!this.last) {
this.first = newNode
this.last = newNode
} else {
this._last.next = newNode
this._last = newNode
this.last.next = newNode
this.last = newNode
}
this._size++
return true
}
dequeue(): string | null {
if (this._size === 0 || !this._first) return null
let prev = this._first
this._first = prev.next
dequeue(): V | null {
if (!this.first) return null
let prev = this.first
this.first = prev.next
prev.next = null
this._size--
return prev.value
}
}