diff --git a/README.md b/README.md index a700576..033c125 100644 --- a/README.md +++ b/README.md @@ -14,41 +14,59 @@ pool.setPrivateKey('') // optional pool.addRelay('ws://some.relay.com', {read: true, write: true}) pool.addRelay('ws://other.relay.cool', {read: true, write: true}) -// example callback function for a subscription -function onEvent(event, relay) { - console.log(`got an event from ${relay.url} which is already validated.`, event) +// example callback functions for listeners +// callback functions take an object argument with following keys: +// - relay: relay url +// - type: type of listener +// - id: sub id for sub specific listeners ('EVENT' or 'EOSE') +// - event: event object, only for 'event' listener +// - notice: notice message, only for 'notice' listener +function onEvent({event, relay, type, id}) { + console.log(`got an event from ${relay} which is already validated.`, event) } +function onEose({relay, type, id}) { /* callback function here */} +function onNotice({relay, type, notice}) { /* callback function here */} +function onConnection({relay, type}) { /* callback function here */} + +// listen for messages for pool +pool.on('event', onEvent) +pool.on('connection', onConnection) +pool.on('notice', onNotice) // subscribing to a single user // author is the user's public key -pool.sub({cb: onEvent, filter: {author: ''}}) +pool.sub({filter: {author: ''}}) // or bulk follow -pool.sub({cb:(event, relay) => {...}, filter: {authors: ['', '', ..., '']}}) +pool.sub({filter: {authors: ['', '', ..., '']}}) // reuse a subscription channel -const mySubscription = pool.sub({cb: ..., filter: ....}) +const mySubscription = pool.sub({filter: ...., skipVerification: false, beforeSend: ....}) mySubscription.sub({filter: ....}) -mySubscription.sub({cb: ...}) +mySubscription.sub({skipVerification: true}) + +// listen for messages for subscription +mySubscription.on('event', onEvent) +mySubscription.on('eose', onEose) + +// close subscription mySubscription.unsub() // get specific event -const specificChannel = pool.sub({ - cb: (event, relay) => { +const specificChannel = pool.sub({ filter: {id: ''}}) + .on('event', ({event, relay}) => { console.log('got specific event from relay', event, relay) specificChannel.unsub() - }, - filter: {id: ''} -}) + }) // or get a specific event plus all the events that reference it in the 'e' tag -pool.sub({ cb: (event, relay) => { ... }, filter: [{id: ''}, {'#e': ''}] }) +pool.sub({ filter: [{id: ''}, {'#e': ''}] }) // get all events -pool.sub({cb: (event, relay) => {...}, filter: {}}) +pool.sub({ filter: {} }) // get recent events -pool.sub({cb: (event, relay) => {...}, filter: {since: timestamp}}) +pool.sub({ filter: {since: timestamp} }) // publishing events(inside an async function): const ev = await pool.publish(eventObject, (status, url) => { diff --git a/pool.js b/pool.js index 748cdd3..c7fe148 100644 --- a/pool.js +++ b/pool.js @@ -18,6 +18,7 @@ export function relayPool() { // map with all the relays where the url is the id // Map const relays = {} + const openSubs = {} const activeSubscriptions = {} const poolListeners = { notice: [], connection: [], disconnection: [], error: [] } @@ -26,7 +27,14 @@ export function relayPool() { // check if it has an id, if not assign one if (!id) id = Math.random().toString().slice(2) - const subListeners = {} + // save sub settings + openSubs[id] = { + filter, + beforeSend, + skipVerification, + } + + const subListeners = { event: [], eose: [] } const subControllers = Object.fromEntries( // Convert the map to a Relay[] Object.values(relays) @@ -35,30 +43,33 @@ export function relayPool() { // iterate all the relays and create the array [url:string,sub:SubscriptionCallback, listeners] .map(({ relay }) => [ relay.url, - relay.sub({ filter, beforeSend, skipVerification }, id), + relay.sub(openSubs[id], id), ]) ) - const activeFilters = filter // assigng filter for the suscrition - const activeBeforeSend = beforeSend // assign before send fucntion - const activeSkipVerification = skipVerification // assign skipVerification - // Unsub deletes itself const unsub = () => { // iterate the map of subControllers and call the unsub function of it relays Object.values(subControllers).forEach(sub => sub.unsub()) + delete openSubs[id] delete activeSubscriptions[id] } const sub = ({ - filter = activeFilters, - beforeSend = activeBeforeSend, - skipVerification = activeSkipVerification - }) => { - // Iterates the subControllers and add them the atributes of our Subscription (callback, filter,etc.) + filter = openSubs[id].filter, + beforeSend = openSubs[id].beforeSend, + skipVerification = openSubs[id].skipVerification } + ) => { + // update sub settings + openSubs[id] = { + filter, + beforeSend, + skipVerification, + } + // update relay subs Object.entries(subControllers).forEach(([relayURL, sub]) => { - sub.sub({ filter, beforeSend, skipVerification }, id) + sub.sub(openSubs[id], id) }) // returns the current suscripcion @@ -69,13 +80,14 @@ export function relayPool() { for (let type of Object.keys(subListeners)) { if (subListeners[type].length) subListeners[type].forEach(cb => relay.on(type, cb, id)) } - subControllers[relay.url] = relay.sub({ filter, beforeSend, skipVerification }, id) // TODO filter/before send should reference updated filter/beforesend + subControllers[relay.url] = relay.sub(openSubs[id], id) return activeSubscriptions[id] } // removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay const removeRelay = relayURL => { if (relayURL in subControllers) { subControllers[relayURL].unsub() + delete subControllers[relayURL] if (Object.keys(subControllers).length === 0) unsub() } return activeSubscriptions[id] @@ -83,7 +95,7 @@ export function relayPool() { // on creates listener for sub ('EVENT', 'EOSE', etc) const on = (type, cb) => { subListeners[type].push(cb) - Object.values(relays).forEach(({ relay }) => relay.on(type, cb, id)) + Object.values(relays).filter(({ policy }) => policy.read).forEach(({ relay }) => relay.on(type, cb, id)) return activeSubscriptions[id] } // off destroys listener for sub ('EVENT', 'EOSE', etc) @@ -131,14 +143,12 @@ export function relayPool() { let cbs = poolListeners[type] || [] if (cbs.length) poolListeners[type].forEach(cb => relay.on(type, cb)) } - relay.connect() - relays[relayURL] = { relay: relay, policy } if (policy.read) { - Object.values(activeSubscriptions).forEach(subscription => { - subscription.addRelay(relay) - }) + Object.values(activeSubscriptions).forEach(sub => sub.addRelay(relay)) } + relay.connect() + relays[relayURL] = { relay, policy } return relay }, @@ -149,9 +159,7 @@ export function relayPool() { if (!data) return let { relay } = data - Object.values(activeSubscriptions).forEach(subscription => - subscription.removeRelay(relay) - ) + Object.values(activeSubscriptions).forEach(sub => sub.removeRelay(relayURL)) relay.close() delete relays[relayURL] }, @@ -165,8 +173,13 @@ export function relayPool() { let data = relays[relayURL] if (!data) return - relays[relayURL].policy = policy + let { relay } = data + if (relays[relayURL].policy.read === true && policy.read === false) + Object.values(activeSubscriptions).forEach(sub => sub.removeRelay(relayURL)) + else if (relays[relayURL].policy.read === false && policy.read === true) + Object.values(activeSubscriptions).forEach(sub => sub.addRelay(relay)); + relays[relayURL].policy = policy return relays[relayURL] }, on(type, cb) { @@ -220,7 +233,7 @@ export function relayPool() { let successes = 0 - // if the pool policy set to want until event send + // if the pool policy set to wait until event send if (poolPolicy.wait) { for (let i = 0; i < writeable.length; i++) { let { relay } = writeable[i] diff --git a/relay.js b/relay.js index 32a8aab..d55a41b 100644 --- a/relay.js +++ b/relay.js @@ -14,9 +14,9 @@ export function normalizeRelayURL(url) { } export function relayInit(url) { - url = normalizeRelayURL(url) + let relay = normalizeRelayURL(url) // set relay url - var ws, resolveOpen, untilOpen, wasClosed + var ws, resolveOpen, untilOpen, wasClosed, closed var openSubs = {} var listeners = { event: { '_': [] }, @@ -36,11 +36,10 @@ export function relayInit(url) { } function connectRelay() { - ws = new WebSocket(url) + ws = new WebSocket(relay) ws.onopen = () => { - console.log('connected to', url) - listeners.connection._.forEach(cb => cb(url)) + listeners.connection._.forEach(cb => cb({ type: 'connection', relay })) resolveOpen() // restablish old subscriptions @@ -51,12 +50,12 @@ export function relayInit(url) { } } } - ws.onerror = err => { - console.log('error connecting to relay', url) - listeners.error._.forEach(cb => cb(err)) + ws.onerror = error => { + listeners.error._.forEach(cb => cb({ type: 'error', relay, error })) } ws.onclose = async () => { - listeners.disconnection._.forEach(cb => cb(url)) + listeners.disconnection._.forEach(cb => cb({ type: 'disconnection', relay })) + if (closed) return resetOpenState() attemptNumber++ nextAttemptSeconds += attemptNumber ** 3 @@ -64,9 +63,13 @@ export function relayInit(url) { nextAttemptSeconds = 14400 // 4 hours } console.log( - `relay ${url} connection closed. reconnecting in ${nextAttemptSeconds} seconds.` + `relay ${relay} connection closed. reconnecting in ${nextAttemptSeconds} seconds.` ) - setTimeout(await connect(), nextAttemptSeconds * 1000) + setTimeout(async () => { + try { + connectRelay() + } catch (err) { } + }, nextAttemptSeconds * 1000) wasClosed = true } @@ -81,36 +84,33 @@ export function relayInit(url) { if (data.length >= 1) { switch (data[0]) { - case 'NOTICE': - if (data.length !== 2) { - // ignore empty or malformed notice - return - } - if (listeners.notice._.length) listeners.notice._.forEach(cb => cb(data[1])) - return - case 'EOSE': - if (data.length !== 2) { - // ignore malformed EOSE - return - } - if (listeners.eose[data[1]]?.length) listeners.eose[data[1]].forEach(cb => cb()) - if (listeners.eose._.length) listeners.eose._.forEach(cb => cb()) - return case 'EVENT': - if (data.length !== 3) { - // ignore malformed EVENT - return - } + if (data.length !== 3) return // ignore empty or malformed EVENT + let id = data[1] let event = data[2] if (validateEvent(event) && openSubs[id] && (openSubs[id].skipVerification || verifySignature(event)) && matchFilters(openSubs[id].filter, event) ) { - if (listeners.event[id]?.length) listeners.event[id].forEach(cb => cb(event)) - if (listeners.event._.length) listeners.event._.forEach(cb => cb(event)) + if (listeners.event[id]?.length) listeners.event[id].forEach(cb => cb({ type: 'event', relay, id, event })) + if (listeners.event._.length) listeners.event._.forEach(cb => cb({ type: 'event', relay, id, event })) } return + case 'EOSE': { + if (data.length !== 2) return // ignore empty or malformed EOSE + + let id = data[1] + if (listeners.eose[id]?.length) listeners.eose[data[1]].forEach(cb => cb({ type: 'eose', relay, id })) + if (listeners.eose._.length) listeners.eose._.forEach(cb => cb({ type: 'eose', relay, id })) + return + } + case 'NOTICE': + if (data.length !== 2) return // ignore empty or malformed NOTICE + + let notice = data[1] + if (listeners.notice._.length) listeners.notice._.forEach(cb => cb({ type: 'notice', relay, notice })) + return } } } @@ -119,6 +119,7 @@ export function relayInit(url) { resetOpenState() async function connect() { + if (ws?.readyState && ws.readyState === 1) return // ws already open try { connectRelay() } catch (err) { } @@ -141,25 +142,23 @@ export function relayInit(url) { filter = filters if (beforeSend) { - const beforeSendResult = beforeSend({ filter, relay: url, id }) + const beforeSendResult = beforeSend({ filter, relay, id }) filter = beforeSendResult.filter } - trySend(['REQ', id, ...filter]) openSubs[id] = { filter, beforeSend, skipVerification, } - - const activeFilters = filter - const activeBeforeSend = beforeSend + trySend(['REQ', id, ...filter]) return { sub: ({ - filter = activeFilters, - beforeSend = activeBeforeSend - }) => sub({ filter, beforeSend, skipVerification }, id), + filter = openSubs[id].filter, + beforeSend = openSubs[id].beforeSend, + skipVerification = openSubs[id].skipVerification } + ) => sub({ filter, beforeSend, skipVerification }, id), unsub: () => { delete openSubs[id] delete listeners.event[id] @@ -189,18 +188,14 @@ export function relayInit(url) { try { await trySend(['EVENT', event]) if (statusCallback) { + let id = `monitor-${event.id.slice(0, 5)}` statusCallback(0) - let { unsub } = sub( - { - cb: () => { - statusCallback(1) - unsub() - clearTimeout(willUnsub) - }, - filter: { ids: [event.id] } - }, - `monitor-${event.id.slice(0, 5)}` - ) + let { unsub } = sub({ filter: { ids: [event.id] } }, id) + on('event', () => { + statusCallback(1) + unsub() + clearTimeout(willUnsub) + }, id) let willUnsub = setTimeout(unsub, 5000) } } catch (err) { @@ -209,6 +204,7 @@ export function relayInit(url) { }, connect, close() { + closed = true // prevent ws from trying to reconnect ws.close() }, get status() {