relay: `sub.events` async iterator

This commit is contained in:
Alex Gleason 2023-09-09 14:57:49 -05:00 committed by fiatjaf_
parent 401b9c7864
commit c18f050468
2 changed files with 58 additions and 0 deletions

View File

@ -57,6 +57,19 @@ test('querying', async () => {
expect(t2).toEqual(true)
}, 10000)
test('async iterator', async () => {
let sub = relay.sub([
{
ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'],
},
])
for await (const event of sub.events) {
expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027')
break
}
})
test('get()', async () => {
let event = await relay.get({
ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'],

View File

@ -39,6 +39,7 @@ export type Sub<K extends number = number> = {
unsub: () => void
on: <T extends keyof SubEvent<K>, U extends SubEvent<K>[T]>(event: T, listener: U) => void
off: <T extends keyof SubEvent<K>, U extends SubEvent<K>[T]>(event: T, listener: U) => void
events: AsyncGenerator<Event<K>, void, unknown>
}
export type SubscriptionOptions = {
@ -241,6 +242,47 @@ export function relayInit(
}
trySend([verb, subid, ...filters])
async function* eventsGenerator(): AsyncGenerator<Event<K>, void, unknown> {
let nextResolve: ((event: Event<K>) => void) | undefined
const eventQueue: Event<K>[] = []
const pushToQueue = (event: Event<K>) => {
if (nextResolve) {
nextResolve(event)
nextResolve = undefined
} else {
eventQueue.push(event)
}
}
// Register the event listener
if (!subListeners[subid]) {
subListeners[subid] = {
event: [],
count: [],
eose: []
}
}
subListeners[subid].event.push(pushToQueue)
try {
while (true) {
if (eventQueue.length > 0) {
yield eventQueue.shift()!
} else {
const event = await new Promise<Event<K>>((resolve) => {
nextResolve = resolve
})
yield event
}
}
} finally {
// Unregister the event listener when the generator is done
const idx = subListeners[subid].event.indexOf(pushToQueue)
if (idx >= 0) subListeners[subid].event.splice(idx, 1)
}
}
return {
sub: (newFilters, newOpts = {}) =>
sub(newFilters || filters, {
@ -266,6 +308,9 @@ export function relayInit(
let idx = listeners[type].indexOf(cb)
if (idx >= 0) listeners[type].splice(idx, 1)
},
get events() {
return eventsGenerator()
}
}
}