nip01: change the way to request to watch keys and get updates.

fiatjaf 2020-11-22 18:42:00 -03:00
parent 90567674c1
commit 1eb0351d94
7 changed files with 234 additions and 88 deletions


@@ -8,7 +8,7 @@ Basic protocol flow description
1. Each user has a keypair. Signatures, public key and encodings are done according to the [Schnorr signatures standard for the curve `secp256k1`](https://github.com/bitcoin/bips/blob/master/bip-0340.mediawiki).
2. Users can publish `events` to any compatible relay by calling `POST /save_update` with `Content-Type: application/json` and a JSON object describing an event:
2. Users can publish `events` to any compatible relay by calling `POST /save_event` with `Content-Type: application/json` and a JSON object describing an event:
```
{
@@ -21,24 +21,43 @@ Basic protocol flow description
sig: <64-bytes signature of the sha256 hash of the serialized event data, which is the same as the "id" field>,
}
```
3. (The serialization function is strict and standardized; see the source code for now.)
4. Upon receiving a call like that, the relay MUST compute the `id`, check the signature over the `id` and store the event so it can serve it later to other clients.
5. There are 3 kinds of messages a user can publish (for now, these things are extensible and will be extended):
- `0`: `set_metadata`: the `content` is set to a stringified JSON object `{name: <string>, about: <string>, picture: <url, string>}` describing the user who created the event. A relay may delete past `set_metadata` events once it gets a new one for the same pubkey.
- `1`: `text_note`: the `content` is set to the text content of a note, anything the user wants to say.
- `2`: `recommend_server`: the `content` is set to the URL (e.g., `https://somerelay.com`) of a relay the event creator wants to recommend to its followers.
6. A relay MUST serve an SSE (Server-Sent Events) stream at the path `GET /listen_updates`. As querystring parameters it MUST expect `?session=<arbitrary id chosen by the client>` and any number of `&key=<32-bytes hex-encoded pubkey>`.
7. Immediately upon receiving that request the relay MUST return the recent past events from all the received authors with the event type `history`, then it SHOULD keep the connection open and return new events from these same keys as they are received with the event type `happening`. The format of the messages is just the same event object above, as JSON:
6. A relay MUST serve an SSE (Server-Sent Events) stream at the path `GET /listen_events`. As querystring parameters it MUST expect `?session=<arbitrary id chosen by the client>`.
7. The relay MUST also expect the following calls:
- `POST /request_watch?session=<session id>` with body `{"keys": [<32-byte hex-encoded pubkey>, ...]}`
  Upon receiving this, the relay SHOULD begin watching for new events from the given pubkeys as they are saved and return them in the SSE stream identified by the given _session id_. Events triggered by this call MUST have their SSE type set to `n`.
- `POST /request_unwatch?session=<session id>` with body `{"keys": [<32-byte hex-encoded pubkey>, ...]}`
Upon receiving this, the relay SHOULD stop notifying the given SSE stream for updates from the given pubkeys.
- `POST /request_feed?session=<session id>` with optional body `{"limit": 100}`
  Upon receiving this, the relay MUST return, in the previously opened SSE stream identified by the given _session id_, recent past events from all the pubkeys it is watching on behalf of that stream. Events triggered by this call MUST have their SSE type set to `p`.
- `POST /request_user?session=<session id>` with body `{"pubkey": ...}`
  Upon receiving this, the relay MUST return, in the previously opened SSE stream identified by the given _session id_, recent past events from the specified user, including a `set_metadata` event if the relay has it. Events triggered by this call MUST have their SSE type set to `r`.
- `POST /request_note?session=<session id>` with body `{"id": ...}`
Same as above, but instead the relay returns the specified `text_note` and/or related notes it has (notes that reference it or notes that it references). Events triggered by this call MUST have their SSE type set to `r`.
### Format of the SSE event (example)
```
event: history
event: p
data: {"id": "000...", "pubkey": ... etc.}
```
8. The relay MUST also expect the following calls:
- `POST /request_user?session=<session id>` with body `{"pubkey": ...}`
  Upon receiving this, the relay MUST return, in the previously opened SSE stream identified by the given _session id_, recent past updates from the specified user, including a `set_metadata` event.
- `POST /request_note?session=<session id>` with body `{"id": ...}`
Same as above, but instead the relay returns the specified `text_note` and/or related notes it has (notes that reference it or notes that it references).
9. That's it. Knowing that every relay supports this interface should be enough to write a client that takes these things into account.
10. New optional features can be implemented by clients and relays, and if they are good they can be included in the main protocol -- which should be written as a series of optional/mandatory NIPs (Nostr Improvement Proposals).
### Serialization of the event
The serialization of the event and `sha256` hashing of the result is what produces the event id. Then that id (which will be a 32-byte string) is signed according to BIP-340 standards to produce the event signature.
For more information, see the source code implementation of the serialization function, as it will be simpler to understand than whatever I can write here.
### Implementation notes
What a basic client should do:
1. Initiate SSE sessions with every relay it knows;
2. Send `/request_watch` to each with all the keys the user "follows";
3. Send `/request_feed` afterwards so the user sees a wall of posts upon turning the app on;
4. Send `/request_watch` and `/request_unwatch` if the user follows or unfollows someone;
5. Receive all events from the single SSE stream and filter/organize them locally, taking into account the fact that they may come as duplicates, out of order etc.


@@ -2,10 +2,13 @@ package main
import (
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
)
@@ -13,7 +16,7 @@ type ErrorResponse struct {
Error error `json:"error"`
}
func saveUpdate(w http.ResponseWriter, r *http.Request) {
func saveEvent(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "application/json")
var evt Event
@@ -87,6 +90,82 @@ func saveUpdate(w http.ResponseWriter, r *http.Request) {
notifyPubKeyEvent(evt.PubKey, &evt)
}
func requestFeed(w http.ResponseWriter, r *http.Request) {
es := grabNamedSession(r.URL.Query().Get("session"))
if es == nil {
w.WriteHeader(400)
return
}
var data struct {
Limit int `json:"limit"`
}
json.NewDecoder(r.Body).Decode(&data)
keys, ok := backwatchers[es]
if !ok {
return
}
inkeys := make([]string, 0, len(keys))
for _, key := range keys {
// to prevent SQL injection, check that these keys are valid 32-byte hex
parsed, err := hex.DecodeString(key)
if err != nil || len(parsed) != 32 {
continue
}
inkeys = append(inkeys, fmt.Sprintf("'%x'", parsed))
}
var lastUpdates []Event
err := db.Select(&lastUpdates, `
SELECT *, (SELECT count(*) FROM event AS r WHERE r.ref = event.id) AS rel
FROM event
WHERE pubkey IN (`+strings.Join(inkeys, ",")+`)
ORDER BY created_at DESC
LIMIT 50
`)
if err != nil && err != sql.ErrNoRows {
w.WriteHeader(500)
log.Warn().Err(err).Interface("keys", keys).Msg("failed to fetch updates")
return
}
for _, evt := range lastUpdates {
jevent, _ := json.Marshal(evt)
(*es).SendEventMessage(string(jevent), "p", "")
}
}
func requestWatchKeys(w http.ResponseWriter, r *http.Request) {
es := grabNamedSession(r.URL.Query().Get("session"))
if es == nil {
w.WriteHeader(400)
return
}
var data struct {
Keys []string `json:"keys"`
}
json.NewDecoder(r.Body).Decode(&data)
watchPubKeys(data.Keys, es)
}
func requestUnwatchKeys(w http.ResponseWriter, r *http.Request) {
es := grabNamedSession(r.URL.Query().Get("session"))
if es == nil {
w.WriteHeader(400)
return
}
var data struct {
Keys []string `json:"keys"`
}
json.NewDecoder(r.Body).Decode(&data)
unwatchPubKeys(data.Keys, es)
}
func requestUser(w http.ResponseWriter, r *http.Request) {
es := grabNamedSession(r.URL.Query().Get("session"))
if es == nil {
@@ -94,11 +173,12 @@ func requestUser(w http.ResponseWriter, r *http.Request) {
return
}
var pubkey struct {
var data struct {
PubKey string `json:"pubkey"`
Limit int `json:"limit"`
}
json.NewDecoder(r.Body).Decode(&pubkey)
if pubkey.PubKey == "" {
json.NewDecoder(r.Body).Decode(&data)
if data.PubKey == "" {
w.WriteHeader(400)
return
}
@@ -108,9 +188,9 @@ func requestUser(w http.ResponseWriter, r *http.Request) {
if err := db.Get(&metadata, `
SELECT * FROM event
WHERE pubkey = $1 AND kind = 0
`, pubkey.PubKey); err == nil {
`, data.PubKey); err == nil {
jevent, _ := json.Marshal(metadata)
(*es).SendEventMessage(string(jevent), "requested", "")
(*es).SendEventMessage(string(jevent), "r", "")
}
}()
@@ -121,10 +201,10 @@ func requestUser(w http.ResponseWriter, r *http.Request) {
FROM event
WHERE pubkey = $1 AND kind != 0
ORDER BY created_at DESC LIMIT 30
`, pubkey.PubKey); err == nil {
`, data.PubKey); err == nil {
for _, evt := range lastUpdates {
jevent, _ := json.Marshal(evt)
(*es).SendEventMessage(string(jevent), "requested", "")
(*es).SendEventMessage(string(jevent), "r", "")
}
}
}()
@@ -137,11 +217,12 @@ func requestNote(w http.ResponseWriter, r *http.Request) {
return
}
var id struct {
Id string `json:"id"`
var data struct {
Id string `json:"id"`
Limit int `json:"limit"`
}
json.NewDecoder(r.Body).Decode(&id)
if id.Id == "" {
json.NewDecoder(r.Body).Decode(&data)
if data.Id == "" {
w.WriteHeader(400)
return
}
@@ -150,9 +231,9 @@ func requestNote(w http.ResponseWriter, r *http.Request) {
var evt Event
if err := db.Get(&evt, `
SELECT * FROM event WHERE id = $1
`, id.Id); err == nil {
`, data.Id); err == nil {
jevent, _ := json.Marshal(evt)
(*es).SendEventMessage(string(jevent), "requested", "")
(*es).SendEventMessage(string(jevent), "r", "")
}
if evt.Ref == "" {
@@ -164,7 +245,7 @@ func requestNote(w http.ResponseWriter, r *http.Request) {
SELECT * FROM event WHERE id = $1
`, evt.Ref); err == nil {
jevent, _ := json.Marshal(ref)
(*es).SendEventMessage(string(jevent), "requested", "")
(*es).SendEventMessage(string(jevent), "r", "")
}
}()
@@ -174,10 +255,10 @@ func requestNote(w http.ResponseWriter, r *http.Request) {
SELECT * FROM event WHERE ref = $1
-- UNION ALL
-- SELECT * FROM event WHERE ref IN (SELECT ref FROM event WHERE ref = $1)
`, id.Id); err == nil {
`, data.Id); err == nil {
for _, evt := range related {
jevent, _ := json.Marshal(evt)
(*es).SendEventMessage(string(jevent), "requested", "")
(*es).SendEventMessage(string(jevent), "r", "")
}
}
}()


@@ -1,12 +1,8 @@
package main
import (
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
@@ -21,7 +17,7 @@ var watchers = make(map[string][]*eventsource.EventSource)
var backwatchers = make(map[*eventsource.EventSource][]string)
var wlock = sync.Mutex{}
func listenUpdates(w http.ResponseWriter, r *http.Request) {
func listenEvents(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "application/json")
var es eventsource.EventSource
@@ -47,9 +43,6 @@ func listenUpdates(w http.ResponseWriter, r *http.Request) {
}
}
// will return past items then track changes from these keys:
keys, _ := r.URL.Query()["key"]
es = eventsource.New(
&eventsource.Settings{
Timeout: time.Second * 5,
@@ -87,55 +80,57 @@ func listenUpdates(w http.ResponseWriter, r *http.Request) {
}()
es.ServeHTTP(w, r)
// past events
inkeys := make([]string, 0, len(keys))
for _, key := range keys {
// to prevent SQL injection, check that these keys are valid 32-byte hex
parsed, err := hex.DecodeString(key)
if err != nil || len(parsed) != 32 {
continue
}
inkeys = append(inkeys, fmt.Sprintf("'%x'", parsed))
}
var lastUpdates []Event
err := db.Select(&lastUpdates, `
SELECT *, (SELECT count(*) FROM event AS r WHERE r.ref = event.id) AS rel
FROM event
WHERE pubkey IN (`+strings.Join(inkeys, ",")+`)
ORDER BY created_at DESC
LIMIT 50
`)
if err != nil && err != sql.ErrNoRows {
w.WriteHeader(500)
log.Warn().Err(err).Interface("keys", keys).Msg("failed to fetch updates")
return
}
for _, evt := range lastUpdates {
jevent, _ := json.Marshal(evt)
es.SendEventMessage(string(jevent), "history", "")
}
// listen to new events
watchPubKeys(keys, &es)
}
func watchPubKeys(keys []string, es *eventsource.EventSource) {
wlock.Lock()
defer wlock.Unlock()
backwatchers[es] = keys
currentKeys, _ := backwatchers[es]
backwatchers[es] = append(currentKeys, keys...)
for _, key := range keys {
if arr, ok := watchers[key]; ok {
watchers[key] = append(arr, es)
if ess, ok := watchers[key]; ok {
watchers[key] = append(ess, es)
} else {
watchers[key] = []*eventsource.EventSource{es}
}
}
}
func unwatchPubKeys(excludedKeys []string, es *eventsource.EventSource) {
wlock.Lock()
defer wlock.Unlock()
for _, key := range excludedKeys {
if ess, ok := watchers[key]; ok {
newEss := make([]*eventsource.EventSource, len(ess)-1)
var i = 0
for _, existingEs := range ess {
if existingEs == es {
continue
}
newEss[i] = existingEs
i++
}
watchers[key] = newEss
}
}
currentKeys, _ := backwatchers[es]
newKeys := make([]string, 0, len(currentKeys))
for _, currentKey := range currentKeys {
if inArray(excludedKeys, currentKey) {
continue
}
newKeys = append(newKeys, currentKey)
}
backwatchers[es] = newKeys
}
func removeFromWatchers(es *eventsource.EventSource) {
wlock.Lock()
defer wlock.Unlock()
@@ -165,7 +160,7 @@ func notifyPubKeyEvent(key string, evt *Event) {
if ok {
for _, es := range arr {
jevent, _ := json.Marshal(evt)
(*es).SendEventMessage(string(jevent), "happening", "")
(*es).SendEventMessage(string(jevent), "n", "")
}
}
}


@@ -37,12 +37,18 @@ func main() {
log.Fatal().Err(err).Msg("failed to open database")
}
router.Path("/query_users").Methods("GET").HandlerFunc(queryUsers)
router.Path("/listen_updates").Methods("GET").HandlerFunc(listenUpdates)
router.Path("/save_update").Methods("POST").HandlerFunc(saveUpdate)
// NIP01
router.Path("/listen_events").Methods("GET").HandlerFunc(listenEvents)
router.Path("/save_event").Methods("POST").HandlerFunc(saveEvent)
router.Path("/request_feed").Methods("POST").HandlerFunc(requestFeed)
router.Path("/request_watch").Methods("POST").HandlerFunc(requestWatchKeys)
router.Path("/request_unwatch").Methods("POST").HandlerFunc(requestUnwatchKeys)
router.Path("/request_user").Methods("POST").HandlerFunc(requestUser)
router.Path("/request_note").Methods("POST").HandlerFunc(requestNote)
// extra?
router.Path("/query_users").Methods("GET").HandlerFunc(queryUsers)
srv := &http.Server{
Handler: cors.Default().Handler(router),
Addr: s.Host + ":" + s.Port,

relay/util.go Normal file

@@ -0,0 +1,10 @@
package main
func inArray(haystack []string, needle string) bool {
for _, item := range haystack {
if item == needle {
return true
}
}
return false
}


@@ -17,9 +17,8 @@ export default {
}
state.petnames = petnames
},
gotEventSource(state, session) {
gotEventSource(state) {
state.haveEventSource.resolve()
state.session = session
},
loadedRelays(state, relays) {
state.relays = relays


@@ -29,7 +29,7 @@ export default createStore({
return {
haveEventSource,
session: null,
session: new Date().getTime() + '' + Math.round(Math.random() * 100000),
relays,
key: makeRandom32().toString('hex'),
following: [],
@@ -125,9 +125,16 @@ function listener(store) {
switch (mutation.type) {
case 'setInit':
case 'loadedRelays':
case 'follow':
case 'unfollow':
restartListeners()
break
case 'follow':
let watch = watchKey.bind(null, mutation.payload)
store.getters.readServers.forEach(watch)
break
case 'unfollow':
let unwatch = unwatchKey.bind(null, mutation.payload)
store.getters.readServers.forEach(unwatch)
break
}
})
@@ -139,15 +146,28 @@ function listener(store) {
store.getters.readServers.forEach(listenToRelay)
}
function listenToRelay(host) {
function unwatchKey(key, host) {
window.fetch(host + '/request_unwatch?session=' + store.state.session, {
method: 'POST',
headers: {'content-type': 'application/json'},
body: JSON.stringify({keys: [key]})
})
}
function watchKey(key, host) {
window.fetch(host + '/request_watch?session=' + store.state.session, {
method: 'POST',
headers: {'content-type': 'application/json'},
body: JSON.stringify({keys: [key]})
})
}
async function listenToRelay(host) {
if (store.state.following.length === 0) return
let session = new Date().getTime() + '' + Math.round(Math.random() * 100000)
if (host.length && host[host.length - 1] === '/') host = host.slice(0, -1)
let qs = store.state.following.map(key => `key=${key}`).join('&')
let es = new EventSource(
host + '/listen_updates?' + qs + '&session=' + session
host + '/listen_events?session=' + store.state.session
)
ess.set(host, es)
@@ -156,13 +176,22 @@ function listener(store) {
es.close()
ess.delete(host)
}
es.onopen = () => {
store.commit('gotEventSource')
}
store.commit('gotEventSource', session)
// add initial keys
await window.fetch(host + '/request_watch?session=' + store.state.session, {
method: 'POST',
headers: {'content-type': 'application/json'},
body: JSON.stringify({keys: store.state.following})
})
// handle anything
es.addEventListener('notice', e => {
console.log(e.data)
})
;['history', 'happening', 'requested'].forEach(context => {
;['p', 'n', 'r'].forEach(context => {
es.addEventListener(context, e => {
store.dispatch('receivedEvent', {
event: JSON.parse(e.data),
@@ -170,5 +199,12 @@ function listener(store) {
})
})
})
// request initial feed
await window.fetch(host + '/request_feed?session=' + store.state.session, {
method: 'POST',
headers: {'content-type': 'application/json'},
body: JSON.stringify({limit: 100})
})
}
}