real-time event notifications for listeners.

This commit is contained in:
fiatjaf 2020-11-13 22:59:14 -03:00
parent 3e551058d7
commit 1b372be32d
6 changed files with 80 additions and 15 deletions

View File

@ -19,7 +19,7 @@ const (
type Event struct {
ID string `db:"id" json:"id"` // it's the hash of the serialized event
Pubkey string `db:"pubkey" json:"pubkey"`
PubKey string `db:"pubkey" json:"pubkey"`
CreatedAt uint32 `db:"created_at" json:"created_at"`
Kind uint8 `db:"kind" json:"kind"`
@ -38,7 +38,7 @@ func (evt *Event) Serialize() ([]byte, error) {
b.Write([]byte{0})
// pubkey
pubkeyb, err := hex.DecodeString(evt.Pubkey)
pubkeyb, err := hex.DecodeString(evt.PubKey)
if err != nil {
return nil, err
}
@ -46,7 +46,7 @@ func (evt *Event) Serialize() ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("error parsing pubkey: %w", err)
}
if evt.Pubkey != hex.EncodeToString(pubkey.SerializeCompressed()) {
if evt.PubKey != hex.EncodeToString(pubkey.SerializeCompressed()) {
return nil, fmt.Errorf("pubkey is not serialized in compressed format")
}
if _, err = b.Write(pubkeyb); err != nil {
@ -94,7 +94,7 @@ func (evt *Event) Serialize() ([]byte, error) {
// returns an error if the signature itself is invalid.
func (evt Event) CheckSignature() (bool, error) {
// validity of these is checked by Serialize()
pubkeyb, _ := hex.DecodeString(evt.Pubkey)
pubkeyb, _ := hex.DecodeString(evt.PubKey)
pubkey, _ := btcec.ParsePubKey(pubkeyb, btcec.S256())
bsig, err := hex.DecodeString(evt.Sig)

View File

@ -8,7 +8,6 @@ require (
github.com/gorilla/mux v1.8.0
github.com/jmoiron/sqlx v1.2.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/kr/pretty v0.2.1
github.com/lib/pq v1.8.0
github.com/mattn/go-sqlite3 v1.14.4
github.com/rs/cors v1.7.0

View File

@ -33,11 +33,6 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=

View File

@ -45,7 +45,12 @@ func listenUpdates(w http.ResponseWriter, r *http.Request) {
keys, _ := r.URL.Query()["key"]
es := eventsource.New(
eventsource.DefaultSettings(),
&eventsource.Settings{
Timeout: time.Second * 5,
CloseOnTimeout: true,
IdleTimeout: time.Minute * 5,
Gzip: true,
},
func(r *http.Request) [][]byte {
return [][]byte{
[]byte("X-Accel-Buffering: no"),
@ -65,6 +70,11 @@ func listenUpdates(w http.ResponseWriter, r *http.Request) {
go func() {
for {
time.Sleep(25 * time.Second)
if es.ConsumersCount() == 0 {
removeFromWatchers(&es)
es.Close()
return
}
es.SendEventMessage("", "keepalive", "")
}
}()
@ -101,7 +111,7 @@ func listenUpdates(w http.ResponseWriter, r *http.Request) {
}
// listen to new events
watchPubKeys(keys, &es)
}
func saveUpdate(w http.ResponseWriter, r *http.Request) {
@ -154,12 +164,14 @@ func saveUpdate(w http.ResponseWriter, r *http.Request) {
_, err = db.Exec(`
INSERT INTO event (id, pubkey, created_at, kind, ref, content, sig)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, evt.ID, evt.Pubkey, evt.CreatedAt, evt.Kind, evt.Ref, evt.Content, evt.Sig)
`, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, evt.Ref, evt.Content, evt.Sig)
if err != nil {
log.Warn().Err(err).Str("pubkey", evt.Pubkey).Msg("failed to save")
log.Warn().Err(err).Str("pubkey", evt.PubKey).Msg("failed to save")
w.WriteHeader(500)
return
}
w.WriteHeader(201)
notifyPubKeyEvent(evt.PubKey, &evt)
}

59
relay/watchers.go Normal file
View File

@ -0,0 +1,59 @@
package main
import (
"encoding/json"
"sync"
"gopkg.in/antage/eventsource.v1"
)
var watchers = make(map[string][]*eventsource.EventSource)
var index = make(map[*eventsource.EventSource][]string)
var wlock = sync.Mutex{}
func watchPubKeys(keys []string, es *eventsource.EventSource) {
wlock.Lock()
defer wlock.Unlock()
index[es] = keys
for _, key := range keys {
if arr, ok := watchers[key]; ok {
watchers[key] = append(arr, es)
} else {
watchers[key] = []*eventsource.EventSource{es}
}
}
}
func removeFromWatchers(es *eventsource.EventSource) {
wlock.Lock()
defer wlock.Unlock()
for _, key := range index[es] {
if arr, ok := watchers[key]; ok {
newarr := make([]*eventsource.EventSource, len(arr)-1)
i := 0
for _, oldes := range arr {
if oldes == es {
continue
}
newarr[i] = oldes
i++
}
}
}
delete(index, es)
}
func notifyPubKeyEvent(key string, evt *Event) {
wlock.Lock()
defer wlock.Unlock()
if arr, ok := watchers[key]; ok {
for _, es := range arr {
jevent, _ := json.Marshal(evt)
(*es).SendEventMessage(string(jevent), "event", "")
}
}
}

View File

@ -1,2 +1,2 @@
static/bundle.js: $(shell ls *.js)
static/bundle.js: $(shell find src -name '*.js')
./node_modules/.bin/rollup -c rollup.config.js