fix: bug in server es sessions management.
This commit is contained in:
parent
415126c50f
commit
71094e00fe
|
@ -17,6 +17,10 @@ var sessions = make(map[string]*eventsource.EventSource)
|
|||
var backsessions = make(map[*eventsource.EventSource]string)
|
||||
var slock = sync.Mutex{}
|
||||
|
||||
var watchers = make(map[string][]*eventsource.EventSource)
|
||||
var backwatchers = make(map[*eventsource.EventSource][]string)
|
||||
var wlock = sync.Mutex{}
|
||||
|
||||
func listenUpdates(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("content-type", "application/json")
|
||||
var es eventsource.EventSource
|
||||
|
@ -36,9 +40,9 @@ func listenUpdates(w http.ResponseWriter, r *http.Request) {
|
|||
// proceed, but save the es object at the end
|
||||
defer func() {
|
||||
slock.Lock()
|
||||
defer slock.Unlock()
|
||||
sessions[session] = &es
|
||||
backsessions[&es] = session
|
||||
slock.Unlock()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
@ -117,15 +121,11 @@ func listenUpdates(w http.ResponseWriter, r *http.Request) {
|
|||
watchPubKeys(keys, &es)
|
||||
}
|
||||
|
||||
var watchers = make(map[string][]*eventsource.EventSource)
|
||||
var index = make(map[*eventsource.EventSource][]string)
|
||||
var wlock = sync.Mutex{}
|
||||
|
||||
func watchPubKeys(keys []string, es *eventsource.EventSource) {
|
||||
wlock.Lock()
|
||||
defer wlock.Unlock()
|
||||
|
||||
index[es] = keys
|
||||
backwatchers[es] = keys
|
||||
|
||||
for _, key := range keys {
|
||||
if arr, ok := watchers[key]; ok {
|
||||
|
@ -140,7 +140,7 @@ func removeFromWatchers(es *eventsource.EventSource) {
|
|||
wlock.Lock()
|
||||
defer wlock.Unlock()
|
||||
|
||||
for _, key := range index[es] {
|
||||
for _, key := range backwatchers[es] {
|
||||
if arr, ok := watchers[key]; ok {
|
||||
newarr := make([]*eventsource.EventSource, len(arr)-1)
|
||||
i := 0
|
||||
|
@ -151,16 +151,18 @@ func removeFromWatchers(es *eventsource.EventSource) {
|
|||
newarr[i] = oldes
|
||||
i++
|
||||
}
|
||||
watchers[key] = newarr
|
||||
}
|
||||
}
|
||||
delete(index, es)
|
||||
delete(backwatchers, es)
|
||||
}
|
||||
|
||||
func notifyPubKeyEvent(key string, evt *Event) {
|
||||
wlock.Lock()
|
||||
defer wlock.Unlock()
|
||||
arr, ok := watchers[key]
|
||||
wlock.Unlock()
|
||||
|
||||
if arr, ok := watchers[key]; ok {
|
||||
if ok {
|
||||
for _, es := range arr {
|
||||
jevent, _ := json.Marshal(evt)
|
||||
(*es).SendEventMessage(string(jevent), "happening", "")
|
||||
|
@ -170,16 +172,15 @@ func notifyPubKeyEvent(key string, evt *Event) {
|
|||
|
||||
func grabNamedSession(name string) *eventsource.EventSource {
|
||||
slock.Lock()
|
||||
defer slock.Unlock()
|
||||
es, _ := sessions[name]
|
||||
slock.Unlock()
|
||||
return es
|
||||
}
|
||||
|
||||
func removeFromSessions(es *eventsource.EventSource) {
|
||||
slock.Lock()
|
||||
defer slock.Unlock()
|
||||
|
||||
session := backsessions[es]
|
||||
delete(backsessions, es)
|
||||
delete(sessions, session)
|
||||
slock.Unlock()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue