trick typescript into accepting our types.
This commit is contained in:
parent
55f032d0a4
commit
13bc2ad5a8
9
pool.ts
9
pool.ts
|
@ -1,4 +1,4 @@
|
||||||
import { relayInit, type Relay, type Sub, type SubscriptionOptions } from './relay.ts'
|
import { relayInit, eventsGenerator, type Relay, type Sub, type SubscriptionOptions } from './relay.ts'
|
||||||
import { normalizeURL } from './utils.ts'
|
import { normalizeURL } from './utils.ts'
|
||||||
|
|
||||||
import type { Event } from './event.ts'
|
import type { Event } from './event.ts'
|
||||||
|
@ -118,10 +118,10 @@ export class SimplePool {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
let greaterSub: Sub = {
|
let greaterSub: Sub<K> = {
|
||||||
sub(filters, opts) {
|
sub(filters, opts) {
|
||||||
subs.forEach(sub => sub.sub(filters, opts))
|
subs.forEach(sub => sub.sub(filters, opts))
|
||||||
return greaterSub
|
return greaterSub as any
|
||||||
},
|
},
|
||||||
unsub() {
|
unsub() {
|
||||||
subs.forEach(sub => sub.unsub())
|
subs.forEach(sub => sub.unsub())
|
||||||
|
@ -138,6 +138,9 @@ export class SimplePool {
|
||||||
eventListeners.delete(cb)
|
eventListeners.delete(cb)
|
||||||
} else if (type === 'eose') eoseListeners.delete(cb as () => void | Promise<void>)
|
} else if (type === 'eose') eoseListeners.delete(cb as () => void | Promise<void>)
|
||||||
},
|
},
|
||||||
|
get events() {
|
||||||
|
return eventsGenerator(greaterSub)
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return greaterSub
|
return greaterSub
|
||||||
|
|
82
relay.ts
82
relay.ts
|
@ -15,7 +15,7 @@ type RelayEvent = {
|
||||||
export type CountPayload = {
|
export type CountPayload = {
|
||||||
count: number
|
count: number
|
||||||
}
|
}
|
||||||
type SubEvent<K extends number> = {
|
export type SubEvent<K extends number> = {
|
||||||
event: (event: Event<K>) => void | Promise<void>
|
event: (event: Event<K>) => void | Promise<void>
|
||||||
count: (payload: CountPayload) => void | Promise<void>
|
count: (payload: CountPayload) => void | Promise<void>
|
||||||
eose: () => void | Promise<void>
|
eose: () => void | Promise<void>
|
||||||
|
@ -242,48 +242,7 @@ export function relayInit(
|
||||||
}
|
}
|
||||||
trySend([verb, subid, ...filters])
|
trySend([verb, subid, ...filters])
|
||||||
|
|
||||||
async function* eventsGenerator(): AsyncGenerator<Event<K>, void, unknown> {
|
let subscription: Sub<K> = {
|
||||||
let nextResolve: ((event: Event<K>) => void) | undefined
|
|
||||||
const eventQueue: Event<K>[] = []
|
|
||||||
|
|
||||||
const pushToQueue = (event: Event<K>) => {
|
|
||||||
if (nextResolve) {
|
|
||||||
nextResolve(event)
|
|
||||||
nextResolve = undefined
|
|
||||||
} else {
|
|
||||||
eventQueue.push(event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register the event listener
|
|
||||||
if (!subListeners[subid]) {
|
|
||||||
subListeners[subid] = {
|
|
||||||
event: [],
|
|
||||||
count: [],
|
|
||||||
eose: []
|
|
||||||
}
|
|
||||||
}
|
|
||||||
subListeners[subid].event.push(pushToQueue)
|
|
||||||
|
|
||||||
try {
|
|
||||||
while (true) {
|
|
||||||
if (eventQueue.length > 0) {
|
|
||||||
yield eventQueue.shift()!
|
|
||||||
} else {
|
|
||||||
const event = await new Promise<Event<K>>((resolve) => {
|
|
||||||
nextResolve = resolve
|
|
||||||
})
|
|
||||||
yield event
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
// Unregister the event listener when the generator is done
|
|
||||||
const idx = subListeners[subid].event.indexOf(pushToQueue)
|
|
||||||
if (idx >= 0) subListeners[subid].event.splice(idx, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
sub: (newFilters, newOpts = {}) =>
|
sub: (newFilters, newOpts = {}) =>
|
||||||
sub(newFilters || filters, {
|
sub(newFilters || filters, {
|
||||||
skipVerification: newOpts.skipVerification || skipVerification,
|
skipVerification: newOpts.skipVerification || skipVerification,
|
||||||
|
@ -309,9 +268,11 @@ export function relayInit(
|
||||||
if (idx >= 0) listeners[type].splice(idx, 1)
|
if (idx >= 0) listeners[type].splice(idx, 1)
|
||||||
},
|
},
|
||||||
get events() {
|
get events() {
|
||||||
return eventsGenerator()
|
return eventsGenerator(subscription)
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return subscription
|
||||||
}
|
}
|
||||||
|
|
||||||
function _publishEvent(event: Event<number>, type: string) {
|
function _publishEvent(event: Event<number>, type: string) {
|
||||||
|
@ -404,3 +365,34 @@ export function relayInit(
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function* eventsGenerator<K extends number>(sub: Sub<K>): AsyncGenerator<Event<K>, void, unknown> {
|
||||||
|
let nextResolve: ((event: Event<K>) => void) | undefined
|
||||||
|
const eventQueue: Event<K>[] = []
|
||||||
|
|
||||||
|
const pushToQueue = (event: Event<K>) => {
|
||||||
|
if (nextResolve) {
|
||||||
|
nextResolve(event)
|
||||||
|
nextResolve = undefined
|
||||||
|
} else {
|
||||||
|
eventQueue.push(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub.on('event', pushToQueue)
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
if (eventQueue.length > 0) {
|
||||||
|
yield eventQueue.shift()!
|
||||||
|
} else {
|
||||||
|
const event = await new Promise<Event<K>>(resolve => {
|
||||||
|
nextResolve = resolve
|
||||||
|
})
|
||||||
|
yield event
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
sub.off('event', pushToQueue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue