diff --git a/index.js b/index.js index d7d4c31..a742c3a 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,6 @@ -import {generatePrivateKey, getPublicKey} from './keys.js' -import {relayConnect} from './relay.js' -import {relayPool} from './pool.js' +import { generatePrivateKey, getPublicKey } from './keys.js' +import { relayInit } from './relay.js' +import { relayPool } from './pool.js' import { getBlankEvent, signEvent, @@ -9,11 +9,11 @@ import { serializeEvent, getEventHash } from './event.js' -import {matchFilter, matchFilters} from './filter.js' +import { matchFilter, matchFilters } from './filter.js' export { generatePrivateKey, - relayConnect, + relayInit, relayPool, signEvent, validateEvent, @@ -25,3 +25,4 @@ export { matchFilter, matchFilters } + diff --git a/pool.js b/pool.js index 5e60c28..748cdd3 100644 --- a/pool.js +++ b/pool.js @@ -1,5 +1,5 @@ -import {getEventHash, verifySignature, signEvent} from './event.js' -import {relayConnect, normalizeRelayURL} from './relay.js' +import { getEventHash, verifySignature, signEvent } from './event.js' +import { relayInit, normalizeRelayURL } from './relay.js' export function relayPool() { var globalPrivateKey @@ -14,74 +14,65 @@ export function relayPool() { // been published -- or at least attempted to be published -- to all relays wait: false } - - //map with all the relays where the url is the id - //Map + + // map with all the relays where the url is the id + // Map const relays = {} - const noticeCallbacks = [] - - function propagateNotice(notice, relayURL) { - for (let i = 0; i < noticeCallbacks.length; i++) { - let {relay} = relays[relayURL] - noticeCallbacks[i](notice, relay) - } - } - const activeSubscriptions = {} + const poolListeners = { notice: [], connection: [], disconnection: [], error: [] } - //sub creates a Subscription object {sub:Function, unsub:Function, addRelay:Function,removeRelay :Function } - const sub = ({cb, filter, beforeSend}, id, cbEose) => { - - //check if it has an id, if not assign one + // sub creates a Subscription object {sub:Function, unsub:Function, addRelay:Function,removeRelay :Function } + const sub = ({ filter, beforeSend, skipVerification }, id) => { + + // check if it has an id, if not assign one if (!id) id = Math.random().toString().slice(2) + const subListeners = {} const subControllers = Object.fromEntries( - //Convert the map to a Relay[] + // Convert the map to a Relay[] Object.values(relays) - //takes only relays that can be read - .filter(({policy}) => policy.read) - //iterate all the rellies and create the array [url:string,sub:SubscriptionCallback] - .map(({relay}) => [ + // takes only relays that can be read + .filter(({ policy }) => policy.read) + // iterate all the relays and create the array [url:string,sub:SubscriptionCallback, listeners] + .map(({ relay }) => [ relay.url, - relay.sub({cb: event => cb(event, relay.url), filter, beforeSend}, id, - () => cbEose(relay.url)) + relay.sub({ filter, beforeSend, skipVerification }, id), ]) ) - const activeCallback = cb //assign callback for the suscription - const activeFilters = filter //assigng filter for the suscrition - const activeBeforeSend = beforeSend //assign before send fucntion + const activeFilters = filter // assigng filter for the suscrition + const activeBeforeSend = beforeSend // assign before send fucntion + const activeSkipVerification = skipVerification // assign skipVerification - //Unsub deletes itself + // Unsub deletes itself const unsub = () => { - //iterate the map of subControllers and call the unsub function of it relays + // iterate the map of subControllers and call the unsub function of it relays Object.values(subControllers).forEach(sub => sub.unsub()) - delete activeSubscriptions[id] + delete activeSubscriptions[id] } - - + + const sub = ({ - cb = activeCallback, filter = activeFilters, - beforeSend = activeBeforeSend + beforeSend = activeBeforeSend, + skipVerification = activeSkipVerification }) => { - //Iterates the subControllers and add them the atributes of our Subscription (callback, filter,etc.) - Object.entries(subControllers).map(([relayURL, sub]) => [ - relayURL, - sub.sub({cb: event => cb(event, relayURL), filter, beforeSend}, id, - () => cbEose(relayURL)) - ]) - //returns the current suscripcion + // Iterates the subControllers and add them the atributes of our Subscription (callback, filter,etc.) + Object.entries(subControllers).forEach(([relayURL, sub]) => { + sub.sub({ filter, beforeSend, skipVerification }, id) + }) + + // returns the current suscripcion return activeSubscriptions[id] } - //addRelay adds a relay to the subControllers map so the current subscription can use it + // addRelay adds a relay to the subControllers map so the current subscription can use it const addRelay = relay => { - subControllers[relay.url] = relay.sub( - {cb: event => cb(event, relay.url), filter, beforeSend}, - id, () => cbEose(relay.url) - ) + for (let type of Object.keys(subListeners)) { + if (subListeners[type].length) subListeners[type].forEach(cb => relay.on(type, cb, id)) + } + subControllers[relay.url] = relay.sub({ filter, beforeSend, skipVerification }, id) // TODO filter/before send should reference updated filter/beforesend return activeSubscriptions[id] } - //removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay + // removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay const removeRelay = relayURL => { if (relayURL in subControllers) { subControllers[relayURL].unsub() @@ -89,13 +80,29 @@ export function relayPool() { } return activeSubscriptions[id] } + // on creates listener for sub ('EVENT', 'EOSE', etc) + const on = (type, cb) => { + subListeners[type].push(cb) + Object.values(relays).forEach(({ relay }) => relay.on(type, cb, id)) + return activeSubscriptions[id] + } + // off destroys listener for sub ('EVENT', 'EOSE', etc) + const off = (type, cb) => { + if (!subListeners[type].length) return + let index = subListeners[type].indexOf(cb) + if (index !== -1) subListeners[type].splice(index, 1) + Object.values(relays).forEach(({ relay }) => relay.off(type, cb, id)) + return activeSubscriptions[id] + } - //add the object created to activeSubscriptions map + // add the object created to activeSubscriptions map activeSubscriptions[id] = { sub, unsub, addRelay, - removeRelay + removeRelay, + on, + off } return activeSubscriptions[id] @@ -113,43 +120,47 @@ export function relayPool() { setPolicy(key, value) { poolPolicy[key] = value }, - //addRelay adds a relay to the pool and to all its subscriptions - addRelay(url, policy = {read: true, write: true}) { + // addRelay adds a relay to the pool and to all its subscriptions + addRelay(url, policy = { read: true, write: true }) { let relayURL = normalizeRelayURL(url) if (relayURL in relays) return - let relay = relayConnect(url, notice => { - propagateNotice(notice, relayURL) - }) - relays[relayURL] = {relay, policy} + let relay = relayInit(url) + + for (let type of Object.keys(poolListeners)) { + let cbs = poolListeners[type] || [] + if (cbs.length) poolListeners[type].forEach(cb => relay.on(type, cb)) + } + relay.connect() + relays[relayURL] = { relay: relay, policy } if (policy.read) { - Object.values(activeSubscriptions).forEach(subscription => + Object.values(activeSubscriptions).forEach(subscription => { subscription.addRelay(relay) - ) + }) } return relay }, - //remove relay deletes the relay from the pool and from all its subscriptions + // remove relay deletes the relay from the pool and from all its subscriptions removeRelay(url) { let relayURL = normalizeRelayURL(url) let data = relays[relayURL] if (!data) return - let {relay} = data + let { relay } = data Object.values(activeSubscriptions).forEach(subscription => subscription.removeRelay(relay) ) relay.close() delete relays[relayURL] }, - //getRelayList return an array with all the relays stored + // getRelayList return an array with all the relays stored getRelayList() { return Object.values(relays) }, - relayChangePolicy(url, policy = {read: true, write: true}) { + relayChangePolicy(url, policy = { read: true, write: true }) { let relayURL = normalizeRelayURL(url) let data = relays[relayURL] if (!data) return @@ -158,19 +169,22 @@ export function relayPool() { return relays[relayURL] }, - onNotice(cb) { - noticeCallbacks.push(cb) + on(type, cb) { + poolListeners[type] = poolListeners[type] || [] + poolListeners[type].push(cb) + Object.values(relays).forEach(({ relay }) => relay.on(type, cb)) }, - offNotice(cb) { - let index = noticeCallbacks.indexOf(cb) - if (index !== -1) noticeCallbacks.splice(index, 1) + off(type, cb) { + let index = poolListeners[type].indexOf(cb) + if (index !== -1) poolListeners[type].splice(index, 1) + Object.values(relays).forEach(({ relay }) => relay.off(type, cb)) }, - - //publish send a event to the relays + + // publish send a event to the relays async publish(event, statusCallback) { event.id = getEventHash(event) - - //if the event is not signed then sign it + + // if the event is not signed then sign it if (!event.sig) { event.tags = event.tags || [] @@ -195,9 +209,9 @@ export function relayPool() { } } - //get the writable relays + // get the writable relays let writeable = Object.values(relays) - .filter(({policy}) => policy.write) + .filter(({ policy }) => policy.write) .sort(() => Math.random() - 0.5) // random let maxTargets = poolPolicy.randomChoice @@ -206,10 +220,10 @@ export function relayPool() { let successes = 0 - //if the pool policy set to want until event send + // if the pool policy set to want until event send if (poolPolicy.wait) { for (let i = 0; i < writeable.length; i++) { - let {relay} = writeable[i] + let { relay } = writeable[i] try { await new Promise(async (resolve, reject) => { @@ -231,9 +245,9 @@ export function relayPool() { /***/ } } - //if the pool policy dont want to wait until event send + // if the pool policy dont want to wait until event send } else { - writeable.forEach(async ({relay}) => { + writeable.forEach(async ({ relay }) => { let callback = statusCallback ? status => statusCallback(status, relay.url) : null diff --git a/relay.js b/relay.js index 73fc3ae..32a8aab 100644 --- a/relay.js +++ b/relay.js @@ -2,8 +2,8 @@ import 'websocket-polyfill' -import {verifySignature, validateEvent} from './event.js' -import {matchFilters} from './filter.js' +import { verifySignature, validateEvent } from './event.js' +import { matchFilters } from './filter.js' export function normalizeRelayURL(url) { let [host, ...qs] = url.trim().split('?') @@ -13,12 +13,19 @@ export function normalizeRelayURL(url) { return [host, ...qs].join('?') } -export function relayConnect(url, onNotice = () => {}, onError = () => {}) { +export function relayInit(url) { url = normalizeRelayURL(url) var ws, resolveOpen, untilOpen, wasClosed var openSubs = {} - var isSetToSkipVerification = {} + var listeners = { + event: { '_': [] }, + eose: { '_': [] }, + connection: { '_': [] }, + disconnection: { '_': [] }, + error: { '_': [] }, + notice: { '_': [] }, + } let attemptNumber = 1 let nextAttemptSeconds = 1 @@ -28,32 +35,28 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { }) } - var eventListeners = {} - var eoseListeners = {} - - function connect() { + function connectRelay() { ws = new WebSocket(url) ws.onopen = () => { console.log('connected to', url) + listeners.connection._.forEach(cb => cb(url)) resolveOpen() // restablish old subscriptions if (wasClosed) { wasClosed = false - for (let channel in openSubs) { - let filters = openSubs[channel] - let eventCb = eventListeners[channel] - let eoseCb = eoseListeners[channel] - sub({eventCb, filter: filters}, channel, eoseCb) + for (let id in openSubs) { + sub(openSubs[id], id) } } } ws.onerror = err => { console.log('error connecting to relay', url) - onError(err) + listeners.error._.forEach(cb => cb(err)) } - ws.onclose = () => { + ws.onclose = async () => { + listeners.disconnection._.forEach(cb => cb(url)) resetOpenState() attemptNumber++ nextAttemptSeconds += attemptNumber ** 3 @@ -63,11 +66,7 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { console.log( `relay ${url} connection closed. reconnecting in ${nextAttemptSeconds} seconds.` ) - setTimeout(async () => { - try { - connect() - } catch (err) {} - }, nextAttemptSeconds * 1000) + setTimeout(await connect(), nextAttemptSeconds * 1000) wasClosed = true } @@ -87,31 +86,29 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { // ignore empty or malformed notice return } - console.log(`message from relay ${url}: ${data[1]}`) - onNotice(data[1]) + if (listeners.notice._.length) listeners.notice._.forEach(cb => cb(data[1])) return case 'EOSE': if (data.length !== 2) { // ignore malformed EOSE return } - console.log(`Channel ${data[1]}: End-of-stored-events`) - if (eoseListeners[data[1]]) { - eoseListeners[data[1]]() - } + if (listeners.eose[data[1]]?.length) listeners.eose[data[1]].forEach(cb => cb()) + if (listeners.eose._.length) listeners.eose._.forEach(cb => cb()) return case 'EVENT': if (data.length !== 3) { // ignore malformed EVENT return } - let channel = data[1] + let id = data[1] let event = data[2] - if (validateEvent(event) && - (isSetToSkipVerification[channel] || verifySignature(event)) && - eventListeners[channel] && - matchFilters(openSubs[channel], event)) { - eventListeners[channel](event) + if (validateEvent(event) && openSubs[id] && + (openSubs[id].skipVerification || verifySignature(event)) && + matchFilters(openSubs[id].filter, event) + ) { + if (listeners.event[id]?.length) listeners.event[id].forEach(cb => cb(event)) + if (listeners.event._.length) listeners.event._.forEach(cb => cb(event)) } return } @@ -121,9 +118,11 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { resetOpenState() - try { - connect() - } catch (err) {} + async function connect() { + try { + connectRelay() + } catch (err) { } + } async function trySend(params) { let msg = JSON.stringify(params) @@ -132,65 +131,73 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { ws.send(msg) } - const sub = ( - {cb, filter, beforeSend, skipVerification}, - channel = Math.random().toString().slice(2), - eoseCb - ) => { + const sub = ({ filter, beforeSend, skipVerification }, id = Math.random().toString().slice(2)) => { var filters = [] if (Array.isArray(filter)) { filters = filter } else { filters.push(filter) } + filter = filters if (beforeSend) { - const beforeSendResult = beforeSend({filter, relay: url, channel}) - filters = beforeSendResult.filter + const beforeSendResult = beforeSend({ filter, relay: url, id }) + filter = beforeSendResult.filter } - trySend(['REQ', channel, ...filters]) - eventListeners[channel] = cb - eoseListeners[channel] = eoseCb - openSubs[channel] = filters - isSetToSkipVerification[channel] = skipVerification + trySend(['REQ', id, ...filter]) + openSubs[id] = { + filter, + beforeSend, + skipVerification, + } - const activeCallback = cb - const activeFilters = filters + const activeFilters = filter const activeBeforeSend = beforeSend return { sub: ({ - cb = activeCallback, filter = activeFilters, beforeSend = activeBeforeSend - }) => sub({cb, filter, beforeSend, skipVerification}, channel, eoseCb), + }) => sub({ filter, beforeSend, skipVerification }, id), unsub: () => { - delete openSubs[channel] - delete eventListeners[channel] - delete eoseListeners[channel] - delete isSetToSkipVerification[channel] - trySend(['CLOSE', channel]) + delete openSubs[id] + delete listeners.event[id] + delete listeners.eose[id] + trySend(['CLOSE', id]) } } } + function on(type, cb, id = '_') { + listeners[type][id] = listeners[type][id] || [] + listeners[type][id].push(cb) + } + + function off(type, cb, id = '_') { + if (!listeners[type][id].length) return + let index = listeners[type][id].indexOf(cb) + if (index !== -1) listeners[type][id].splice(index, 1) + } + return { url, sub, + on, + off, async publish(event, statusCallback) { try { await trySend(['EVENT', event]) if (statusCallback) { statusCallback(0) - let {unsub} = sub( + let { unsub } = sub( { cb: () => { statusCallback(1) unsub() clearTimeout(willUnsub) }, - filter: {ids: [event.id]} + filter: { ids: [event.id] } }, `monitor-${event.id.slice(0, 5)}` ) @@ -200,6 +207,7 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { if (statusCallback) statusCallback(-1) } }, + connect, close() { ws.close() },