relay: new serialization and websocket scheme.
This commit is contained in:
parent
afbebf3fa6
commit
56eff922ad
123
relay/event.go
123
relay/event.go
|
@ -1,9 +1,8 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/fiatjaf/schnorr"
|
||||
|
@ -27,99 +26,63 @@ type Event struct {
|
|||
Tags []Tag `db:"tag" json:"tags"`
|
||||
Content string `db:"content" json:"content"`
|
||||
Sig string `db:"sig" json:"sig"`
|
||||
|
||||
// extra
|
||||
Rel int `db:"rel" json:"rel,omitempty"`
|
||||
}
|
||||
|
||||
type Tag interface {
|
||||
Identifier() byte
|
||||
Serialized() []byte
|
||||
}
|
||||
|
||||
type EventTag struct {
|
||||
EventID string
|
||||
RecommendedRelay string
|
||||
}
|
||||
|
||||
func (et EventTag) Identifier() byte { return 'e' }
|
||||
func (et EventTag) Serialized() []byte {
|
||||
b := bytes.Buffer{}
|
||||
b.WriteByte(et.Identifier())
|
||||
b.Write([]byte(et.EventID))
|
||||
b.Write([]byte(et.RecommendedRelay))
|
||||
return b.Bytes()
|
||||
}
|
||||
|
||||
type PubKeyTag struct {
|
||||
PubKey string
|
||||
RecommendedRelay string
|
||||
}
|
||||
|
||||
func (et PubKeyTag) Identifier() byte { return 'p' }
|
||||
func (et PubKeyTag) Serialized() []byte {
|
||||
b := bytes.Buffer{}
|
||||
b.WriteByte(et.Identifier())
|
||||
b.Write([]byte(et.PubKey))
|
||||
b.Write([]byte(et.RecommendedRelay))
|
||||
return b.Bytes()
|
||||
}
|
||||
type Tag []interface{}
|
||||
|
||||
// Serialize outputs a byte array that can be hashed/signed to identify/authenticate
|
||||
// this event. An error will be returned if anything is malformed.
|
||||
func (evt *Event) Serialize() ([]byte, error) {
|
||||
b := bytes.Buffer{}
|
||||
func (evt *Event) Serialize() []byte {
|
||||
// the serialization process is just putting everything into a JSON array
|
||||
// so the order is kept
|
||||
arr := make([]interface{}, 6)
|
||||
|
||||
// version: 0 (only because if more fields are added later the id will not match)
|
||||
b.Write([]byte{0})
|
||||
// version: 0
|
||||
arr[0] = 0
|
||||
|
||||
// pubkey
|
||||
pubkeyb, err := hex.DecodeString(evt.PubKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(pubkeyb) != 32 {
|
||||
return nil, fmt.Errorf("pubkey must be 32 bytes, not %d", len(pubkeyb))
|
||||
}
|
||||
if _, err = b.Write(pubkeyb); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
arr[1] = evt.PubKey
|
||||
|
||||
// created_at
|
||||
var timeb [4]byte
|
||||
binary.BigEndian.PutUint32(timeb[:], evt.CreatedAt)
|
||||
if _, err := b.Write(timeb[:]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
arr[2] = int64(evt.CreatedAt)
|
||||
|
||||
// kind
|
||||
var kindb [1]byte
|
||||
kindb[0] = evt.Kind
|
||||
if _, err := b.Write(kindb[:]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
arr[3] = int64(evt.Kind)
|
||||
|
||||
// tags
|
||||
for _, tag := range evt.Tags {
|
||||
if _, err := b.Write(tag.Serialized()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
arr[4] = evt.Tags
|
||||
|
||||
// content
|
||||
if _, err = b.Write([]byte(evt.Content)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
arr[5] = evt.Content
|
||||
|
||||
return b.Bytes(), nil
|
||||
serialized, _ := json.Marshal(arr)
|
||||
return serialized
|
||||
}
|
||||
|
||||
// CheckSignature checks if the signature is valid for the id
|
||||
// (which is a hash of the serialized event content).
|
||||
// returns an error if the signature itself is invalid.
|
||||
func (evt Event) CheckSignature() (bool, error) {
|
||||
// validity of these is checked by Serialize(), which should be called first
|
||||
pubkey, _ := hex.DecodeString(evt.PubKey)
|
||||
// read and check pubkey
|
||||
pubkeyb, err := hex.DecodeString(evt.PubKey)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(pubkeyb) != 32 {
|
||||
return false, fmt.Errorf("pubkey must be 32 bytes, not %d", len(pubkeyb))
|
||||
}
|
||||
|
||||
// check tags
|
||||
for _, tag := range evt.Tags {
|
||||
for _, item := range tag {
|
||||
switch item.(type) {
|
||||
case string, int64, float64, int, bool:
|
||||
// fine
|
||||
default:
|
||||
// not fine
|
||||
return false, fmt.Errorf("tag contains an invalid value %v", item)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
hash, _ := hex.DecodeString(evt.ID)
|
||||
if len(hash) != 32 {
|
||||
|
@ -136,17 +99,13 @@ func (evt Event) CheckSignature() (bool, error) {
|
|||
}
|
||||
|
||||
var p [32]byte
|
||||
for i, b := range pubkey {
|
||||
p[i] = b
|
||||
}
|
||||
copy(p[:], pubkeyb)
|
||||
|
||||
var h [32]byte
|
||||
for i, b := range hash {
|
||||
h[i] = b
|
||||
}
|
||||
copy(h[:], hash)
|
||||
|
||||
var s [64]byte
|
||||
for i, b := range sig {
|
||||
s[i] = b
|
||||
}
|
||||
copy(s[:], sig)
|
||||
|
||||
return schnorr.Verify(p, h, s)
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ require (
|
|||
github.com/fiatjaf/schnorr v0.2.1-hack
|
||||
github.com/go-sql-driver/mysql v1.5.0 // indirect
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/jmoiron/sqlx v1.2.0
|
||||
github.com/kelseyhightower/envconfig v1.4.0
|
||||
github.com/lib/pq v1.8.0
|
||||
|
|
|
@ -24,6 +24,8 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
|
|||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
|
||||
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
|
|
|
@ -11,46 +11,116 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
const (
|
||||
// Time allowed to write a message to the peer.
|
||||
writeWait = 10 * time.Second
|
||||
|
||||
// Time allowed to read the next pong message from the peer.
|
||||
pongWait = 60 * time.Second
|
||||
|
||||
// Send pings to peer with this period. Must be less than pongWait.
|
||||
pingPeriod = (pongWait * 9) / 10
|
||||
|
||||
// Maximum message size allowed from peer.
|
||||
maxMessageSize = 512000
|
||||
)
|
||||
|
||||
var ratelimiter = rate.NewLimiter(rate.Every(time.Second*40), 2)
|
||||
|
||||
type ErrorResponse struct {
|
||||
Error error `json:"error"`
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
|
||||
func saveEvent(w http.ResponseWriter, r *http.Request) {
|
||||
if !ratelimiter.Allow() {
|
||||
w.WriteHeader(503)
|
||||
func handleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("failed to upgrade websocket")
|
||||
return
|
||||
}
|
||||
|
||||
r.Body = http.MaxBytesReader(w, r.Body, 50000)
|
||||
// reader
|
||||
go func() {
|
||||
defer func() {
|
||||
conn.Close()
|
||||
}()
|
||||
|
||||
w.Header().Set("content-type", "application/json")
|
||||
conn.SetReadLimit(maxMessageSize)
|
||||
conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
conn.SetPongHandler(func(string) error {
|
||||
conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
return nil
|
||||
})
|
||||
|
||||
for {
|
||||
_, message, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsUnexpectedCloseError(
|
||||
err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
log.Warn().Err(err).Msg("unexpected close error")
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
text := string(message)
|
||||
|
||||
switch {
|
||||
case strings.HasPrefix(text, "{"):
|
||||
// it's a new event
|
||||
err = saveEvent(message)
|
||||
case strings.HasPrefix(text, "sub-key:"):
|
||||
watchPubKey(strings.TrimSpace(text[8:]), conn)
|
||||
case strings.HasPrefix(text, "unsub-key:"):
|
||||
unwatchPubKey(strings.TrimSpace(text[10:]), conn)
|
||||
case strings.HasPrefix(text, "req-feed:"):
|
||||
err = requestFeed(message[len([]byte("req-feed:")):], conn)
|
||||
case strings.HasPrefix(text, "req-event:"):
|
||||
err = requestEvent(message[len([]byte("req-event")):], conn)
|
||||
case strings.HasPrefix(text, "req-key:"):
|
||||
err = requestKey(message[len([]byte("req-event")):], conn)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// TODO send an error message
|
||||
continue
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// writer
|
||||
go func() {
|
||||
ticker := time.NewTicker(pingPeriod)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
conn.Close()
|
||||
}()
|
||||
}()
|
||||
}
|
||||
|
||||
func saveEvent(body []byte) error {
|
||||
if !ratelimiter.Allow() {
|
||||
return errors.New("rate-limit")
|
||||
}
|
||||
|
||||
var evt Event
|
||||
err := json.NewDecoder(r.Body).Decode(&evt)
|
||||
err := json.Unmarshal(body, &evt)
|
||||
if err != nil {
|
||||
w.WriteHeader(400)
|
||||
log.Warn().Err(err).Msg("couldn't decode body")
|
||||
return
|
||||
return errors.New("failed to decode event")
|
||||
}
|
||||
|
||||
// disallow large contents
|
||||
if len(evt.Content) > 1000 {
|
||||
log.Warn().Err(err).Msg("event content too large")
|
||||
return
|
||||
return errors.New("event content too large")
|
||||
}
|
||||
|
||||
// check serialization
|
||||
serialized, err := evt.Serialize()
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("serialization error")
|
||||
w.WriteHeader(400)
|
||||
return
|
||||
}
|
||||
serialized := evt.Serialize()
|
||||
|
||||
// assign ID
|
||||
hash := sha256.Sum256(serialized)
|
||||
|
@ -59,14 +129,10 @@ func saveEvent(w http.ResponseWriter, r *http.Request) {
|
|||
// check signature (requires the ID to be set)
|
||||
if ok, err := evt.CheckSignature(); err != nil {
|
||||
log.Warn().Err(err).Msg("signature verification error")
|
||||
w.WriteHeader(400)
|
||||
json.NewEncoder(w).Encode(ErrorResponse{err})
|
||||
return
|
||||
return errors.New("signature verification error")
|
||||
} else if !ok {
|
||||
log.Warn().Err(err).Msg("signature invalid")
|
||||
w.WriteHeader(400)
|
||||
json.NewEncoder(w).Encode(ErrorResponse{errors.New("invalid signature")})
|
||||
return
|
||||
return errors.New("signature invalid")
|
||||
}
|
||||
|
||||
// react to different kinds of events
|
||||
|
@ -86,38 +152,31 @@ func saveEvent(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// insert
|
||||
tagsj, _ := json.Marshal(evt.Tags)
|
||||
_, err = db.Exec(`
|
||||
INSERT INTO event (id, pubkey, created_at, kind, ref, content, sig)
|
||||
INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
`, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, evt.Ref, evt.Content, evt.Sig)
|
||||
`, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig)
|
||||
if err != nil {
|
||||
if strings.Index(err.Error(), "UNIQUE") != -1 {
|
||||
// already exists
|
||||
w.WriteHeader(200)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Warn().Err(err).Str("pubkey", evt.PubKey).Msg("failed to save")
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
return errors.New("failed to save event")
|
||||
}
|
||||
|
||||
w.WriteHeader(201)
|
||||
notifyPubKeyEvent(evt.PubKey, &evt)
|
||||
return nil
|
||||
}
|
||||
|
||||
func requestFeed(w http.ResponseWriter, r *http.Request) {
|
||||
es := grabNamedSession(r.URL.Query().Get("session"))
|
||||
if es == nil {
|
||||
w.WriteHeader(400)
|
||||
return
|
||||
}
|
||||
|
||||
func requestFeed(body []byte, conn *websocket.Conn) error {
|
||||
var data struct {
|
||||
Limit int `json:"limit"`
|
||||
Offset int `json:"offset"`
|
||||
}
|
||||
json.NewDecoder(r.Body).Decode(&data)
|
||||
json.Unmarshal(body, &data)
|
||||
|
||||
if data.Limit <= 0 || data.Limit > 100 {
|
||||
data.Limit = 50
|
||||
|
@ -125,17 +184,17 @@ func requestFeed(w http.ResponseWriter, r *http.Request) {
|
|||
if data.Offset < 0 {
|
||||
data.Offset = 0
|
||||
} else if data.Offset > 500 {
|
||||
return
|
||||
return errors.New("offset over 500")
|
||||
}
|
||||
|
||||
keys, ok := backwatchers[es]
|
||||
keys, ok := backwatchers[conn]
|
||||
if !ok {
|
||||
return
|
||||
return errors.New("not subscribed to anything")
|
||||
}
|
||||
|
||||
inkeys := make([]string, 0, len(keys))
|
||||
for _, key := range keys {
|
||||
// to prevent sql attack here we will check if these keys are valid 32-byte hex
|
||||
// to prevent sql attack here we will check if these keys are valid 32byte hex
|
||||
parsed, err := hex.DecodeString(key)
|
||||
if err != nil || len(parsed) != 32 {
|
||||
continue
|
||||
|
@ -152,63 +211,30 @@ func requestFeed(w http.ResponseWriter, r *http.Request) {
|
|||
OFFSET $2
|
||||
`, data.Limit, data.Offset)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
w.WriteHeader(500)
|
||||
log.Warn().Err(err).Interface("keys", keys).Msg("failed to fetch updates")
|
||||
return
|
||||
log.Warn().Err(err).Interface("keys", keys).Msg("failed to fetch events")
|
||||
return errors.New("failed to fetch events")
|
||||
}
|
||||
|
||||
for _, evt := range lastUpdates {
|
||||
jevent, _ := json.Marshal(evt)
|
||||
(*es).SendEventMessage(string(jevent), "p", "")
|
||||
jevent, _ := json.Marshal([]interface{}{
|
||||
evt,
|
||||
"p",
|
||||
})
|
||||
conn.WriteMessage(websocket.TextMessage, jevent)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func requestWatchKeys(w http.ResponseWriter, r *http.Request) {
|
||||
es := grabNamedSession(r.URL.Query().Get("session"))
|
||||
if es == nil {
|
||||
w.WriteHeader(400)
|
||||
return
|
||||
}
|
||||
|
||||
var data struct {
|
||||
Keys []string `json:"keys"`
|
||||
}
|
||||
json.NewDecoder(r.Body).Decode(&data)
|
||||
|
||||
watchPubKeys(data.Keys, es)
|
||||
}
|
||||
|
||||
func requestUnwatchKeys(w http.ResponseWriter, r *http.Request) {
|
||||
es := grabNamedSession(r.URL.Query().Get("session"))
|
||||
if es == nil {
|
||||
w.WriteHeader(400)
|
||||
return
|
||||
}
|
||||
|
||||
var data struct {
|
||||
Keys []string `json:"keys"`
|
||||
}
|
||||
json.NewDecoder(r.Body).Decode(&data)
|
||||
|
||||
unwatchPubKeys(data.Keys, es)
|
||||
}
|
||||
|
||||
func requestUser(w http.ResponseWriter, r *http.Request) {
|
||||
es := grabNamedSession(r.URL.Query().Get("session"))
|
||||
if es == nil {
|
||||
w.WriteHeader(400)
|
||||
return
|
||||
}
|
||||
|
||||
func requestKey(body []byte, conn *websocket.Conn) error {
|
||||
var data struct {
|
||||
PubKey string `json:"pubkey"`
|
||||
Limit int `json:"limit"`
|
||||
Offset int `json:"offset"`
|
||||
}
|
||||
json.NewDecoder(r.Body).Decode(&data)
|
||||
json.Unmarshal(body, &data)
|
||||
if data.PubKey == "" {
|
||||
w.WriteHeader(400)
|
||||
return
|
||||
return errors.New("invalid pubkey")
|
||||
}
|
||||
if data.Limit <= 0 || data.Limit > 100 {
|
||||
data.Limit = 30
|
||||
|
@ -216,7 +242,7 @@ func requestUser(w http.ResponseWriter, r *http.Request) {
|
|||
if data.Offset < 0 {
|
||||
data.Offset = 0
|
||||
} else if data.Offset > 300 {
|
||||
return
|
||||
return errors.New("offset over 300")
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
@ -225,8 +251,11 @@ func requestUser(w http.ResponseWriter, r *http.Request) {
|
|||
SELECT * FROM event
|
||||
WHERE pubkey = $1 AND kind = 0
|
||||
`, data.PubKey); err == nil {
|
||||
jevent, _ := json.Marshal(metadata)
|
||||
(*es).SendEventMessage(string(jevent), "r", "")
|
||||
jevent, _ := json.Marshal([]interface{}{
|
||||
metadata,
|
||||
"r",
|
||||
})
|
||||
conn.WriteMessage(websocket.TextMessage, jevent)
|
||||
} else if err != sql.ErrNoRows {
|
||||
log.Warn().Err(err).
|
||||
Str("key", data.PubKey).
|
||||
|
@ -243,8 +272,11 @@ func requestUser(w http.ResponseWriter, r *http.Request) {
|
|||
LIMIT $2 OFFSET $3
|
||||
`, data.PubKey, data.Limit, data.Offset); err == nil {
|
||||
for _, evt := range lastUpdates {
|
||||
jevent, _ := json.Marshal(evt)
|
||||
(*es).SendEventMessage(string(jevent), "r", "")
|
||||
jevent, _ := json.Marshal([]interface{}{
|
||||
evt,
|
||||
"r",
|
||||
})
|
||||
conn.WriteMessage(websocket.TextMessage, jevent)
|
||||
}
|
||||
} else if err != sql.ErrNoRows {
|
||||
log.Warn().Err(err).
|
||||
|
@ -252,23 +284,18 @@ func requestUser(w http.ResponseWriter, r *http.Request) {
|
|||
Msg("error fetching updates from requested user")
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func requestEvent(w http.ResponseWriter, r *http.Request) {
|
||||
es := grabNamedSession(r.URL.Query().Get("session"))
|
||||
if es == nil {
|
||||
w.WriteHeader(400)
|
||||
return
|
||||
}
|
||||
|
||||
func requestEvent(body []byte, conn *websocket.Conn) error {
|
||||
var data struct {
|
||||
Id string `json:"id"`
|
||||
Limit int `json:"limit"`
|
||||
}
|
||||
json.NewDecoder(r.Body).Decode(&data)
|
||||
json.Unmarshal(body, &data)
|
||||
if data.Id == "" {
|
||||
w.WriteHeader(400)
|
||||
return
|
||||
return errors.New("no id provided")
|
||||
}
|
||||
if data.Limit > 100 || data.Limit <= 0 {
|
||||
data.Limit = 50
|
||||
|
@ -280,29 +307,31 @@ func requestEvent(w http.ResponseWriter, r *http.Request) {
|
|||
if err := db.Get(&evt, `
|
||||
SELECT * FROM event WHERE id = $1
|
||||
`, data.Id); err == nil {
|
||||
jevent, _ := json.Marshal(evt)
|
||||
(*es).SendEventMessage(string(jevent), "r", "")
|
||||
jevent, _ := json.Marshal([]interface{}{
|
||||
evt,
|
||||
"r",
|
||||
})
|
||||
conn.WriteMessage(websocket.TextMessage, jevent)
|
||||
} else if err != sql.ErrNoRows {
|
||||
log.Warn().Err(err).
|
||||
Str("key", data.Id).
|
||||
Msg("error fetching a specific event")
|
||||
}
|
||||
|
||||
if evt.Ref == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// get referenced event
|
||||
var ref Event
|
||||
if err := db.Get(&ref, `
|
||||
SELECT * FROM event WHERE id = $1
|
||||
`, evt.Ref); err == nil {
|
||||
jevent, _ := json.Marshal(ref)
|
||||
(*es).SendEventMessage(string(jevent), "r", "")
|
||||
} else if err != sql.ErrNoRows {
|
||||
log.Warn().Err(err).
|
||||
Str("key", data.Id).Str("ref", evt.Ref).
|
||||
Msg("error fetching a referenced event")
|
||||
for _, tag := range evt.Tags {
|
||||
log.Print(tag)
|
||||
// get referenced event TODO
|
||||
// var ref Event
|
||||
// if err := db.Get(&ref, `
|
||||
// SELECT * FROM event WHERE id = $1
|
||||
// `, evt.Ref); err == nil {
|
||||
// jevent, _ := json.Marshal(ref)
|
||||
// (*es).SendEventMessage(string(jevent), "r", "")
|
||||
// } else if err != sql.ErrNoRows {
|
||||
// log.Warn().Err(err).
|
||||
// Str("key", data.Id).Str("ref", evt.Ref).
|
||||
// Msg("error fetching a referenced event")
|
||||
// }
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -315,8 +344,11 @@ func requestEvent(w http.ResponseWriter, r *http.Request) {
|
|||
LIMIT $2
|
||||
`, data.Id, data.Limit); err == nil {
|
||||
for _, evt := range related {
|
||||
jevent, _ := json.Marshal(evt)
|
||||
(*es).SendEventMessage(string(jevent), "r", "")
|
||||
jevent, _ := json.Marshal([]interface{}{
|
||||
evt,
|
||||
"r",
|
||||
})
|
||||
conn.WriteMessage(websocket.TextMessage, jevent)
|
||||
}
|
||||
} else if err != sql.ErrNoRows {
|
||||
log.Warn().Err(err).
|
||||
|
@ -324,4 +356,6 @@ func requestEvent(w http.ResponseWriter, r *http.Request) {
|
|||
Msg("error fetching events that reference requested event")
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -2,142 +2,67 @@ package main
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gopkg.in/antage/eventsource.v1"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var sessions = make(map[string]*eventsource.EventSource)
|
||||
var backsessions = make(map[*eventsource.EventSource]string)
|
||||
var slock = sync.Mutex{}
|
||||
|
||||
var watchers = make(map[string][]*eventsource.EventSource)
|
||||
var backwatchers = make(map[*eventsource.EventSource][]string)
|
||||
var watchers = make(map[string][]*websocket.Conn)
|
||||
var backwatchers = make(map[*websocket.Conn][]string)
|
||||
var wlock = sync.Mutex{}
|
||||
|
||||
func listenEvents(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("content-type", "application/json")
|
||||
var es eventsource.EventSource
|
||||
|
||||
session := r.URL.Query().Get("session")
|
||||
if session != "" {
|
||||
// if a session id was given, try to recover/save the es object
|
||||
slock.Lock()
|
||||
preves, ok := sessions[session]
|
||||
slock.Unlock()
|
||||
if ok {
|
||||
// end it here, just serve again the existing object
|
||||
es = *preves
|
||||
es.ServeHTTP(w, r)
|
||||
return
|
||||
} else {
|
||||
// proceed, but save the es object at the end
|
||||
defer func() {
|
||||
slock.Lock()
|
||||
sessions[session] = &es
|
||||
backsessions[&es] = session
|
||||
slock.Unlock()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
es = eventsource.New(
|
||||
&eventsource.Settings{
|
||||
Timeout: time.Second * 5,
|
||||
CloseOnTimeout: true,
|
||||
IdleTimeout: time.Minute * 5,
|
||||
Gzip: true,
|
||||
},
|
||||
func(r *http.Request) [][]byte {
|
||||
return [][]byte{
|
||||
[]byte("X-Accel-Buffering: no"),
|
||||
[]byte("Cache-Control: no-cache"),
|
||||
[]byte("Content-Type: text/event-stream"),
|
||||
[]byte("Connection: keep-alive"),
|
||||
[]byte("Access-Control-Allow-Origin: *"),
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
go func() {
|
||||
time.Sleep(2 * time.Second)
|
||||
es.SendRetryMessage(3 * time.Second)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(25 * time.Second)
|
||||
if es.ConsumersCount() == 0 {
|
||||
removeFromWatchers(&es)
|
||||
removeFromSessions(&es)
|
||||
es.Close()
|
||||
return
|
||||
}
|
||||
es.SendEventMessage("", "keepalive", "")
|
||||
}
|
||||
}()
|
||||
|
||||
es.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func watchPubKeys(keys []string, es *eventsource.EventSource) {
|
||||
func watchPubKey(key string, ws *websocket.Conn) {
|
||||
wlock.Lock()
|
||||
defer wlock.Unlock()
|
||||
|
||||
currentKeys, _ := backwatchers[es]
|
||||
backwatchers[es] = append(currentKeys, keys...)
|
||||
currentKeys, _ := backwatchers[ws]
|
||||
backwatchers[ws] = append(currentKeys, key)
|
||||
|
||||
for _, key := range keys {
|
||||
if ess, ok := watchers[key]; ok {
|
||||
watchers[key] = append(ess, es)
|
||||
} else {
|
||||
watchers[key] = []*eventsource.EventSource{es}
|
||||
}
|
||||
if wss, ok := watchers[key]; ok {
|
||||
watchers[key] = append(wss, ws)
|
||||
} else {
|
||||
watchers[key] = []*websocket.Conn{ws}
|
||||
}
|
||||
}
|
||||
|
||||
func unwatchPubKeys(excludedKeys []string, es *eventsource.EventSource) {
|
||||
func unwatchPubKey(excludedKey string, ws *websocket.Conn) {
|
||||
wlock.Lock()
|
||||
defer wlock.Unlock()
|
||||
|
||||
for _, key := range excludedKeys {
|
||||
if ess, ok := watchers[key]; ok {
|
||||
newEss := make([]*eventsource.EventSource, len(ess)-1)
|
||||
if wss, ok := watchers[excludedKey]; ok {
|
||||
newWss := make([]*websocket.Conn, len(wss)-1)
|
||||
|
||||
var i = 0
|
||||
for _, existingEs := range ess {
|
||||
if existingEs == es {
|
||||
continue
|
||||
}
|
||||
newEss[i] = existingEs
|
||||
i++
|
||||
var i = 0
|
||||
for _, existingWs := range wss {
|
||||
if existingWs == ws {
|
||||
continue
|
||||
}
|
||||
|
||||
watchers[key] = newEss
|
||||
newWss[i] = existingWs
|
||||
i++
|
||||
}
|
||||
|
||||
watchers[excludedKey] = newWss
|
||||
}
|
||||
|
||||
currentKeys, _ := backwatchers[es]
|
||||
currentKeys, _ := backwatchers[ws]
|
||||
newKeys := make([]string, 0, len(currentKeys))
|
||||
for _, currentKey := range currentKeys {
|
||||
if inArray(excludedKeys, currentKey) {
|
||||
if excludedKey == currentKey {
|
||||
continue
|
||||
}
|
||||
newKeys = append(newKeys, currentKey)
|
||||
}
|
||||
|
||||
backwatchers[es] = newKeys
|
||||
backwatchers[ws] = newKeys
|
||||
}
|
||||
|
||||
func removeFromWatchers(es *eventsource.EventSource) {
|
||||
func removeFromWatchers(es *websocket.Conn) {
|
||||
wlock.Lock()
|
||||
defer wlock.Unlock()
|
||||
|
||||
for _, key := range backwatchers[es] {
|
||||
if arr, ok := watchers[key]; ok {
|
||||
newarr := make([]*eventsource.EventSource, len(arr)-1)
|
||||
newarr := make([]*websocket.Conn, len(arr)-1)
|
||||
i := 0
|
||||
for _, oldes := range arr {
|
||||
if oldes == es {
|
||||
|
@ -158,24 +83,12 @@ func notifyPubKeyEvent(key string, evt *Event) {
|
|||
wlock.Unlock()
|
||||
|
||||
if ok {
|
||||
for _, es := range arr {
|
||||
jevent, _ := json.Marshal(evt)
|
||||
(*es).SendEventMessage(string(jevent), "n", "")
|
||||
for _, conn := range arr {
|
||||
jevent, _ := json.Marshal([]interface{}{
|
||||
evt,
|
||||
"n",
|
||||
})
|
||||
conn.WriteMessage(websocket.TextMessage, jevent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func grabNamedSession(name string) *eventsource.EventSource {
|
||||
slock.Lock()
|
||||
es, _ := sessions[name]
|
||||
slock.Unlock()
|
||||
return es
|
||||
}
|
||||
|
||||
func removeFromSessions(es *eventsource.EventSource) {
|
||||
slock.Lock()
|
||||
session := backsessions[es]
|
||||
delete(backsessions, es)
|
||||
delete(sessions, session)
|
||||
slock.Unlock()
|
||||
}
|
||||
|
|
|
@ -38,16 +38,7 @@ func main() {
|
|||
}
|
||||
|
||||
// NIP01
|
||||
router.Path("/listen_events").Methods("GET").HandlerFunc(listenEvents)
|
||||
router.Path("/save_event").Methods("POST").HandlerFunc(saveEvent)
|
||||
router.Path("/request_feed").Methods("POST").HandlerFunc(requestFeed)
|
||||
router.Path("/request_watch").Methods("POST").HandlerFunc(requestWatchKeys)
|
||||
router.Path("/request_unwatch").Methods("POST").HandlerFunc(requestUnwatchKeys)
|
||||
router.Path("/request_user").Methods("POST").HandlerFunc(requestUser)
|
||||
router.Path("/request_event").Methods("POST").HandlerFunc(requestEvent)
|
||||
|
||||
// extra?
|
||||
router.Path("/query_users").Methods("GET").HandlerFunc(queryUsers)
|
||||
router.Path("/ws").Methods("GET").HandlerFunc(handleWebsocket)
|
||||
|
||||
srv := &http.Server{
|
||||
Handler: cors.Default().Handler(router),
|
||||
|
|
|
@ -19,14 +19,13 @@ CREATE TABLE event (
|
|||
pubkey text NOT NULL,
|
||||
created_at integer NOT NULL,
|
||||
kind integer NOT NULL,
|
||||
ref text NOT NULL,
|
||||
tags jsonb NOT NULL,
|
||||
content text NOT NULL,
|
||||
sig text NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX ididx ON event (id);
|
||||
CREATE INDEX pubkeytimeidx ON event (pubkey, created_at);
|
||||
CREATE INDEX idxref ON event (ref);
|
||||
`)
|
||||
log.Print(err)
|
||||
return db, nil
|
||||
|
|
|
@ -19,14 +19,13 @@ CREATE TABLE event (
|
|||
pubkey text NOT NULL,
|
||||
created_at integer NOT NULL,
|
||||
kind integer NOT NULL,
|
||||
ref text NOT NULL,
|
||||
tags text NOT NULL,
|
||||
content text NOT NULL,
|
||||
sig text NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX ididx ON event (id);
|
||||
CREATE INDEX pubkeytimeidx ON event (pubkey, created_at);
|
||||
CREATE INDEX idxref ON event (ref);
|
||||
`)
|
||||
return db, nil
|
||||
}
|
||||
|
|
|
@ -1,10 +0,0 @@
|
|||
package main
|
||||
|
||||
func inArray(haystack []string, needle string) bool {
|
||||
for _, item := range haystack {
|
||||
if item == needle {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
Loading…
Reference in New Issue