adapt to new nip-01, change "attempt" notification logic.

This commit is contained in:
fiatjaf
2021-02-15 00:15:24 -03:00
parent d355bbead4
commit c03f8d1c56
3 changed files with 66 additions and 89 deletions

84
pool.js
View File

@@ -1,10 +1,11 @@
import R from 'ramda'
import {getEventHash, signEvent} from './event'
import {relayConnect, normalizeRelayURL} from './relay'
export function relayPool(globalPrivateKey) {
const relays = {}
const globalSub = []
const attemptCallbacks = []
const eventCallbacks = []
const noticeCallbacks = []
@@ -20,21 +21,19 @@ export function relayPool(globalPrivateKey) {
noticeCallbacks[i](notice, relay)
}
}
function propagateAttempt(eventId, status, relayURL) {
for (let i = 0; i < attemptCallbacks.length; i++) {
let {relay} = relays[relayURL]
attemptCallbacks[i](eventId, status, relay)
}
}
async function relaysEach(fn, policyFilter) {
for (let relayURL in relays) {
let {relay, policy} = relays[relayURL]
if (policyFilter.write && policy.write) {
await fn(relay)
} else if (policyFilter.read && policy.read) {
await fn(relay)
}
const sub = async (cb, params) => {
const subControllers = R.map(relay => {
return relay.sub(params, cb.bind(null, relay))
}, R.filter(R.pipe(R.prop('policy'), R.prop('write'), R.equals(true)), relays))
return {
sub: (cb, params) =>
R.map(
R.pipe(R.prop('sub'), R.flip(R.apply)([cb, params])),
subControllers
),
unsub: () => R.map(R.pipe(R.prop('unsub'), R.call), subControllers)
}
}
@@ -72,9 +71,6 @@ export function relayPool(globalPrivateKey) {
relay.close()
delete relays[relayURL]
},
onEvent(cb) {
eventCallbacks.push(cb)
},
offEvent(cb) {
let index = eventCallbacks.indexOf(cb)
if (index !== -1) eventCallbacks.splice(index, 1)
@@ -86,14 +82,7 @@ export function relayPool(globalPrivateKey) {
let index = noticeCallbacks.indexOf(cb)
if (index !== -1) noticeCallbacks.splice(index, 1)
},
onAttempt(cb) {
attemptCallbacks.push(cb)
},
offAttempt(cb) {
let index = attemptCallbacks.indexOf(cb)
if (index !== -1) attemptCallbacks.splice(index, 1)
},
async publish(event) {
async publish(event, statusCallback) {
if (!event.sig) {
event.tags = event.tags || []
@@ -107,36 +96,23 @@ export function relayPool(globalPrivateKey) {
}
}
await relaysEach(
async relay => {
try {
await relay.publish(event)
propagateAttempt(event.id, 'sent', relay.url)
} catch (err) {
propagateAttempt(event.id, 'failed', relay.url)
}
},
{write: true}
)
await R.map(async relay => {
try {
await relay.publish(event)
statusCallback(0, relay.url)
let {unsub} = relay.sub(
() => {
statusCallback(1, relay.url)
},
{id: event.id}
)
setTimeout(unsub, 5000)
} catch (err) {
statusCallback(-1, relay.url)
}
}, R.filter(R.pipe(R.prop('policy'), R.prop('write'), R.equals(true)), relays))
return event
},
async subKey(key) {
globalSub[key] = true
await relaysEach(async relay => relay.subKey(key), {read: true})
},
async unsubKey(key) {
delete globalSub[key]
await relaysEach(async relay => relay.unsubKey(key), {read: true})
},
async reqFeed(params = {}) {
await relaysEach(async relay => relay.reqFeed(params), {read: true})
},
async reqEvent(params) {
await relaysEach(async relay => relay.reqEvent(params), {read: true})
},
async reqKey(params) {
await relaysEach(async relay => relay.reqKey(params), {read: true})
}
}
}