Merge pull request #41 from monlovesmango/cb-api

refactor of cb api
This commit is contained in:
fiatjaf
2022-12-17 13:06:30 -03:00
committed by GitHub
4 changed files with 246 additions and 196 deletions

View File

@@ -14,41 +14,59 @@ pool.setPrivateKey('<hex>') // optional
pool.addRelay('ws://some.relay.com', {read: true, write: true})
pool.addRelay('ws://other.relay.cool', {read: true, write: true})
// example callback function for a subscription
function onEvent(event, relay) {
console.log(`got an event from ${relay.url} which is already validated.`, event)
// example callback functions for listeners
// callback functions take an object argument with following keys:
// - relay: relay url
// - type: type of listener
// - id: sub id for sub specific listeners ('EVENT' or 'EOSE')
// - event: event object, only for 'event' listener
// - notice: notice message, only for 'notice' listener
function onEvent({event, relay, type, id}) {
console.log(`got an event from ${relay} which is already validated.`, event)
}
function onEose({relay, type, id}) { /* callback function here */}
function onNotice({relay, type, notice}) { /* callback function here */}
function onConnection({relay, type}) { /* callback function here */}
// listen for messages for pool
pool.on('event', onEvent)
pool.on('connection', onConnection)
pool.on('notice', onNotice)
// subscribing to a single user
// author is the user's public key
pool.sub({cb: onEvent, filter: {author: '<hex>'}})
pool.sub({filter: {author: '<hex>'}})
// or bulk follow
pool.sub({cb:(event, relay) => {...}, filter: {authors: ['<hex1>', '<hex2>', ..., '<hexn>']}})
pool.sub({filter: {authors: ['<hex1>', '<hex2>', ..., '<hexn>']}})
// reuse a subscription channel
const mySubscription = pool.sub({cb: ..., filter: ....})
const mySubscription = pool.sub({filter: ...., skipVerification: false, beforeSend: ....})
mySubscription.sub({filter: ....})
mySubscription.sub({cb: ...})
mySubscription.sub({skipVerification: true})
// listen for messages for subscription
mySubscription.on('event', onEvent)
mySubscription.on('eose', onEose)
// close subscription
mySubscription.unsub()
// get specific event
const specificChannel = pool.sub({
cb: (event, relay) => {
const specificChannel = pool.sub({ filter: {id: '<hex>'}})
.on('event', ({event, relay}) => {
console.log('got specific event from relay', event, relay)
specificChannel.unsub()
},
filter: {id: '<hex>'}
})
})
// or get a specific event plus all the events that reference it in the 'e' tag
pool.sub({ cb: (event, relay) => { ... }, filter: [{id: '<hex>'}, {'#e': '<hex>'}] })
pool.sub({ filter: [{id: '<hex>'}, {'#e': '<hex>'}] })
// get all events
pool.sub({cb: (event, relay) => {...}, filter: {}})
pool.sub({ filter: {} })
// get recent events
pool.sub({cb: (event, relay) => {...}, filter: {since: timestamp}})
pool.sub({ filter: {since: timestamp} })
// publishing events(inside an async function):
const ev = await pool.publish(eventObject, (status, url) => {

View File

@@ -1,6 +1,6 @@
import {generatePrivateKey, getPublicKey} from './keys.js'
import {relayConnect} from './relay.js'
import {relayPool} from './pool.js'
import { generatePrivateKey, getPublicKey } from './keys.js'
import { relayInit } from './relay.js'
import { relayPool } from './pool.js'
import {
getBlankEvent,
signEvent,
@@ -9,11 +9,11 @@ import {
serializeEvent,
getEventHash
} from './event.js'
import {matchFilter, matchFilters} from './filter.js'
import { matchFilter, matchFilters } from './filter.js'
export {
generatePrivateKey,
relayConnect,
relayInit,
relayPool,
signEvent,
validateEvent,
@@ -25,3 +25,4 @@ export {
matchFilter,
matchFilters
}

197
pool.js
View File

@@ -1,5 +1,5 @@
import {getEventHash, verifySignature, signEvent} from './event.js'
import {relayConnect, normalizeRelayURL} from './relay.js'
import { getEventHash, verifySignature, signEvent } from './event.js'
import { relayInit, normalizeRelayURL } from './relay.js'
export function relayPool() {
var globalPrivateKey
@@ -14,88 +14,107 @@ export function relayPool() {
// been published -- or at least attempted to be published -- to all relays
wait: false
}
//map with all the relays where the url is the id
//Map<string,{relay:Relay,policy:RelayPolicy>
// map with all the relays where the url is the id
// Map<string,{relay:Relay,policy:RelayPolicy>
const relays = {}
const noticeCallbacks = []
function propagateNotice(notice, relayURL) {
for (let i = 0; i < noticeCallbacks.length; i++) {
let {relay} = relays[relayURL]
noticeCallbacks[i](notice, relay)
}
}
const openSubs = {}
const activeSubscriptions = {}
const poolListeners = { notice: [], connection: [], disconnection: [], error: [] }
//sub creates a Subscription object {sub:Function, unsub:Function, addRelay:Function,removeRelay :Function }
const sub = ({cb, filter, beforeSend}, id, cbEose) => {
//check if it has an id, if not assign one
// sub creates a Subscription object {sub:Function, unsub:Function, addRelay:Function,removeRelay :Function }
const sub = ({ filter, beforeSend, skipVerification }, id) => {
// check if it has an id, if not assign one
if (!id) id = Math.random().toString().slice(2)
// save sub settings
openSubs[id] = {
filter,
beforeSend,
skipVerification,
}
const subListeners = { event: [], eose: [] }
const subControllers = Object.fromEntries(
//Convert the map<string,Relay> to a Relay[]
// Convert the map<string,Relay> to a Relay[]
Object.values(relays)
//takes only relays that can be read
.filter(({policy}) => policy.read)
//iterate all the rellies and create the array [url:string,sub:SubscriptionCallback]
.map(({relay}) => [
// takes only relays that can be read
.filter(({ policy }) => policy.read)
// iterate all the relays and create the array [url:string,sub:SubscriptionCallback, listeners]
.map(({ relay }) => [
relay.url,
relay.sub({cb: event => cb(event, relay.url), filter, beforeSend}, id,
() => cbEose(relay.url))
relay.sub(openSubs[id], id),
])
)
const activeCallback = cb //assign callback for the suscription
const activeFilters = filter //assigng filter for the suscrition
const activeBeforeSend = beforeSend //assign before send fucntion
//Unsub deletes itself
// Unsub deletes itself
const unsub = () => {
//iterate the map of subControllers and call the unsub function of it relays
// iterate the map of subControllers and call the unsub function of it relays
Object.values(subControllers).forEach(sub => sub.unsub())
delete activeSubscriptions[id]
delete openSubs[id]
delete activeSubscriptions[id]
}
const sub = ({
cb = activeCallback,
filter = activeFilters,
beforeSend = activeBeforeSend
}) => {
//Iterates the subControllers and add them the atributes of our Subscription (callback, filter,etc.)
Object.entries(subControllers).map(([relayURL, sub]) => [
relayURL,
sub.sub({cb: event => cb(event, relayURL), filter, beforeSend}, id,
() => cbEose(relayURL))
])
//returns the current suscripcion
filter = openSubs[id].filter,
beforeSend = openSubs[id].beforeSend,
skipVerification = openSubs[id].skipVerification }
) => {
// update sub settings
openSubs[id] = {
filter,
beforeSend,
skipVerification,
}
// update relay subs
Object.entries(subControllers).forEach(([relayURL, sub]) => {
sub.sub(openSubs[id], id)
})
// returns the current suscripcion
return activeSubscriptions[id]
}
//addRelay adds a relay to the subControllers map so the current subscription can use it
// addRelay adds a relay to the subControllers map so the current subscription can use it
const addRelay = relay => {
subControllers[relay.url] = relay.sub(
{cb: event => cb(event, relay.url), filter, beforeSend},
id, () => cbEose(relay.url)
)
for (let type of Object.keys(subListeners)) {
if (subListeners[type].length) subListeners[type].forEach(cb => relay.on(type, cb, id))
}
subControllers[relay.url] = relay.sub(openSubs[id], id)
return activeSubscriptions[id]
}
//removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay
// removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay
const removeRelay = relayURL => {
if (relayURL in subControllers) {
subControllers[relayURL].unsub()
delete subControllers[relayURL]
if (Object.keys(subControllers).length === 0) unsub()
}
return activeSubscriptions[id]
}
// on creates listener for sub ('EVENT', 'EOSE', etc)
const on = (type, cb) => {
subListeners[type].push(cb)
Object.values(relays).filter(({ policy }) => policy.read).forEach(({ relay }) => relay.on(type, cb, id))
return activeSubscriptions[id]
}
// off destroys listener for sub ('EVENT', 'EOSE', etc)
const off = (type, cb) => {
if (!subListeners[type].length) return
let index = subListeners[type].indexOf(cb)
if (index !== -1) subListeners[type].splice(index, 1)
Object.values(relays).forEach(({ relay }) => relay.off(type, cb, id))
return activeSubscriptions[id]
}
//add the object created to activeSubscriptions map
// add the object created to activeSubscriptions map
activeSubscriptions[id] = {
sub,
unsub,
addRelay,
removeRelay
removeRelay,
on,
off
}
return activeSubscriptions[id]
@@ -113,64 +132,72 @@ export function relayPool() {
setPolicy(key, value) {
poolPolicy[key] = value
},
//addRelay adds a relay to the pool and to all its subscriptions
addRelay(url, policy = {read: true, write: true}) {
// addRelay adds a relay to the pool and to all its subscriptions
addRelay(url, policy = { read: true, write: true }) {
let relayURL = normalizeRelayURL(url)
if (relayURL in relays) return
let relay = relayConnect(url, notice => {
propagateNotice(notice, relayURL)
})
relays[relayURL] = {relay, policy}
let relay = relayInit(url)
for (let type of Object.keys(poolListeners)) {
let cbs = poolListeners[type] || []
if (cbs.length) poolListeners[type].forEach(cb => relay.on(type, cb))
}
if (policy.read) {
Object.values(activeSubscriptions).forEach(subscription =>
subscription.addRelay(relay)
)
Object.values(activeSubscriptions).forEach(sub => sub.addRelay(relay))
}
relay.connect()
relays[relayURL] = { relay, policy }
return relay
},
//remove relay deletes the relay from the pool and from all its subscriptions
// remove relay deletes the relay from the pool and from all its subscriptions
removeRelay(url) {
let relayURL = normalizeRelayURL(url)
let data = relays[relayURL]
if (!data) return
let {relay} = data
Object.values(activeSubscriptions).forEach(subscription =>
subscription.removeRelay(relay)
)
let { relay } = data
Object.values(activeSubscriptions).forEach(sub => sub.removeRelay(relayURL))
relay.close()
delete relays[relayURL]
},
//getRelayList return an array with all the relays stored
// getRelayList return an array with all the relays stored
getRelayList() {
return Object.values(relays)
},
relayChangePolicy(url, policy = {read: true, write: true}) {
relayChangePolicy(url, policy = { read: true, write: true }) {
let relayURL = normalizeRelayURL(url)
let data = relays[relayURL]
if (!data) return
relays[relayURL].policy = policy
let { relay } = data
if (relays[relayURL].policy.read === true && policy.read === false)
Object.values(activeSubscriptions).forEach(sub => sub.removeRelay(relayURL))
else if (relays[relayURL].policy.read === false && policy.read === true)
Object.values(activeSubscriptions).forEach(sub => sub.addRelay(relay));
relays[relayURL].policy = policy
return relays[relayURL]
},
onNotice(cb) {
noticeCallbacks.push(cb)
on(type, cb) {
poolListeners[type] = poolListeners[type] || []
poolListeners[type].push(cb)
Object.values(relays).forEach(({ relay }) => relay.on(type, cb))
},
offNotice(cb) {
let index = noticeCallbacks.indexOf(cb)
if (index !== -1) noticeCallbacks.splice(index, 1)
off(type, cb) {
let index = poolListeners[type].indexOf(cb)
if (index !== -1) poolListeners[type].splice(index, 1)
Object.values(relays).forEach(({ relay }) => relay.off(type, cb))
},
//publish send a event to the relays
// publish send a event to the relays
async publish(event, statusCallback) {
event.id = getEventHash(event)
//if the event is not signed then sign it
// if the event is not signed then sign it
if (!event.sig) {
event.tags = event.tags || []
@@ -195,9 +222,9 @@ export function relayPool() {
}
}
//get the writable relays
// get the writable relays
let writeable = Object.values(relays)
.filter(({policy}) => policy.write)
.filter(({ policy }) => policy.write)
.sort(() => Math.random() - 0.5) // random
let maxTargets = poolPolicy.randomChoice
@@ -206,10 +233,10 @@ export function relayPool() {
let successes = 0
//if the pool policy set to want until event send
// if the pool policy set to wait until event send
if (poolPolicy.wait) {
for (let i = 0; i < writeable.length; i++) {
let {relay} = writeable[i]
let { relay } = writeable[i]
try {
await new Promise(async (resolve, reject) => {
@@ -231,9 +258,9 @@ export function relayPool() {
/***/
}
}
//if the pool policy dont want to wait until event send
// if the pool policy dont want to wait until event send
} else {
writeable.forEach(async ({relay}) => {
writeable.forEach(async ({ relay }) => {
let callback = statusCallback
? status => statusCallback(status, relay.url)
: null

186
relay.js
View File

@@ -2,8 +2,8 @@
import 'websocket-polyfill'
import {verifySignature, validateEvent} from './event.js'
import {matchFilters} from './filter.js'
import { verifySignature, validateEvent } from './event.js'
import { matchFilters } from './filter.js'
export function normalizeRelayURL(url) {
let [host, ...qs] = url.trim().split('?')
@@ -13,12 +13,19 @@ export function normalizeRelayURL(url) {
return [host, ...qs].join('?')
}
export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
url = normalizeRelayURL(url)
export function relayInit(url) {
let relay = normalizeRelayURL(url) // set relay url
var ws, resolveOpen, untilOpen, wasClosed
var ws, resolveOpen, untilOpen, wasClosed, closed
var openSubs = {}
var isSetToSkipVerification = {}
var listeners = {
event: { '_': [] },
eose: { '_': [] },
connection: { '_': [] },
disconnection: { '_': [] },
error: { '_': [] },
notice: { '_': [] },
}
let attemptNumber = 1
let nextAttemptSeconds = 1
@@ -28,32 +35,27 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
})
}
var eventListeners = {}
var eoseListeners = {}
function connect() {
ws = new WebSocket(url)
function connectRelay() {
ws = new WebSocket(relay)
ws.onopen = () => {
console.log('connected to', url)
listeners.connection._.forEach(cb => cb({ type: 'connection', relay }))
resolveOpen()
// restablish old subscriptions
if (wasClosed) {
wasClosed = false
for (let channel in openSubs) {
let filters = openSubs[channel]
let eventCb = eventListeners[channel]
let eoseCb = eoseListeners[channel]
sub({eventCb, filter: filters}, channel, eoseCb)
for (let id in openSubs) {
sub(openSubs[id], id)
}
}
}
ws.onerror = err => {
console.log('error connecting to relay', url)
onError(err)
ws.onerror = error => {
listeners.error._.forEach(cb => cb({ type: 'error', relay, error }))
}
ws.onclose = () => {
ws.onclose = async () => {
listeners.disconnection._.forEach(cb => cb({ type: 'disconnection', relay }))
if (closed) return
resetOpenState()
attemptNumber++
nextAttemptSeconds += attemptNumber ** 3
@@ -61,12 +63,12 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
nextAttemptSeconds = 14400 // 4 hours
}
console.log(
`relay ${url} connection closed. reconnecting in ${nextAttemptSeconds} seconds.`
`relay ${relay} connection closed. reconnecting in ${nextAttemptSeconds} seconds.`
)
setTimeout(async () => {
try {
connect()
} catch (err) {}
connectRelay()
} catch (err) { }
}, nextAttemptSeconds * 1000)
wasClosed = true
@@ -82,38 +84,33 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
if (data.length >= 1) {
switch (data[0]) {
case 'NOTICE':
if (data.length !== 2) {
// ignore empty or malformed notice
return
}
console.log(`message from relay ${url}: ${data[1]}`)
onNotice(data[1])
return
case 'EOSE':
if (data.length !== 2) {
// ignore malformed EOSE
return
}
console.log(`Channel ${data[1]}: End-of-stored-events`)
if (eoseListeners[data[1]]) {
eoseListeners[data[1]]()
}
return
case 'EVENT':
if (data.length !== 3) {
// ignore malformed EVENT
return
}
let channel = data[1]
if (data.length !== 3) return // ignore empty or malformed EVENT
let id = data[1]
let event = data[2]
if (validateEvent(event) &&
(isSetToSkipVerification[channel] || verifySignature(event)) &&
eventListeners[channel] &&
matchFilters(openSubs[channel], event)) {
eventListeners[channel](event)
if (validateEvent(event) && openSubs[id] &&
(openSubs[id].skipVerification || verifySignature(event)) &&
matchFilters(openSubs[id].filter, event)
) {
if (listeners.event[id]?.length) listeners.event[id].forEach(cb => cb({ type: 'event', relay, id, event }))
if (listeners.event._.length) listeners.event._.forEach(cb => cb({ type: 'event', relay, id, event }))
}
return
case 'EOSE': {
if (data.length !== 2) return // ignore empty or malformed EOSE
let id = data[1]
if (listeners.eose[id]?.length) listeners.eose[data[1]].forEach(cb => cb({ type: 'eose', relay, id }))
if (listeners.eose._.length) listeners.eose._.forEach(cb => cb({ type: 'eose', relay, id }))
return
}
case 'NOTICE':
if (data.length !== 2) return // ignore empty or malformed NOTICE
let notice = data[1]
if (listeners.notice._.length) listeners.notice._.forEach(cb => cb({ type: 'notice', relay, notice }))
return
}
}
}
@@ -121,9 +118,12 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
resetOpenState()
try {
connect()
} catch (err) {}
async function connect() {
if (ws?.readyState && ws.readyState === 1) return // ws already open
try {
connectRelay()
} catch (err) { }
}
async function trySend(params) {
let msg = JSON.stringify(params)
@@ -132,75 +132,79 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) {
ws.send(msg)
}
const sub = (
{cb, filter, beforeSend, skipVerification},
channel = Math.random().toString().slice(2),
eoseCb
) => {
const sub = ({ filter, beforeSend, skipVerification }, id = Math.random().toString().slice(2)) => {
var filters = []
if (Array.isArray(filter)) {
filters = filter
} else {
filters.push(filter)
}
filter = filters
if (beforeSend) {
const beforeSendResult = beforeSend({filter, relay: url, channel})
filters = beforeSendResult.filter
const beforeSendResult = beforeSend({ filter, relay, id })
filter = beforeSendResult.filter
}
trySend(['REQ', channel, ...filters])
eventListeners[channel] = cb
eoseListeners[channel] = eoseCb
openSubs[channel] = filters
isSetToSkipVerification[channel] = skipVerification
const activeCallback = cb
const activeFilters = filters
const activeBeforeSend = beforeSend
openSubs[id] = {
filter,
beforeSend,
skipVerification,
}
trySend(['REQ', id, ...filter])
return {
sub: ({
cb = activeCallback,
filter = activeFilters,
beforeSend = activeBeforeSend
}) => sub({cb, filter, beforeSend, skipVerification}, channel, eoseCb),
filter = openSubs[id].filter,
beforeSend = openSubs[id].beforeSend,
skipVerification = openSubs[id].skipVerification }
) => sub({ filter, beforeSend, skipVerification }, id),
unsub: () => {
delete openSubs[channel]
delete eventListeners[channel]
delete eoseListeners[channel]
delete isSetToSkipVerification[channel]
trySend(['CLOSE', channel])
delete openSubs[id]
delete listeners.event[id]
delete listeners.eose[id]
trySend(['CLOSE', id])
}
}
}
function on(type, cb, id = '_') {
listeners[type][id] = listeners[type][id] || []
listeners[type][id].push(cb)
}
function off(type, cb, id = '_') {
if (!listeners[type][id].length) return
let index = listeners[type][id].indexOf(cb)
if (index !== -1) listeners[type][id].splice(index, 1)
}
return {
url,
sub,
on,
off,
async publish(event, statusCallback) {
try {
await trySend(['EVENT', event])
if (statusCallback) {
let id = `monitor-${event.id.slice(0, 5)}`
statusCallback(0)
let {unsub} = sub(
{
cb: () => {
statusCallback(1)
unsub()
clearTimeout(willUnsub)
},
filter: {ids: [event.id]}
},
`monitor-${event.id.slice(0, 5)}`
)
let { unsub } = sub({ filter: { ids: [event.id] } }, id)
on('event', () => {
statusCallback(1)
unsub()
clearTimeout(willUnsub)
}, id)
let willUnsub = setTimeout(unsub, 5000)
}
} catch (err) {
if (statusCallback) statusCallback(-1)
}
},
connect,
close() {
closed = true // prevent ws from trying to reconnect
ws.close()
},
get status() {