diff --git a/README.md b/README.md index a700576..033c125 100644 --- a/README.md +++ b/README.md @@ -14,41 +14,59 @@ pool.setPrivateKey('') // optional pool.addRelay('ws://some.relay.com', {read: true, write: true}) pool.addRelay('ws://other.relay.cool', {read: true, write: true}) -// example callback function for a subscription -function onEvent(event, relay) { - console.log(`got an event from ${relay.url} which is already validated.`, event) +// example callback functions for listeners +// callback functions take an object argument with following keys: +// - relay: relay url +// - type: type of listener +// - id: sub id for sub specific listeners ('EVENT' or 'EOSE') +// - event: event object, only for 'event' listener +// - notice: notice message, only for 'notice' listener +function onEvent({event, relay, type, id}) { + console.log(`got an event from ${relay} which is already validated.`, event) } +function onEose({relay, type, id}) { /* callback function here */} +function onNotice({relay, type, notice}) { /* callback function here */} +function onConnection({relay, type}) { /* callback function here */} + +// listen for messages for pool +pool.on('event', onEvent) +pool.on('connection', onConnection) +pool.on('notice', onNotice) // subscribing to a single user // author is the user's public key -pool.sub({cb: onEvent, filter: {author: ''}}) +pool.sub({filter: {author: ''}}) // or bulk follow -pool.sub({cb:(event, relay) => {...}, filter: {authors: ['', '', ..., '']}}) +pool.sub({filter: {authors: ['', '', ..., '']}}) // reuse a subscription channel -const mySubscription = pool.sub({cb: ..., filter: ....}) +const mySubscription = pool.sub({filter: ...., skipVerification: false, beforeSend: ....}) mySubscription.sub({filter: ....}) -mySubscription.sub({cb: ...}) +mySubscription.sub({skipVerification: true}) + +// listen for messages for subscription +mySubscription.on('event', onEvent) +mySubscription.on('eose', onEose) + +// close subscription mySubscription.unsub() // get specific event -const specificChannel = pool.sub({ - cb: (event, relay) => { +const specificChannel = pool.sub({ filter: {id: ''}}) + .on('event', ({event, relay}) => { console.log('got specific event from relay', event, relay) specificChannel.unsub() - }, - filter: {id: ''} -}) + }) // or get a specific event plus all the events that reference it in the 'e' tag -pool.sub({ cb: (event, relay) => { ... }, filter: [{id: ''}, {'#e': ''}] }) +pool.sub({ filter: [{id: ''}, {'#e': ''}] }) // get all events -pool.sub({cb: (event, relay) => {...}, filter: {}}) +pool.sub({ filter: {} }) // get recent events -pool.sub({cb: (event, relay) => {...}, filter: {since: timestamp}}) +pool.sub({ filter: {since: timestamp} }) // publishing events(inside an async function): const ev = await pool.publish(eventObject, (status, url) => { diff --git a/index.js b/index.js index d7d4c31..a742c3a 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,6 @@ -import {generatePrivateKey, getPublicKey} from './keys.js' -import {relayConnect} from './relay.js' -import {relayPool} from './pool.js' +import { generatePrivateKey, getPublicKey } from './keys.js' +import { relayInit } from './relay.js' +import { relayPool } from './pool.js' import { getBlankEvent, signEvent, @@ -9,11 +9,11 @@ import { serializeEvent, getEventHash } from './event.js' -import {matchFilter, matchFilters} from './filter.js' +import { matchFilter, matchFilters } from './filter.js' export { generatePrivateKey, - relayConnect, + relayInit, relayPool, signEvent, validateEvent, @@ -25,3 +25,4 @@ export { matchFilter, matchFilters } + diff --git a/pool.js b/pool.js index 5e60c28..c7fe148 100644 --- a/pool.js +++ b/pool.js @@ -1,5 +1,5 @@ -import {getEventHash, verifySignature, signEvent} from './event.js' -import {relayConnect, normalizeRelayURL} from './relay.js' +import { getEventHash, verifySignature, signEvent } from './event.js' +import { relayInit, normalizeRelayURL } from './relay.js' export function relayPool() { var globalPrivateKey @@ -14,88 +14,107 @@ export function relayPool() { // been published -- or at least attempted to be published -- to all relays wait: false } - - //map with all the relays where the url is the id - //Map + + // map with all the relays where the url is the id + // Map const relays = {} - const noticeCallbacks = [] - - function propagateNotice(notice, relayURL) { - for (let i = 0; i < noticeCallbacks.length; i++) { - let {relay} = relays[relayURL] - noticeCallbacks[i](notice, relay) - } - } - + const openSubs = {} const activeSubscriptions = {} + const poolListeners = { notice: [], connection: [], disconnection: [], error: [] } - //sub creates a Subscription object {sub:Function, unsub:Function, addRelay:Function,removeRelay :Function } - const sub = ({cb, filter, beforeSend}, id, cbEose) => { - - //check if it has an id, if not assign one + // sub creates a Subscription object {sub:Function, unsub:Function, addRelay:Function,removeRelay :Function } + const sub = ({ filter, beforeSend, skipVerification }, id) => { + + // check if it has an id, if not assign one if (!id) id = Math.random().toString().slice(2) + // save sub settings + openSubs[id] = { + filter, + beforeSend, + skipVerification, + } + + const subListeners = { event: [], eose: [] } const subControllers = Object.fromEntries( - //Convert the map to a Relay[] + // Convert the map to a Relay[] Object.values(relays) - //takes only relays that can be read - .filter(({policy}) => policy.read) - //iterate all the rellies and create the array [url:string,sub:SubscriptionCallback] - .map(({relay}) => [ + // takes only relays that can be read + .filter(({ policy }) => policy.read) + // iterate all the relays and create the array [url:string,sub:SubscriptionCallback, listeners] + .map(({ relay }) => [ relay.url, - relay.sub({cb: event => cb(event, relay.url), filter, beforeSend}, id, - () => cbEose(relay.url)) + relay.sub(openSubs[id], id), ]) ) - const activeCallback = cb //assign callback for the suscription - const activeFilters = filter //assigng filter for the suscrition - const activeBeforeSend = beforeSend //assign before send fucntion - - //Unsub deletes itself + // Unsub deletes itself const unsub = () => { - //iterate the map of subControllers and call the unsub function of it relays + // iterate the map of subControllers and call the unsub function of it relays Object.values(subControllers).forEach(sub => sub.unsub()) - delete activeSubscriptions[id] + delete openSubs[id] + delete activeSubscriptions[id] } - - + + const sub = ({ - cb = activeCallback, - filter = activeFilters, - beforeSend = activeBeforeSend - }) => { - //Iterates the subControllers and add them the atributes of our Subscription (callback, filter,etc.) - Object.entries(subControllers).map(([relayURL, sub]) => [ - relayURL, - sub.sub({cb: event => cb(event, relayURL), filter, beforeSend}, id, - () => cbEose(relayURL)) - ]) - //returns the current suscripcion + filter = openSubs[id].filter, + beforeSend = openSubs[id].beforeSend, + skipVerification = openSubs[id].skipVerification } + ) => { + // update sub settings + openSubs[id] = { + filter, + beforeSend, + skipVerification, + } + // update relay subs + Object.entries(subControllers).forEach(([relayURL, sub]) => { + sub.sub(openSubs[id], id) + }) + + // returns the current suscripcion return activeSubscriptions[id] } - //addRelay adds a relay to the subControllers map so the current subscription can use it + // addRelay adds a relay to the subControllers map so the current subscription can use it const addRelay = relay => { - subControllers[relay.url] = relay.sub( - {cb: event => cb(event, relay.url), filter, beforeSend}, - id, () => cbEose(relay.url) - ) + for (let type of Object.keys(subListeners)) { + if (subListeners[type].length) subListeners[type].forEach(cb => relay.on(type, cb, id)) + } + subControllers[relay.url] = relay.sub(openSubs[id], id) return activeSubscriptions[id] } - //removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay + // removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay const removeRelay = relayURL => { if (relayURL in subControllers) { subControllers[relayURL].unsub() + delete subControllers[relayURL] if (Object.keys(subControllers).length === 0) unsub() } return activeSubscriptions[id] } + // on creates listener for sub ('EVENT', 'EOSE', etc) + const on = (type, cb) => { + subListeners[type].push(cb) + Object.values(relays).filter(({ policy }) => policy.read).forEach(({ relay }) => relay.on(type, cb, id)) + return activeSubscriptions[id] + } + // off destroys listener for sub ('EVENT', 'EOSE', etc) + const off = (type, cb) => { + if (!subListeners[type].length) return + let index = subListeners[type].indexOf(cb) + if (index !== -1) subListeners[type].splice(index, 1) + Object.values(relays).forEach(({ relay }) => relay.off(type, cb, id)) + return activeSubscriptions[id] + } - //add the object created to activeSubscriptions map + // add the object created to activeSubscriptions map activeSubscriptions[id] = { sub, unsub, addRelay, - removeRelay + removeRelay, + on, + off } return activeSubscriptions[id] @@ -113,64 +132,72 @@ export function relayPool() { setPolicy(key, value) { poolPolicy[key] = value }, - //addRelay adds a relay to the pool and to all its subscriptions - addRelay(url, policy = {read: true, write: true}) { + // addRelay adds a relay to the pool and to all its subscriptions + addRelay(url, policy = { read: true, write: true }) { let relayURL = normalizeRelayURL(url) if (relayURL in relays) return - let relay = relayConnect(url, notice => { - propagateNotice(notice, relayURL) - }) - relays[relayURL] = {relay, policy} + let relay = relayInit(url) + + for (let type of Object.keys(poolListeners)) { + let cbs = poolListeners[type] || [] + if (cbs.length) poolListeners[type].forEach(cb => relay.on(type, cb)) + } if (policy.read) { - Object.values(activeSubscriptions).forEach(subscription => - subscription.addRelay(relay) - ) + Object.values(activeSubscriptions).forEach(sub => sub.addRelay(relay)) } + relay.connect() + relays[relayURL] = { relay, policy } return relay }, - //remove relay deletes the relay from the pool and from all its subscriptions + // remove relay deletes the relay from the pool and from all its subscriptions removeRelay(url) { let relayURL = normalizeRelayURL(url) let data = relays[relayURL] if (!data) return - let {relay} = data - Object.values(activeSubscriptions).forEach(subscription => - subscription.removeRelay(relay) - ) + let { relay } = data + Object.values(activeSubscriptions).forEach(sub => sub.removeRelay(relayURL)) relay.close() delete relays[relayURL] }, - //getRelayList return an array with all the relays stored + // getRelayList return an array with all the relays stored getRelayList() { return Object.values(relays) }, - relayChangePolicy(url, policy = {read: true, write: true}) { + relayChangePolicy(url, policy = { read: true, write: true }) { let relayURL = normalizeRelayURL(url) let data = relays[relayURL] if (!data) return - relays[relayURL].policy = policy + let { relay } = data + if (relays[relayURL].policy.read === true && policy.read === false) + Object.values(activeSubscriptions).forEach(sub => sub.removeRelay(relayURL)) + else if (relays[relayURL].policy.read === false && policy.read === true) + Object.values(activeSubscriptions).forEach(sub => sub.addRelay(relay)); + relays[relayURL].policy = policy return relays[relayURL] }, - onNotice(cb) { - noticeCallbacks.push(cb) + on(type, cb) { + poolListeners[type] = poolListeners[type] || [] + poolListeners[type].push(cb) + Object.values(relays).forEach(({ relay }) => relay.on(type, cb)) }, - offNotice(cb) { - let index = noticeCallbacks.indexOf(cb) - if (index !== -1) noticeCallbacks.splice(index, 1) + off(type, cb) { + let index = poolListeners[type].indexOf(cb) + if (index !== -1) poolListeners[type].splice(index, 1) + Object.values(relays).forEach(({ relay }) => relay.off(type, cb)) }, - - //publish send a event to the relays + + // publish send a event to the relays async publish(event, statusCallback) { event.id = getEventHash(event) - - //if the event is not signed then sign it + + // if the event is not signed then sign it if (!event.sig) { event.tags = event.tags || [] @@ -195,9 +222,9 @@ export function relayPool() { } } - //get the writable relays + // get the writable relays let writeable = Object.values(relays) - .filter(({policy}) => policy.write) + .filter(({ policy }) => policy.write) .sort(() => Math.random() - 0.5) // random let maxTargets = poolPolicy.randomChoice @@ -206,10 +233,10 @@ export function relayPool() { let successes = 0 - //if the pool policy set to want until event send + // if the pool policy set to wait until event send if (poolPolicy.wait) { for (let i = 0; i < writeable.length; i++) { - let {relay} = writeable[i] + let { relay } = writeable[i] try { await new Promise(async (resolve, reject) => { @@ -231,9 +258,9 @@ export function relayPool() { /***/ } } - //if the pool policy dont want to wait until event send + // if the pool policy dont want to wait until event send } else { - writeable.forEach(async ({relay}) => { + writeable.forEach(async ({ relay }) => { let callback = statusCallback ? status => statusCallback(status, relay.url) : null diff --git a/relay.js b/relay.js index 73fc3ae..d55a41b 100644 --- a/relay.js +++ b/relay.js @@ -2,8 +2,8 @@ import 'websocket-polyfill' -import {verifySignature, validateEvent} from './event.js' -import {matchFilters} from './filter.js' +import { verifySignature, validateEvent } from './event.js' +import { matchFilters } from './filter.js' export function normalizeRelayURL(url) { let [host, ...qs] = url.trim().split('?') @@ -13,12 +13,19 @@ export function normalizeRelayURL(url) { return [host, ...qs].join('?') } -export function relayConnect(url, onNotice = () => {}, onError = () => {}) { - url = normalizeRelayURL(url) +export function relayInit(url) { + let relay = normalizeRelayURL(url) // set relay url - var ws, resolveOpen, untilOpen, wasClosed + var ws, resolveOpen, untilOpen, wasClosed, closed var openSubs = {} - var isSetToSkipVerification = {} + var listeners = { + event: { '_': [] }, + eose: { '_': [] }, + connection: { '_': [] }, + disconnection: { '_': [] }, + error: { '_': [] }, + notice: { '_': [] }, + } let attemptNumber = 1 let nextAttemptSeconds = 1 @@ -28,32 +35,27 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { }) } - var eventListeners = {} - var eoseListeners = {} - - function connect() { - ws = new WebSocket(url) + function connectRelay() { + ws = new WebSocket(relay) ws.onopen = () => { - console.log('connected to', url) + listeners.connection._.forEach(cb => cb({ type: 'connection', relay })) resolveOpen() // restablish old subscriptions if (wasClosed) { wasClosed = false - for (let channel in openSubs) { - let filters = openSubs[channel] - let eventCb = eventListeners[channel] - let eoseCb = eoseListeners[channel] - sub({eventCb, filter: filters}, channel, eoseCb) + for (let id in openSubs) { + sub(openSubs[id], id) } } } - ws.onerror = err => { - console.log('error connecting to relay', url) - onError(err) + ws.onerror = error => { + listeners.error._.forEach(cb => cb({ type: 'error', relay, error })) } - ws.onclose = () => { + ws.onclose = async () => { + listeners.disconnection._.forEach(cb => cb({ type: 'disconnection', relay })) + if (closed) return resetOpenState() attemptNumber++ nextAttemptSeconds += attemptNumber ** 3 @@ -61,12 +63,12 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { nextAttemptSeconds = 14400 // 4 hours } console.log( - `relay ${url} connection closed. reconnecting in ${nextAttemptSeconds} seconds.` + `relay ${relay} connection closed. reconnecting in ${nextAttemptSeconds} seconds.` ) setTimeout(async () => { try { - connect() - } catch (err) {} + connectRelay() + } catch (err) { } }, nextAttemptSeconds * 1000) wasClosed = true @@ -82,38 +84,33 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { if (data.length >= 1) { switch (data[0]) { - case 'NOTICE': - if (data.length !== 2) { - // ignore empty or malformed notice - return - } - console.log(`message from relay ${url}: ${data[1]}`) - onNotice(data[1]) - return - case 'EOSE': - if (data.length !== 2) { - // ignore malformed EOSE - return - } - console.log(`Channel ${data[1]}: End-of-stored-events`) - if (eoseListeners[data[1]]) { - eoseListeners[data[1]]() - } - return case 'EVENT': - if (data.length !== 3) { - // ignore malformed EVENT - return - } - let channel = data[1] + if (data.length !== 3) return // ignore empty or malformed EVENT + + let id = data[1] let event = data[2] - if (validateEvent(event) && - (isSetToSkipVerification[channel] || verifySignature(event)) && - eventListeners[channel] && - matchFilters(openSubs[channel], event)) { - eventListeners[channel](event) + if (validateEvent(event) && openSubs[id] && + (openSubs[id].skipVerification || verifySignature(event)) && + matchFilters(openSubs[id].filter, event) + ) { + if (listeners.event[id]?.length) listeners.event[id].forEach(cb => cb({ type: 'event', relay, id, event })) + if (listeners.event._.length) listeners.event._.forEach(cb => cb({ type: 'event', relay, id, event })) } return + case 'EOSE': { + if (data.length !== 2) return // ignore empty or malformed EOSE + + let id = data[1] + if (listeners.eose[id]?.length) listeners.eose[data[1]].forEach(cb => cb({ type: 'eose', relay, id })) + if (listeners.eose._.length) listeners.eose._.forEach(cb => cb({ type: 'eose', relay, id })) + return + } + case 'NOTICE': + if (data.length !== 2) return // ignore empty or malformed NOTICE + + let notice = data[1] + if (listeners.notice._.length) listeners.notice._.forEach(cb => cb({ type: 'notice', relay, notice })) + return } } } @@ -121,9 +118,12 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { resetOpenState() - try { - connect() - } catch (err) {} + async function connect() { + if (ws?.readyState && ws.readyState === 1) return // ws already open + try { + connectRelay() + } catch (err) { } + } async function trySend(params) { let msg = JSON.stringify(params) @@ -132,75 +132,79 @@ export function relayConnect(url, onNotice = () => {}, onError = () => {}) { ws.send(msg) } - const sub = ( - {cb, filter, beforeSend, skipVerification}, - channel = Math.random().toString().slice(2), - eoseCb - ) => { + const sub = ({ filter, beforeSend, skipVerification }, id = Math.random().toString().slice(2)) => { var filters = [] if (Array.isArray(filter)) { filters = filter } else { filters.push(filter) } + filter = filters if (beforeSend) { - const beforeSendResult = beforeSend({filter, relay: url, channel}) - filters = beforeSendResult.filter + const beforeSendResult = beforeSend({ filter, relay, id }) + filter = beforeSendResult.filter } - trySend(['REQ', channel, ...filters]) - eventListeners[channel] = cb - eoseListeners[channel] = eoseCb - openSubs[channel] = filters - isSetToSkipVerification[channel] = skipVerification - - const activeCallback = cb - const activeFilters = filters - const activeBeforeSend = beforeSend + openSubs[id] = { + filter, + beforeSend, + skipVerification, + } + trySend(['REQ', id, ...filter]) return { sub: ({ - cb = activeCallback, - filter = activeFilters, - beforeSend = activeBeforeSend - }) => sub({cb, filter, beforeSend, skipVerification}, channel, eoseCb), + filter = openSubs[id].filter, + beforeSend = openSubs[id].beforeSend, + skipVerification = openSubs[id].skipVerification } + ) => sub({ filter, beforeSend, skipVerification }, id), unsub: () => { - delete openSubs[channel] - delete eventListeners[channel] - delete eoseListeners[channel] - delete isSetToSkipVerification[channel] - trySend(['CLOSE', channel]) + delete openSubs[id] + delete listeners.event[id] + delete listeners.eose[id] + trySend(['CLOSE', id]) } } } + function on(type, cb, id = '_') { + listeners[type][id] = listeners[type][id] || [] + listeners[type][id].push(cb) + } + + function off(type, cb, id = '_') { + if (!listeners[type][id].length) return + let index = listeners[type][id].indexOf(cb) + if (index !== -1) listeners[type][id].splice(index, 1) + } + return { url, sub, + on, + off, async publish(event, statusCallback) { try { await trySend(['EVENT', event]) if (statusCallback) { + let id = `monitor-${event.id.slice(0, 5)}` statusCallback(0) - let {unsub} = sub( - { - cb: () => { - statusCallback(1) - unsub() - clearTimeout(willUnsub) - }, - filter: {ids: [event.id]} - }, - `monitor-${event.id.slice(0, 5)}` - ) + let { unsub } = sub({ filter: { ids: [event.id] } }, id) + on('event', () => { + statusCallback(1) + unsub() + clearTimeout(willUnsub) + }, id) let willUnsub = setTimeout(unsub, 5000) } } catch (err) { if (statusCallback) statusCallback(-1) } }, + connect, close() { + closed = true // prevent ws from trying to reconnect ws.close() }, get status() {