// (listing metadata: 399 lines, 11 KiB, TypeScript)
/* global WebSocket */
|
|
|
|
import { verifySignature, validateEvent, type Event } from './event.ts'
|
|
import { matchFilters, type Filter } from './filter.ts'
|
|
import { getHex64, getSubscriptionId } from './fakejson.ts'
|
|
import { MessageQueue } from './utils.ts'
|
|
|
|
/**
 * Listener signatures for the relay-level events that can be registered
 * through `Relay.on` / `Relay.off`. Every listener may be sync or async.
 */
type RelayEvent = {
  /** Called when the underlying WebSocket reports `onopen`. */
  connect: () => void | Promise<void>
  /** Called when the underlying WebSocket reports `onclose`. */
  disconnect: () => void | Promise<void>
  /** Called when the underlying WebSocket reports `onerror`. */
  error: () => void | Promise<void>
  /** Called with the second element of an incoming `NOTICE` message. */
  notice: (msg: string) => void | Promise<void>
  /** Called with the second element of an incoming `AUTH` message. */
  auth: (challenge: string) => void | Promise<void>
}
|
|
/** Payload delivered to `count` listeners: the third element of an incoming `COUNT` message. */
export type CountPayload = {
  count: number
}
|
|
/**
 * Listener signatures for the per-subscription events that can be registered
 * through `Sub.on` / `Sub.off`. Every listener may be sync or async.
 */
export type SubEvent = {
  /** Called for each incoming `EVENT` message that passes validation and the subscription's filters. */
  event: (event: Event) => void | Promise<void>
  /** Called with the payload of an incoming `COUNT` message for this subscription. */
  count: (payload: CountPayload) => void | Promise<void>
  /** Called when an `EOSE` message arrives for this subscription; the listener list is cleared afterwards. */
  eose: () => void | Promise<void>
}
|
|
/** Handle returned by `relayInit` for talking to a single relay. */
export type Relay = {
  /** The URL this relay handle was created with. */
  url: string
  /** `readyState` of the underlying WebSocket, or `3` when no socket has been created yet. */
  status: number
  /** Opens the WebSocket unless it is already open; resolves on `onopen`, rejects on `onerror`. */
  connect: () => Promise<void>
  /** Drops all registered listeners and closes the socket if it is open. */
  close: () => void
  /** Opens a subscription for `filters` and returns its handle. */
  sub: (filters: Filter[], opts?: SubscriptionOptions) => Sub
  /** Collects events for `filters` until `EOSE` or the list timeout, whichever comes first. */
  list: (filters: Filter[], opts?: SubscriptionOptions) => Promise<Event[]>
  /** Resolves with the first event matching `filter`, or `null` once the get timeout expires. */
  get: (filter: Filter, opts?: SubscriptionOptions) => Promise<Event | null>
  /** Sends a `COUNT` request; resolves with the payload, or `null` once the count timeout expires. */
  count: (filters: Filter[], opts?: SubscriptionOptions) => Promise<CountPayload | null>
  /** Sends `event` in an `EVENT` message; settles when the relay answers with `OK` for its id. */
  publish: (event: Event) => Promise<void>
  /** Sends `event` in an `AUTH` message; settles when the relay answers with `OK` for its id. */
  auth: (event: Event) => Promise<void>
  /** Removes a previously registered relay-level listener. */
  off: <T extends keyof RelayEvent, U extends RelayEvent[T]>(event: T, listener: U) => void
  /** Registers a relay-level listener. */
  on: <T extends keyof RelayEvent, U extends RelayEvent[T]>(event: T, listener: U) => void
}
|
|
/** Handle for a single subscription opened through `Relay.sub`. */
export type Sub = {
  /** Re-issues the subscription under the same id with new filters and/or options. */
  sub: (filters: Filter[], opts: SubscriptionOptions) => Sub
  /** Forgets the subscription and its listeners locally and sends `CLOSE` to the relay. */
  unsub: () => void
  /** Registers a listener for one of the subscription events. */
  on: <T extends keyof SubEvent, U extends SubEvent[T]>(event: T, listener: U) => void
  /** Removes a previously registered subscription listener. */
  off: <T extends keyof SubEvent, U extends SubEvent[T]>(event: T, listener: U) => void
  /** Async iterator over the events delivered to this subscription. */
  events: AsyncGenerator<Event, void, unknown>
}
|
|
|
|
/** Options accepted when opening a subscription. */
export type SubscriptionOptions = {
  /** Subscription id sent to the relay; `relayInit`'s `sub` generates a random numeric string when omitted. */
  id?: string
  /** First element of the request message sent to the relay; `sub` defaults it to `'REQ'`. */
  verb?: 'REQ' | 'COUNT'
  /** When true, incoming events are delivered without calling `verifySignature`. */
  skipVerification?: boolean
  /**
   * Consulted with the incoming event id and the relay URL before the message
   * is parsed; returning `true` drops the message.
   */
  alreadyHaveEvent?: null | ((id: string, relay: string) => boolean)
  /** NOTE(review): not read anywhere in this file — confirm whether callers elsewhere rely on it. */
  eoseSubTimeout?: number
}
|
|
|
|
/**
 * Builds a fresh relay-level listener table: one empty array per `RelayEvent`
 * key. A new object (with new arrays) is returned on every call so tables are
 * never shared between relays or across a `close()`.
 */
