finalize cb api

This commit is contained in:
monica
2022-12-07 22:26:51 -06:00
parent b955ba2a09
commit 100c77d2aa
3 changed files with 117 additions and 90 deletions

View File

@@ -14,41 +14,59 @@ pool.setPrivateKey('<hex>') // optional
pool.addRelay('ws://some.relay.com', {read: true, write: true}) pool.addRelay('ws://some.relay.com', {read: true, write: true})
pool.addRelay('ws://other.relay.cool', {read: true, write: true}) pool.addRelay('ws://other.relay.cool', {read: true, write: true})
// example callback function for a subscription // example callback functions for listeners
function onEvent(event, relay) { // callback functions take an object argument with following keys:
console.log(`got an event from ${relay.url} which is already validated.`, event) // - relay: relay url
// - type: type of listener
// - id: sub id for sub specific listeners ('EVENT' or 'EOSE')
// - event: event object, only for 'event' listener
// - notice: notice message, only for 'notice' listener
function onEvent({event, relay, type, id}) {
console.log(`got an event from ${relay} which is already validated.`, event)
} }
function onEose({relay, type, id}) { /* callback function here */}
function onNotice({relay, type, notice}) { /* callback function here */}
function onConnection({relay, type}) { /* callback function here */}
// listen for messages for pool
pool.on('event', onEvent)
pool.on('connection', onConnection)
pool.on('notice', onNotice)
// subscribing to a single user // subscribing to a single user
// author is the user's public key // author is the user's public key
pool.sub({cb: onEvent, filter: {author: '<hex>'}}) pool.sub({filter: {author: '<hex>'}})
// or bulk follow // or bulk follow
pool.sub({cb:(event, relay) => {...}, filter: {authors: ['<hex1>', '<hex2>', ..., '<hexn>']}}) pool.sub({filter: {authors: ['<hex1>', '<hex2>', ..., '<hexn>']}})
// reuse a subscription channel // reuse a subscription channel
const mySubscription = pool.sub({cb: ..., filter: ....}) const mySubscription = pool.sub({filter: ...., skipVerification: false, beforeSend: ....})
mySubscription.sub({filter: ....}) mySubscription.sub({filter: ....})
mySubscription.sub({cb: ...}) mySubscription.sub({skipVerification: true})
// listen for messages for subscription
mySubscription.on('event', onEvent)
mySubscription.on('eose', onEose)
// close subscription
mySubscription.unsub() mySubscription.unsub()
// get specific event // get specific event
const specificChannel = pool.sub({ const specificChannel = pool.sub({ filter: {id: '<hex>'}})
cb: (event, relay) => { .on('event', ({event, relay}) => {
console.log('got specific event from relay', event, relay) console.log('got specific event from relay', event, relay)
specificChannel.unsub() specificChannel.unsub()
}, })
filter: {id: '<hex>'}
})
// or get a specific event plus all the events that reference it in the 'e' tag // or get a specific event plus all the events that reference it in the 'e' tag
pool.sub({ cb: (event, relay) => { ... }, filter: [{id: '<hex>'}, {'#e': '<hex>'}] }) pool.sub({ filter: [{id: '<hex>'}, {'#e': '<hex>'}] })
// get all events // get all events
pool.sub({cb: (event, relay) => {...}, filter: {}}) pool.sub({ filter: {} })
// get recent events // get recent events
pool.sub({cb: (event, relay) => {...}, filter: {since: timestamp}}) pool.sub({ filter: {since: timestamp} })
// publishing events(inside an async function): // publishing events(inside an async function):
const ev = await pool.publish(eventObject, (status, url) => { const ev = await pool.publish(eventObject, (status, url) => {

61
pool.js
View File

@@ -18,6 +18,7 @@ export function relayPool() {
// map with all the relays where the url is the id // map with all the relays where the url is the id
// Map<string,{relay:Relay,policy:RelayPolicy> // Map<string,{relay:Relay,policy:RelayPolicy>
const relays = {} const relays = {}
const openSubs = {}
const activeSubscriptions = {} const activeSubscriptions = {}
const poolListeners = { notice: [], connection: [], disconnection: [], error: [] } const poolListeners = { notice: [], connection: [], disconnection: [], error: [] }
@@ -26,7 +27,14 @@ export function relayPool() {
// check if it has an id, if not assign one // check if it has an id, if not assign one
if (!id) id = Math.random().toString().slice(2) if (!id) id = Math.random().toString().slice(2)
const subListeners = {} // save sub settings
openSubs[id] = {
filter,
beforeSend,
skipVerification,
}
const subListeners = { event: [], eose: [] }
const subControllers = Object.fromEntries( const subControllers = Object.fromEntries(
// Convert the map<string,Relay> to a Relay[] // Convert the map<string,Relay> to a Relay[]
Object.values(relays) Object.values(relays)
@@ -35,30 +43,33 @@ export function relayPool() {
// iterate all the relays and create the array [url:string,sub:SubscriptionCallback, listeners] // iterate all the relays and create the array [url:string,sub:SubscriptionCallback, listeners]
.map(({ relay }) => [ .map(({ relay }) => [
relay.url, relay.url,
relay.sub({ filter, beforeSend, skipVerification }, id), relay.sub(openSubs[id], id),
]) ])
) )
const activeFilters = filter // assigng filter for the suscrition
const activeBeforeSend = beforeSend // assign before send fucntion
const activeSkipVerification = skipVerification // assign skipVerification
// Unsub deletes itself // Unsub deletes itself
const unsub = () => { const unsub = () => {
// iterate the map of subControllers and call the unsub function of it relays // iterate the map of subControllers and call the unsub function of it relays
Object.values(subControllers).forEach(sub => sub.unsub()) Object.values(subControllers).forEach(sub => sub.unsub())
delete openSubs[id]
delete activeSubscriptions[id] delete activeSubscriptions[id]
} }
const sub = ({ const sub = ({
filter = activeFilters, filter = openSubs[id].filter,
beforeSend = activeBeforeSend, beforeSend = openSubs[id].beforeSend,
skipVerification = activeSkipVerification skipVerification = openSubs[id].skipVerification }
}) => { ) => {
// Iterates the subControllers and add them the atributes of our Subscription (callback, filter,etc.) // update sub settings
openSubs[id] = {
filter,
beforeSend,
skipVerification,
}
// update relay subs
Object.entries(subControllers).forEach(([relayURL, sub]) => { Object.entries(subControllers).forEach(([relayURL, sub]) => {
sub.sub({ filter, beforeSend, skipVerification }, id) sub.sub(openSubs[id], id)
}) })
// returns the current suscripcion // returns the current suscripcion
@@ -69,13 +80,14 @@ export function relayPool() {
for (let type of Object.keys(subListeners)) { for (let type of Object.keys(subListeners)) {
if (subListeners[type].length) subListeners[type].forEach(cb => relay.on(type, cb, id)) if (subListeners[type].length) subListeners[type].forEach(cb => relay.on(type, cb, id))
} }
subControllers[relay.url] = relay.sub({ filter, beforeSend, skipVerification }, id) // TODO filter/before send should reference updated filter/beforesend subControllers[relay.url] = relay.sub(openSubs[id], id)
return activeSubscriptions[id] return activeSubscriptions[id]
} }
// removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay // removeRelay deletes a relay from the subControllers map, it also handles the unsubscription from the relay
const removeRelay = relayURL => { const removeRelay = relayURL => {
if (relayURL in subControllers) { if (relayURL in subControllers) {
subControllers[relayURL].unsub() subControllers[relayURL].unsub()
delete subControllers[relayURL]
if (Object.keys(subControllers).length === 0) unsub() if (Object.keys(subControllers).length === 0) unsub()
} }
return activeSubscriptions[id] return activeSubscriptions[id]
@@ -83,7 +95,7 @@ export function relayPool() {
// on creates listener for sub ('EVENT', 'EOSE', etc) // on creates listener for sub ('EVENT', 'EOSE', etc)
const on = (type, cb) => { const on = (type, cb) => {
subListeners[type].push(cb) subListeners[type].push(cb)
Object.values(relays).forEach(({ relay }) => relay.on(type, cb, id)) Object.values(relays).filter(({ policy }) => policy.read).forEach(({ relay }) => relay.on(type, cb, id))
return activeSubscriptions[id] return activeSubscriptions[id]
} }
// off destroys listener for sub ('EVENT', 'EOSE', etc) // off destroys listener for sub ('EVENT', 'EOSE', etc)
@@ -131,14 +143,12 @@ export function relayPool() {
let cbs = poolListeners[type] || [] let cbs = poolListeners[type] || []
if (cbs.length) poolListeners[type].forEach(cb => relay.on(type, cb)) if (cbs.length) poolListeners[type].forEach(cb => relay.on(type, cb))
} }
relay.connect()
relays[relayURL] = { relay: relay, policy }
if (policy.read) { if (policy.read) {
Object.values(activeSubscriptions).forEach(subscription => { Object.values(activeSubscriptions).forEach(sub => sub.addRelay(relay))
subscription.addRelay(relay)
})
} }
relay.connect()
relays[relayURL] = { relay, policy }
return relay return relay
}, },
@@ -149,9 +159,7 @@ export function relayPool() {
if (!data) return if (!data) return
let { relay } = data let { relay } = data
Object.values(activeSubscriptions).forEach(subscription => Object.values(activeSubscriptions).forEach(sub => sub.removeRelay(relayURL))
subscription.removeRelay(relay)
)
relay.close() relay.close()
delete relays[relayURL] delete relays[relayURL]
}, },
@@ -165,8 +173,13 @@ export function relayPool() {
let data = relays[relayURL] let data = relays[relayURL]
if (!data) return if (!data) return
relays[relayURL].policy = policy let { relay } = data
if (relays[relayURL].policy.read === true && policy.read === false)
Object.values(activeSubscriptions).forEach(sub => sub.removeRelay(relayURL))
else if (relays[relayURL].policy.read === false && policy.read === true)
Object.values(activeSubscriptions).forEach(sub => sub.addRelay(relay));
relays[relayURL].policy = policy
return relays[relayURL] return relays[relayURL]
}, },
on(type, cb) { on(type, cb) {
@@ -220,7 +233,7 @@ export function relayPool() {
let successes = 0 let successes = 0
// if the pool policy set to want until event send // if the pool policy set to wait until event send
if (poolPolicy.wait) { if (poolPolicy.wait) {
for (let i = 0; i < writeable.length; i++) { for (let i = 0; i < writeable.length; i++) {
let { relay } = writeable[i] let { relay } = writeable[i]

View File

@@ -14,9 +14,9 @@ export function normalizeRelayURL(url) {
} }
export function relayInit(url) { export function relayInit(url) {
url = normalizeRelayURL(url) let relay = normalizeRelayURL(url) // set relay url
var ws, resolveOpen, untilOpen, wasClosed var ws, resolveOpen, untilOpen, wasClosed, closed
var openSubs = {} var openSubs = {}
var listeners = { var listeners = {
event: { '_': [] }, event: { '_': [] },
@@ -36,11 +36,10 @@ export function relayInit(url) {
} }
function connectRelay() { function connectRelay() {
ws = new WebSocket(url) ws = new WebSocket(relay)
ws.onopen = () => { ws.onopen = () => {
console.log('connected to', url) listeners.connection._.forEach(cb => cb({ type: 'connection', relay }))
listeners.connection._.forEach(cb => cb(url))
resolveOpen() resolveOpen()
// restablish old subscriptions // restablish old subscriptions
@@ -51,12 +50,12 @@ export function relayInit(url) {
} }
} }
} }
ws.onerror = err => { ws.onerror = error => {
console.log('error connecting to relay', url) listeners.error._.forEach(cb => cb({ type: 'error', relay, error }))
listeners.error._.forEach(cb => cb(err))
} }
ws.onclose = async () => { ws.onclose = async () => {
listeners.disconnection._.forEach(cb => cb(url)) listeners.disconnection._.forEach(cb => cb({ type: 'disconnection', relay }))
if (closed) return
resetOpenState() resetOpenState()
attemptNumber++ attemptNumber++
nextAttemptSeconds += attemptNumber ** 3 nextAttemptSeconds += attemptNumber ** 3
@@ -64,9 +63,13 @@ export function relayInit(url) {
nextAttemptSeconds = 14400 // 4 hours nextAttemptSeconds = 14400 // 4 hours
} }
console.log( console.log(
`relay ${url} connection closed. reconnecting in ${nextAttemptSeconds} seconds.` `relay ${relay} connection closed. reconnecting in ${nextAttemptSeconds} seconds.`
) )
setTimeout(await connect(), nextAttemptSeconds * 1000) setTimeout(async () => {
try {
connectRelay()
} catch (err) { }
}, nextAttemptSeconds * 1000)
wasClosed = true wasClosed = true
} }
@@ -81,36 +84,33 @@ export function relayInit(url) {
if (data.length >= 1) { if (data.length >= 1) {
switch (data[0]) { switch (data[0]) {
case 'NOTICE':
if (data.length !== 2) {
// ignore empty or malformed notice
return
}
if (listeners.notice._.length) listeners.notice._.forEach(cb => cb(data[1]))
return
case 'EOSE':
if (data.length !== 2) {
// ignore malformed EOSE
return
}
if (listeners.eose[data[1]]?.length) listeners.eose[data[1]].forEach(cb => cb())
if (listeners.eose._.length) listeners.eose._.forEach(cb => cb())
return
case 'EVENT': case 'EVENT':
if (data.length !== 3) { if (data.length !== 3) return // ignore empty or malformed EVENT
// ignore malformed EVENT
return
}
let id = data[1] let id = data[1]
let event = data[2] let event = data[2]
if (validateEvent(event) && openSubs[id] && if (validateEvent(event) && openSubs[id] &&
(openSubs[id].skipVerification || verifySignature(event)) && (openSubs[id].skipVerification || verifySignature(event)) &&
matchFilters(openSubs[id].filter, event) matchFilters(openSubs[id].filter, event)
) { ) {
if (listeners.event[id]?.length) listeners.event[id].forEach(cb => cb(event)) if (listeners.event[id]?.length) listeners.event[id].forEach(cb => cb({ type: 'event', relay, id, event }))
if (listeners.event._.length) listeners.event._.forEach(cb => cb(event)) if (listeners.event._.length) listeners.event._.forEach(cb => cb({ type: 'event', relay, id, event }))
} }
return return
case 'EOSE': {
if (data.length !== 2) return // ignore empty or malformed EOSE
let id = data[1]
if (listeners.eose[id]?.length) listeners.eose[data[1]].forEach(cb => cb({ type: 'eose', relay, id }))
if (listeners.eose._.length) listeners.eose._.forEach(cb => cb({ type: 'eose', relay, id }))
return
}
case 'NOTICE':
if (data.length !== 2) return // ignore empty or malformed NOTICE
let notice = data[1]
if (listeners.notice._.length) listeners.notice._.forEach(cb => cb({ type: 'notice', relay, notice }))
return
} }
} }
} }
@@ -119,6 +119,7 @@ export function relayInit(url) {
resetOpenState() resetOpenState()
async function connect() { async function connect() {
if (ws?.readyState && ws.readyState === 1) return // ws already open
try { try {
connectRelay() connectRelay()
} catch (err) { } } catch (err) { }
@@ -141,25 +142,23 @@ export function relayInit(url) {
filter = filters filter = filters
if (beforeSend) { if (beforeSend) {
const beforeSendResult = beforeSend({ filter, relay: url, id }) const beforeSendResult = beforeSend({ filter, relay, id })
filter = beforeSendResult.filter filter = beforeSendResult.filter
} }
trySend(['REQ', id, ...filter])
openSubs[id] = { openSubs[id] = {
filter, filter,
beforeSend, beforeSend,
skipVerification, skipVerification,
} }
trySend(['REQ', id, ...filter])
const activeFilters = filter
const activeBeforeSend = beforeSend
return { return {
sub: ({ sub: ({
filter = activeFilters, filter = openSubs[id].filter,
beforeSend = activeBeforeSend beforeSend = openSubs[id].beforeSend,
}) => sub({ filter, beforeSend, skipVerification }, id), skipVerification = openSubs[id].skipVerification }
) => sub({ filter, beforeSend, skipVerification }, id),
unsub: () => { unsub: () => {
delete openSubs[id] delete openSubs[id]
delete listeners.event[id] delete listeners.event[id]
@@ -189,18 +188,14 @@ export function relayInit(url) {
try { try {
await trySend(['EVENT', event]) await trySend(['EVENT', event])
if (statusCallback) { if (statusCallback) {
let id = `monitor-${event.id.slice(0, 5)}`
statusCallback(0) statusCallback(0)
let { unsub } = sub( let { unsub } = sub({ filter: { ids: [event.id] } }, id)
{ on('event', () => {
cb: () => { statusCallback(1)
statusCallback(1) unsub()
unsub() clearTimeout(willUnsub)
clearTimeout(willUnsub) }, id)
},
filter: { ids: [event.id] }
},
`monitor-${event.id.slice(0, 5)}`
)
let willUnsub = setTimeout(unsub, 5000) let willUnsub = setTimeout(unsub, 5000)
} }
} catch (err) { } catch (err) {
@@ -209,6 +204,7 @@ export function relayInit(url) {
}, },
connect, connect,
close() { close() {
closed = true // prevent ws from trying to reconnect
ws.close() ws.close()
}, },
get status() { get status() {