remove ramda and rework logic.

This commit is contained in:
fiatjaf
2021-02-18 16:04:41 -03:00
parent 87de1310b5
commit 3844f68d1a
3 changed files with 46 additions and 42 deletions

72
pool.js
View File

@@ -6,15 +6,8 @@ const R = require('ramda')
export function relayPool(globalPrivateKey) {
const relays = {}
const globalSub = []
const eventCallbacks = []
const noticeCallbacks = []
function propagateEvent(event, context, relayURL) {
for (let i = 0; i < eventCallbacks.length; i++) {
let {relay} = relays[relayURL]
eventCallbacks[i](event, context, relay)
}
}
function propagateNotice(notice, relayURL) {
for (let i = 0; i < noticeCallbacks.length; i++) {
let {relay} = relays[relayURL]
@@ -22,23 +15,42 @@ export function relayPool(globalPrivateKey) {
}
}
const sub = async (cb, params) => {
const subControllers = R.map(({relay}) => {
return relay.sub(params, cb.bind(null, relay))
}, R.filter(R.pipe(R.prop('policy'), R.prop('write'), R.equals(true)), relays))
const activeSubscriptions = {}
return {
sub: (cb, params) =>
R.map(
R.pipe(R.prop('sub'), R.flip(R.apply)([cb, params])),
subControllers
),
unsub: () => R.map(R.pipe(R.prop('unsub'), R.call), subControllers)
const sub = async (id, cb, filter) => {
const subControllers = Object.fromEntries(
Object.values(relays)
.filter(({policy}) => policy.read)
.map(({relay}) => [
relay.url,
relay.sub({filter, cb: event => cb(event, relay)})
])
)
activeSubscriptions[id] = {
sub: ({cb = cb, filter = filter}) =>
Object.entries(subControllers).map(([relayURL, sub]) => [
relayURL,
sub(id, {cb, filter})
]),
addRelay: relay => {
subControllers[relay.url] = relay.sub({cb, filter})
},
removeRelay: relayURL => {
subControllers[relayURL].unsub()
if (Object.keys(subControllers).length === 0) unsub()
},
unsub: () => {
Object.values(subControllers).forEach(sub => sub.unsub())
delete activeSubscriptions[id]
}
}
return activeSubscriptions[id]
}
return {
sub,
sub: sub.bind(null, Math.random()),
relays,
setPrivateKey(privateKey) {
globalPrivateKey = privateKey
@@ -47,21 +59,14 @@ export function relayPool(globalPrivateKey) {
let relayURL = normalizeRelayURL(url)
if (relayURL in relays) return
let relay = relayConnect(
url,
(context, event) => {
propagateEvent(context, event, relayURL)
},
notice => {
propagateNotice(notice, relayURL)
}
)
let relay = relayConnect(url, notice => {
propagateNotice(notice, relayURL)
})
relays[relayURL] = {relay, policy}
// automatically subscribe to everybody on this
for (let key in globalSub) {
relay.subKey(key)
}
Object.values(activeSubscriptions).forEach(subscription =>
subscription.addRelay(relay)
)
return relay
},
@@ -69,6 +74,9 @@ export function relayPool(globalPrivateKey) {
let relayURL = normalizeRelayURL(url)
let {relay} = relays[relayURL]
if (!relay) return
Object.values(activeSubscriptions).forEach(subscription =>
subscription.removeRelay(relay)
)
relay.close()
delete relays[relayURL]
},