initial refactor of cb api

This commit is contained in:
monica
2022-12-04 21:57:15 -06:00
parent 92fb339afb
commit b955ba2a09
3 changed files with 163 additions and 140 deletions

View File

@@ -1,6 +1,6 @@
import {generatePrivateKey, getPublicKey} from './keys.js' import { generatePrivateKey, getPublicKey } from './keys.js'
import {relayConnect} from './relay.js' import { relayInit } from './relay.js'
import {relayPool} from './pool.js' import { relayPool } from './pool.js'
import { import {
getBlankEvent, getBlankEvent,
signEvent, signEvent,
@@ -9,11 +9,11 @@ import {
serializeEvent, serializeEvent,
getEventHash getEventHash
} from './event.js' } from './event.js'
import {matchFilter, matchFilters} from './filter.js' import { matchFilter, matchFilters } from './filter.js'
export { export {
generatePrivateKey, generatePrivateKey,
relayConnect, relayInit,
relayPool, relayPool,
signEvent, signEvent,
validateEvent, validateEvent,
@@ -25,3 +25,4 @@ export {
matchFilter, matchFilter,
matchFilters matchFilters
} }

168
pool.js
View File

@@ -1,5 +1,5 @@
import {getEventHash, verifySignature, signEvent} from './event.js' import { getEventHash, verifySignature, signEvent } from './event.js'
import {relayConnect, normalizeRelayURL} from './relay.js' import { relayInit, normalizeRelayURL } from './relay.js'
export function relayPool() { export function relayPool() {
var globalPrivateKey var globalPrivateKey
@@ -14,74 +14,65 @@ export function relayPool() {
// been published -- or at least attempted to be published -- to all relays // been published -- or at least attempted to be published -- to all relays
wait: false wait: false
} }
//map with all the relays where the url is the id // map with all the relays where the url is the id
//Map<string,{relay:Relay,policy:RelayPolicy> // Map<string,{relay:Relay,policy:RelayPolicy>
const relays = {} const relays = {}
const noticeCallbacks = []
function propagateNotice(notice, relayURL) {
for (let i = 0; i < noticeCallbacks.length; i++) {
let {relay} = relays[relayURL]
noticeCallbacks[i](notice, relay)
}
}
const activeSubscriptions = {} const activeSubscriptions = {}
const poolListeners = { notice: [], connection: [], disconnection: [], error: [] }
//sub creates a Subscription object {sub:Function, unsub:Function, addRelay:Function,removeRelay :Function } // sub creates a Subscription object {sub:Function, unsub:Function, addRelay:Function,removeRelay :Function }
const sub = ({cb, filter, beforeSend}, id, cbEose) => { const sub = ({ filter, beforeSend, skipVerification }, id) => {
//check if it has an id, if not assign one // check if it has an id, if not assign one
if (!id) id = Math.random().toString().slice(2) if (!id) id = Math.random().toString().slice(2)
const subListeners = {}
const subControllers = Object.fromEntries( const subControllers = Object.fromEntries(
//Convert the map<string,Relay> to a Relay[] // Convert the map<string,Relay> to a Relay[]
Object.values(relays) Object.values(relays)
//takes only relays that can be read // takes only relays that can be read
.filter(({policy}) => policy.read) .filter(({ policy }) => policy.read)
//iterate all the rellies and create the array [url:string,sub:SubscriptionCallback] // iterate all the relays and create the array [url:string,sub:SubscriptionCallback, listeners]
.map(({relay}) => [ .map(({ relay }) => [
relay.url, relay.url,
relay.sub({cb: event => cb(event, relay.url), filter, beforeSend}, id, relay.sub({ filter, beforeSend, skipVerification }, id),
() => cbEose(relay.url))
]) ])
) )
const activeCallback = cb //assign callback for the suscription const activeFilters = filter // assigng filter for the suscrition
const activeFilters = filter //assigng filter for the suscrition const activeBeforeSend = beforeSend // assign before send fucntion
const activeBeforeSend = beforeSend //assign before send fucntion const activeSkipVerification = skipVerification // assign skipVerification
//Unsub deletes itself // Unsub deletes itself
const unsub = () => { const unsub = () => {
//iterate the map of subControllers and call the unsub function of it relays // iterate the map of subControllers and call the unsub function of it relays
Object.values(subControllers).forEach(sub => sub.unsub()) Object.values(subControllers).forEach(sub => sub.unsub())
delete activeSubscriptions[id] delete activeSubscriptions[id]
} }
const sub = ({ const sub = ({
cb = activeCallback,
filter = activeFilters, filter = activeFilters,
beforeSend = activeBeforeSend beforeSend = activeBeforeSend,
skipVerification = activeSkipVerification
}) => { }) => {
//Iterates the subControllers and add them the atributes of our Subscription (callback, filter,etc.) // Iterates the subControllers and add them the atributes of our Subscription (callback, filter,etc.)
Object.entries(subControllers).map(([relayURL, sub]) => [ Object.entries(subControllers).forEach(([relayURL, sub]) => {
relayURL, sub.sub({ filter, beforeSend, skipVerification }, id)
sub.sub({cb: event => cb(event, relayURL), filter, beforeSend}, id, })
() => cbEose(relayURL))
]) // returns the current suscripcion
//returns the current suscripcion
return activeSubscriptions[id] return activeSubscriptions[id]
} }
//addRelay adds a relay to the subControllers map so the current subscription can use it // addRelay adds a relay to the subControllers map so the current subscription can use it
const addRelay = relay => { const addRelay = relay => {
subControllers[relay.url] = relay.sub( for (let type of Object.keys(subListeners)) {
{cb: event => cb(event, relay.url), filter, beforeSend}, if (subListeners[type].length) subListeners[type].forEach(cb => relay.on(type, cb, id))
id, () => cbEose(relay.url) }
) subControllers[relay.url] = relay.sub({ filter, beforeSend, skipVerification }, id) // TODO filter/before send should reference updated filter/beforesend
return activeSubscriptions[id] return activeSubscriptions[id]
} }
//removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay // removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay
const removeRelay = relayURL => { const removeRelay = relayURL => {
if (relayURL in subControllers) { if (relayURL in subControllers) {
subControllers[relayURL].unsub() subControllers[relayURL].unsub()
@@ -89,13 +80,29 @@ export function relayPool() {
} }
return activeSubscriptions[id] return activeSubscriptions[id]
} }
// on creates listener for sub ('EVENT', 'EOSE', etc)
const on = (type, cb) => {
subListeners[type].push(cb)
Object.values(relays).forEach(({ relay }) => relay.on(type, cb, id))
return activeSubscriptions[id]
}
// off destroys listener for sub ('EVENT', 'EOSE', etc)
const off = (type, cb) => {
if (!subListeners[type].length) return
let index = subListeners[type].indexOf(cb)
if (index !== -1) subListeners[type].splice(index, 1)
Object.values(relays).forEach(({ relay }) => relay.off(type, cb, id))
return activeSubscriptions[id]
}
//add the object created to activeSubscriptions map // add the object created to activeSubscriptions map
activeSubscriptions[id] = { activeSubscriptions[id] = {
sub, sub,
unsub, unsub,
addRelay, addRelay,
removeRelay removeRelay,
on,
off
} }
return activeSubscriptions[id] return activeSubscriptions[id]
@@ -113,43 +120,47 @@ export function relayPool() {
setPolicy(key, value) { setPolicy(key, value) {
poolPolicy[key] = value poolPolicy[key] = value
}, },
//addRelay adds a relay to the pool and to all its subscriptions // addRelay adds a relay to the pool and to all its subscriptions
addRelay(url, policy = {read: true, write: true}) { addRelay(url, policy = { read: true, write: true }) {
let relayURL = normalizeRelayURL(url) let relayURL = normalizeRelayURL(url)
if (relayURL in relays) return if (relayURL in relays) return
let relay = relayConnect(url, notice => { let relay = relayInit(url)
propagateNotice(notice, relayURL)
}) for (let type of Object.keys(poolListeners)) {
relays[relayURL] = {relay, policy} let cbs = poolListeners[type] || []
if (cbs.length) poolListeners[type].forEach(cb => relay.on(type, cb))
}
relay.connect()
relays[relayURL] = { relay: relay, policy }
if (policy.read) { if (policy.read) {
Object.values(activeSubscriptions).forEach(subscription => Object.values(activeSubscriptions).forEach(subscription => {
subscription.addRelay(relay) subscription.addRelay(relay)
) })
} }
return relay return relay
}, },
//remove relay deletes the relay from the pool and from all its subscriptions // remove relay deletes the relay from the pool and from all its subscriptions
removeRelay(url) { removeRelay(url) {
let relayURL = normalizeRelayURL(url) let relayURL = normalizeRelayURL(url)
let data = relays[relayURL] let data = relays[relayURL]
if (!data) return if (!data) return
let {relay} = data let { relay } = data
Object.values(activeSubscriptions).forEach(subscription => Object.values(activeSubscriptions).forEach(subscription =>
subscription.removeRelay(relay) subscription.removeRelay(relay)
) )
relay.close() relay.close()
delete relays[relayURL] delete relays[relayURL]
}, },
//getRelayList return an array with all the relays stored // getRelayList return an array with all the relays stored
getRelayList() { getRelayList() {
return Object.values(relays) return Object.values(relays)
}, },
relayChangePolicy(url, policy = {read: true, write: true}) { relayChangePolicy(url, policy = { read: true, write: true }) {
let relayURL = normalizeRelayURL(url) let relayURL = normalizeRelayURL(url)
let data = relays[relayURL] let data = relays[relayURL]
if (!data) return if (!data) return
@@ -158,19 +169,22 @@ export function relayPool() {
return relays[relayURL] return relays[relayURL]
}, },
onNotice(cb) { on(type, cb) {
noticeCallbacks.push(cb) poolListeners[type] = poolListeners[type] || []
poolListeners[type].push(cb)
Object.values(relays).forEach(({ relay }) => relay.on(type, cb))
}, },
offNotice(cb) { off(type, cb) {
let index = noticeCallbacks.indexOf(cb) let index = poolListeners[type].indexOf(cb)
if (index !== -1) noticeCallbacks.splice(index, 1) if (index !== -1) poolListeners[type].splice(index, 1)
Object.values(relays).forEach(({ relay }) => relay.off(type, cb))
}, },
//publish send a event to the relays // publish send a event to the relays
async publish(event, statusCallback) { async publish(event, statusCallback) {
event.id = getEventHash(event) event.id = getEventHash(event)
//if the event is not signed then sign it // if the event is not signed then sign it
if (!event.sig) { if (!event.sig) {
event.tags = event.tags || [] event.tags = event.tags || []
@@ -195,9 +209,9 @@ export function relayPool() {
} }
} }
//get the writable relays // get the writable relays
let writeable = Object.values(relays) let writeable = Object.values(relays)
.filter(({policy}) => policy.write) .filter(({ policy }) => policy.write)
.sort(() => Math.random() - 0.5) // random .sort(() => Math.random() - 0.5) // random
let maxTargets = poolPolicy.randomChoice let maxTargets = poolPolicy.randomChoice
@@ -206,10 +220,10 @@ export function relayPool() {
let successes = 0 let successes = 0
//if the pool policy set to want until event send // if the pool policy set to want until event send
if (poolPolicy.wait) { if (poolPolicy.wait) {
for (let i = 0; i < writeable.length; i++) { for (let i = 0; i < writeable.length; i++) {
let {relay} = writeable[i] let { relay } = writeable[i]
try { try {
await new Promise(async (resolve, reject) => { await new Promise(async (resolve, reject) => {
@@ -231,9 +245,9 @@ export function relayPool() {
/***/ /***/
} }
} }
//if the pool policy dont want to wait until event send // if the pool policy dont want to wait until event send
} else { } else {
writeable.forEach(async ({relay}) => { writeable.forEach(async ({ relay }) => {
let callback = statusCallback let callback = statusCallback
? status => statusCallback(status, relay.url) ? status => statusCallback(status, relay.url)
: null : null

124
relay.js
View File

@@ -2,8 +2,8 @@
import 'websocket-polyfill' import 'websocket-polyfill'
import {verifySignature, validateEvent} from './event.js' import { verifySignature, validateEvent } from './event.js'
import {matchFilters} from './filter.js' import { matchFilters } from './filter.js'
export function normalizeRelayURL(url) { export function normalizeRelayURL(url) {
let [host, ...qs] = url.trim().split('?') let [host, ...qs] = url.trim().split('?')
@@ -13,12 +13,19 @@ export function normalizeRelayURL(url) {
return [host, ...qs].join('?') return [host, ...qs].join('?')
} }
export function relayConnect(url, onNotice = () => {}, onError = () => {}) { export function relayInit(url) {
url = normalizeRelayURL(url) url = normalizeRelayURL(url)
var ws, resolveOpen, untilOpen, wasClosed var ws, resolveOpen, untilOpen, wasClosed
var openSubs = {} var openSubs = {}
var isSetToSkipVerification = {} var listeners = {
event: { '_': [] },
eose: { '_': [] },
connection: { '_': [] },
disconnection: { '_': [] },
error: { '_': [] },
notice: { '_': [] },
}
let attemptNumber = 1 let attemptNumber = 1
let nextAttemptSeconds = 1 let nextAttemptSeconds = 1
@@ -28,32 +35,28 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
}) })
} }
var eventListeners = {} function connectRelay() {
var eoseListeners = {}
function connect() {
ws = new WebSocket(url) ws = new WebSocket(url)
ws.onopen = () => { ws.onopen = () => {
console.log('connected to', url) console.log('connected to', url)
listeners.connection._.forEach(cb => cb(url))
resolveOpen() resolveOpen()
// restablish old subscriptions // restablish old subscriptions
if (wasClosed) { if (wasClosed) {
wasClosed = false wasClosed = false
for (let channel in openSubs) { for (let id in openSubs) {
let filters = openSubs[channel] sub(openSubs[id], id)
let eventCb = eventListeners[channel]
let eoseCb = eoseListeners[channel]
sub({eventCb, filter: filters}, channel, eoseCb)
} }
} }
} }
ws.onerror = err => { ws.onerror = err => {
console.log('error connecting to relay', url) console.log('error connecting to relay', url)
onError(err) listeners.error._.forEach(cb => cb(err))
} }
ws.onclose = () => { ws.onclose = async () => {
listeners.disconnection._.forEach(cb => cb(url))
resetOpenState() resetOpenState()
attemptNumber++ attemptNumber++
nextAttemptSeconds += attemptNumber ** 3 nextAttemptSeconds += attemptNumber ** 3
@@ -63,11 +66,7 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
console.log( console.log(
`relay ${url} connection closed. reconnecting in ${nextAttemptSeconds} seconds.` `relay ${url} connection closed. reconnecting in ${nextAttemptSeconds} seconds.`
) )
setTimeout(async () => { setTimeout(await connect(), nextAttemptSeconds * 1000)
try {
connect()
} catch (err) {}
}, nextAttemptSeconds * 1000)
wasClosed = true wasClosed = true
} }
@@ -87,31 +86,29 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
// ignore empty or malformed notice // ignore empty or malformed notice
return return
} }
console.log(`message from relay ${url}: ${data[1]}`) if (listeners.notice._.length) listeners.notice._.forEach(cb => cb(data[1]))
onNotice(data[1])
return return
case 'EOSE': case 'EOSE':
if (data.length !== 2) { if (data.length !== 2) {
// ignore malformed EOSE // ignore malformed EOSE
return return
} }
console.log(`Channel ${data[1]}: End-of-stored-events`) if (listeners.eose[data[1]]?.length) listeners.eose[data[1]].forEach(cb => cb())
if (eoseListeners[data[1]]) { if (listeners.eose._.length) listeners.eose._.forEach(cb => cb())
eoseListeners[data[1]]()
}
return return
case 'EVENT': case 'EVENT':
if (data.length !== 3) { if (data.length !== 3) {
// ignore malformed EVENT // ignore malformed EVENT
return return
} }
let channel = data[1] let id = data[1]
let event = data[2] let event = data[2]
if (validateEvent(event) && if (validateEvent(event) && openSubs[id] &&
(isSetToSkipVerification[channel] || verifySignature(event)) && (openSubs[id].skipVerification || verifySignature(event)) &&
eventListeners[channel] && matchFilters(openSubs[id].filter, event)
matchFilters(openSubs[channel], event)) { ) {
eventListeners[channel](event) if (listeners.event[id]?.length) listeners.event[id].forEach(cb => cb(event))
if (listeners.event._.length) listeners.event._.forEach(cb => cb(event))
} }
return return
} }
@@ -121,9 +118,11 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
resetOpenState() resetOpenState()
try { async function connect() {
connect() try {
} catch (err) {} connectRelay()
} catch (err) { }
}
async function trySend(params) { async function trySend(params) {
let msg = JSON.stringify(params) let msg = JSON.stringify(params)
@@ -132,65 +131,73 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
ws.send(msg) ws.send(msg)
} }
const sub = ( const sub = ({ filter, beforeSend, skipVerification }, id = Math.random().toString().slice(2)) => {
{cb, filter, beforeSend, skipVerification},
channel = Math.random().toString().slice(2),
eoseCb
) => {
var filters = [] var filters = []
if (Array.isArray(filter)) { if (Array.isArray(filter)) {
filters = filter filters = filter
} else { } else {
filters.push(filter) filters.push(filter)
} }
filter = filters
if (beforeSend) { if (beforeSend) {
const beforeSendResult = beforeSend({filter, relay: url, channel}) const beforeSendResult = beforeSend({ filter, relay: url, id })
filters = beforeSendResult.filter filter = beforeSendResult.filter
} }
trySend(['REQ', channel, ...filters]) trySend(['REQ', id, ...filter])
eventListeners[channel] = cb openSubs[id] = {
eoseListeners[channel] = eoseCb filter,
openSubs[channel] = filters beforeSend,
isSetToSkipVerification[channel] = skipVerification skipVerification,
}
const activeCallback = cb const activeFilters = filter
const activeFilters = filters
const activeBeforeSend = beforeSend const activeBeforeSend = beforeSend
return { return {
sub: ({ sub: ({
cb = activeCallback,
filter = activeFilters, filter = activeFilters,
beforeSend = activeBeforeSend beforeSend = activeBeforeSend
}) => sub({cb, filter, beforeSend, skipVerification}, channel, eoseCb), }) => sub({ filter, beforeSend, skipVerification }, id),
unsub: () => { unsub: () => {
delete openSubs[channel] delete openSubs[id]
delete eventListeners[channel] delete listeners.event[id]
delete eoseListeners[channel] delete listeners.eose[id]
delete isSetToSkipVerification[channel] trySend(['CLOSE', id])
trySend(['CLOSE', channel])
} }
} }
} }
function on(type, cb, id = '_') {
listeners[type][id] = listeners[type][id] || []
listeners[type][id].push(cb)
}
function off(type, cb, id = '_') {
if (!listeners[type][id].length) return
let index = listeners[type][id].indexOf(cb)
if (index !== -1) listeners[type][id].splice(index, 1)
}
return { return {
url, url,
sub, sub,
on,
off,
async publish(event, statusCallback) { async publish(event, statusCallback) {
try { try {
await trySend(['EVENT', event]) await trySend(['EVENT', event])
if (statusCallback) { if (statusCallback) {
statusCallback(0) statusCallback(0)
let {unsub} = sub( let { unsub } = sub(
{ {
cb: () => { cb: () => {
statusCallback(1) statusCallback(1)
unsub() unsub()
clearTimeout(willUnsub) clearTimeout(willUnsub)
}, },
filter: {ids: [event.id]} filter: { ids: [event.id] }
}, },
`monitor-${event.id.slice(0, 5)}` `monitor-${event.id.slice(0, 5)}`
) )
@@ -200,6 +207,7 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
if (statusCallback) statusCallback(-1) if (statusCallback) statusCallback(-1)
} }
}, },
connect,
close() { close() {
ws.close() ws.close()
}, },