const newListeners = (): { [TK in keyof RelayEvent]: RelayEvent[TK][] } => ({
  connect: [],
  disconnect: [],
  error: [],
  notice: [],
  auth: [],
})
|
|
|
|
/**
 * Creates a client for a single relay reachable over WebSocket at `url`.
 *
 * No socket is opened here; call `connect()` on the returned handle. Messages
 * sent while the socket is not open are retried once after one second and then
 * dropped (see `trySend`).
 *
 * @param url - Passed to `new WebSocket(url)` and to `alreadyHaveEvent` callbacks.
 * @param options - Timeouts in milliseconds (each defaults to 3000) after which
 *   `get`, `list` and `count` stop waiting for the relay and resolve with
 *   `null`, the events collected so far, and `null` respectively.
 * @returns The relay handle.
 */
export function relayInit(
  url: string,
  options: {
    getTimeout?: number
    listTimeout?: number
    countTimeout?: number
  } = {},
): Relay {
  const { listTimeout = 3000, getTimeout = 3000, countTimeout = 3000 } = options

  let ws: WebSocket | undefined
  const openSubs: { [id: string]: { filters: Filter[] } & SubscriptionOptions } = {}
  let listeners = newListeners()
  let subListeners: {
    [subid: string]: { [TK in keyof SubEvent]: SubEvent[TK][] }
  } = {}
  let pubListeners: {
    [eventid: string]: {
      resolve: (_: unknown) => void
      reject: (err: Error) => void
    }
  } = {}

  let connectionPromise: Promise<void> | undefined

  /**
   * Opens the WebSocket and wires up its handlers. Concurrent callers share
   * the same in-flight promise; a failed attempt is never cached.
   */
  function connectRelay(): Promise<void> {
    if (connectionPromise) return connectionPromise

    const attempt = new Promise<void>((resolve, reject) => {
      let socket: WebSocket
      try {
        socket = new WebSocket(url)
      } catch (err) {
        // Without a socket there is nothing to attach handlers to.
        reject(err)
        return
      }
      ws = socket

      socket.onopen = () => {
        listeners.connect.forEach(cb => cb())
        resolve()
      }
      socket.onerror = () => {
        connectionPromise = undefined
        listeners.error.forEach(cb => cb())
        reject(new Error(`websocket error on ${url}`))
      }
      socket.onclose = () => {
        connectionPromise = undefined
        listeners.disconnect.forEach(cb => cb())
      }

      // Incoming frames are queued and drained one per timer tick so that a
      // burst of messages does not monopolize the event loop.
      const incomingMessageQueue = new MessageQueue()
      let handleNextInterval: ReturnType<typeof setInterval> | undefined

      socket.onmessage = e => {
        incomingMessageQueue.enqueue(e.data)
        if (!handleNextInterval) {
          handleNextInterval = setInterval(handleNext, 0)
        }
      }

      /** Processes one queued message, or stops the drain timer when the queue is empty. */
      function handleNext() {
        if (incomingMessageQueue.size === 0) {
          clearInterval(handleNextInterval)
          handleNextInterval = undefined
          return
        }

        const json = incomingMessageQueue.dequeue()
        if (!json) return

        // Cheap pre-parse check: let the subscriber reject events it already has
        // before paying for JSON.parse and signature verification.
        const subid = getSubscriptionId(json)
        if (subid) {
          const so = openSubs[subid]
          if (so && so.alreadyHaveEvent && so.alreadyHaveEvent(getHex64(json, 'id'), url)) {
            return
          }
        }

        try {
          const data = JSON.parse(json)

          // we won't do any checks against the data since all failures (i.e. invalid messages from relays)
          // will naturally be caught by the encompassing try..catch block

          switch (data[0]) {
            case 'EVENT': {
              const id = data[1]
              const event = data[2]
              const openSub = openSubs[id]
              if (
                validateEvent(event) &&
                openSub &&
                (openSub.skipVerification || verifySignature(event)) &&
                matchFilters(openSub.filters, event)
              ) {
                ;(subListeners[id]?.event || []).forEach(cb => cb(event))
              }
              return
            }
            case 'COUNT': {
              const id = data[1]
              const payload = data[2]
              if (openSubs[id]) {
                ;(subListeners[id]?.count || []).forEach(cb => cb(payload))
              }
              return
            }
            case 'EOSE': {
              const id = data[1]
              const entry = subListeners[id]
              if (entry) {
                entry.eose.forEach(cb => cb())
                // 'eose' only happens once per sub, so stop listeners here. A callback may
                // have called unsub(), which removes the whole entry — re-check first.
                if (subListeners[id]) subListeners[id].eose = []
              }
              return
            }
            case 'OK': {
              const id: string = data[1]
              const ok: boolean = data[2]
              const reason: string = data[3] || ''
              const pending = pubListeners[id]
              if (pending) {
                // The publish promise settles exactly once; drop the entry so it does not leak.
                delete pubListeners[id]
                if (ok) pending.resolve(null)
                else pending.reject(new Error(reason))
              }
              return
            }
            case 'NOTICE': {
              const notice = data[1]
              listeners.notice.forEach(cb => cb(notice))
              return
            }
            case 'AUTH': {
              const challenge = data[1]
              listeners.auth.forEach(cb => cb(challenge))
              return
            }
          }
        } catch (err) {
          // Malformed relay messages (and throwing listeners) are deliberately ignored.
          return
        }
      }
    })

    connectionPromise = attempt
    // If the attempt fails (including a synchronous constructor failure), forget it
    // so the next connect() starts over instead of returning the stale rejection.
    attempt.catch(() => {
      if (connectionPromise === attempt) connectionPromise = undefined
    })

    return attempt
  }

  /** True when a socket exists and its readyState is 1 (OPEN). */
  function connected() {
    return ws?.readyState === 1
  }

  /** Opens the connection unless the socket is already open. */
  async function connect(): Promise<void> {
    if (connected()) return // ws already open
    await connectRelay()
  }

  /**
   * Serializes `params` and sends it. If the socket is not open, waits one
   * second and checks once more; if it is still not open the message is
   * silently dropped. Errors thrown by `send` are logged, not rethrown.
   */
  async function trySend(params: [string, ...unknown[]]) {
    const msg = JSON.stringify(params)
    if (!connected()) {
      await new Promise(resolve => setTimeout(resolve, 1000))
      if (!connected()) {
        return
      }
    }
    try {
      ws?.send(msg)
    } catch (err) {
      console.log(err)
    }
  }

  /**
   * Registers a subscription under `id` (random when omitted), sends the
   * request to the relay, and returns the subscription handle.
   */
  const sub = (
    filters: Filter[],
    {
      verb = 'REQ',
      skipVerification = false,
      alreadyHaveEvent = null,
      id = Math.random().toString().slice(2),
    }: SubscriptionOptions = {},
  ): Sub => {
    const subid = id

    openSubs[subid] = {
      id: subid,
      filters,
      skipVerification,
      alreadyHaveEvent,
    }
    void trySend([verb, subid, ...filters])

    const subscription: Sub = {
      // Re-subscribe under the same id; anything not overridden is inherited,
      // including the verb so a COUNT subscription stays a COUNT.
      sub: (newFilters, newOpts = {}) =>
        sub(newFilters || filters, {
          verb: newOpts.verb || verb,
          skipVerification: newOpts.skipVerification || skipVerification,
          alreadyHaveEvent: newOpts.alreadyHaveEvent || alreadyHaveEvent,
          id: subid,
        }),
      unsub: () => {
        delete openSubs[subid]
        delete subListeners[subid]
        void trySend(['CLOSE', subid])
      },
      on: (type, cb) => {
        subListeners[subid] = subListeners[subid] || {
          event: [],
          count: [],
          eose: [],
        }
        subListeners[subid][type].push(cb)
      },
      off: (type, cb): void => {
        // The table is absent before the first on() and after unsub(); removing
        // a listener is then a no-op rather than a TypeError.
        const registered = subListeners[subid]
        if (!registered) return
        const idx = registered[type].indexOf(cb)
        if (idx >= 0) registered[type].splice(idx, 1)
      },
      get events() {
        return eventsGenerator(subscription)
      },
    }

    return subscription
  }

  /**
   * Sends `[type, event]` and returns a promise settled by the relay's `OK`
   * reply for `event.id`. Rejects immediately when the event has no id.
   */
  function _publishEvent(event: Event, type: string) {
    return new Promise((resolve, reject) => {
      if (!event.id) {
        reject(new Error(`event ${JSON.stringify(event)} has no id`))
        return
      }

      const id = event.id
      pubListeners[id] = { resolve, reject }
      void trySend([type, event])
    })
  }

  return {
    url,
    sub,
    on: <T extends keyof RelayEvent, U extends RelayEvent[T]>(type: T, cb: U): void => {
      listeners[type].push(cb)
      if (type === 'connect' && ws?.readyState === 1) {
        // The socket is already open, so no further 'connect' will fire for this
        // connection; presumably this lets late listeners still observe it.
        ;(cb as () => void)()
      }
    },
    off: <T extends keyof RelayEvent, U extends RelayEvent[T]>(type: T, cb: U): void => {
      const index = listeners[type].indexOf(cb)
      if (index !== -1) listeners[type].splice(index, 1)
    },
    list: (filters, opts?: SubscriptionOptions) =>
      new Promise(resolve => {
        const s = sub(filters, opts)
        const events: Event[] = []
        const timeout = setTimeout(() => {
          s.unsub()
          resolve(events)
        }, listTimeout)
        s.on('eose', () => {
          s.unsub()
          clearTimeout(timeout)
          resolve(events)
        })
        s.on('event', event => {
          events.push(event)
        })
      }),
    get: (filter, opts?: SubscriptionOptions) =>
      new Promise(resolve => {
        const s = sub([filter], opts)
        const timeout = setTimeout(() => {
          s.unsub()
          resolve(null)
        }, getTimeout)
        s.on('event', event => {
          s.unsub()
          clearTimeout(timeout)
          resolve(event)
        })
      }),
    count: (filters: Filter[], opts?: SubscriptionOptions): Promise<CountPayload | null> =>
      new Promise(resolve => {
        // Caller options are honored; the verb is always forced to COUNT.
        const s = sub(filters, { ...opts, verb: 'COUNT' })
        const timeout = setTimeout(() => {
          s.unsub()
          resolve(null)
        }, countTimeout)
        s.on('count', (event: CountPayload) => {
          s.unsub()
          clearTimeout(timeout)
          resolve(event)
        })
      }),
    async publish(event): Promise<void> {
      await _publishEvent(event, 'EVENT')
    },
    async auth(event): Promise<void> {
      await _publishEvent(event, 'AUTH')
    },
    connect,
    close(): void {
      // Listeners are dropped before closing, so 'disconnect' listeners registered
      // earlier are not notified of this close.
      listeners = newListeners()
      subListeners = {}
      pubListeners = {}
      const socket = ws
      if (socket?.readyState === WebSocket.OPEN) {
        socket.close()
      }
    },
    get status() {
      // 3 is the readyState value for CLOSED; reported when no socket exists yet.
      return ws?.readyState ?? 3
    },
  }
}
|
|
|
|
/**
 * Exposes the `event` notifications of a subscription as an async iterator.
 *
 * The listener is attached when iteration starts (first `next()`), so events
 * delivered before that are not seen. Events arriving while the consumer is
 * busy are buffered in arrival order. The generator never completes on its
 * own; when the consumer stops iterating (`return()` / leaving a `for await`
 * loop) the listener is detached in the `finally` block.
 *
 * @param sub - Subscription whose `on('event', …)` / `off('event', …)` are used.
 * @returns An endless async generator of events.
 */
export async function* eventsGenerator(sub: Sub): AsyncGenerator<Event, void, unknown> {
  // Resolver of the promise the generator is currently parked on, if any.
  let nextResolve: ((event: Event) => void) | undefined
  const eventQueue: Event[] = []

  const pushToQueue = (event: Event) => {
    if (nextResolve) {
      // A consumer is already waiting: hand the event over directly.
      nextResolve(event)
      nextResolve = undefined
    } else {
      eventQueue.push(event)
    }
  }

  sub.on('event', pushToQueue)

  try {
    while (true) {
      if (eventQueue.length > 0) {
        yield eventQueue.shift()!
      } else {
        const event = await new Promise<Event>(resolve => {
          nextResolve = resolve
        })
        yield event
      }
    }
  } finally {
    sub.off('event', pushToQueue)
  }
}
|