Skip to content

Commit

Permalink
Updated structure for aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
ihlar committed May 6, 2019
1 parent a88bb20 commit 1f57719
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 297 deletions.
31 changes: 31 additions & 0 deletions go/aggregator/internal/aggregate/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package aggregate

const nObservers = 2 // TODO: Make proper config

type Observer struct {
LeftRtt uint
RightRtt uint
FullRtt uint
}

func (o* Observer) addSample(event Event) {
if event.Left_rtt != 0 {
o.LeftRtt = ewma(o.LeftRtt, event.Left_rtt)
}
if event.Right_rtt != 0 {
o.RightRtt = ewma(o.RightRtt, event.Right_rtt)
}
if event.Full_rtt_initiator != 0 {
o.FullRtt = ewma(o.FullRtt, event.Full_rtt_initiator)
}
// if event.Full_rtt_responder != 0 {
// o.FullRtt = ewma(o.FullRtt, event.Full_rtt_responder)
// }
}

func ewma(a uint, s uint) uint {
if a == 0 {
return s
}
return s >> 4 + (a*15) >> 4
}
70 changes: 70 additions & 0 deletions go/aggregator/internal/aggregate/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package aggregate

import (
"strconv"

"github.com/EricssonResearch/spindump/go/aggregator/internal/format"
)

type Session struct {
ClientId string
ServerId string
Type string

observers [nObservers]Observer
outputFormat format.C3Sequence
cnt int
}

func NewSession(e Event) *Session {
return &Session{
Type: e.Type,
outputFormat: make(format.C3Sequence),
cnt: 0,
}
}

type SessionId struct {
Id string
Type string
}

func (s* Session) OutputFormat() format.C3Sequence {
left := s.observers[0].LeftRtt
full := s.observers[0].FullRtt
out := []format.LabeledRtt{format.LabeledRtt{Label: "C-0", Rtt: left}}

for i := 1; i < len(s.observers); i++ {
rtt := s.observers[i].LeftRtt - s.observers[i-1].LeftRtt
label := strconv.Itoa(i-1) + "-" + strconv.Itoa(i)
out = append(out, format.LabeledRtt{Label: label, Rtt: rtt})
}
right := s.observers[len(s.observers)-1].RightRtt
out = append(out, format.LabeledRtt{Label: strconv.Itoa(len(s.observers)-1) + "-S", Rtt: right})
out = append(out, format.LabeledRtt{Label: "Full", Rtt: full})

s.outputFormat.Add(out)

return s.outputFormat
}

func (s* Session) addSample(event Event, obsid string) {
id, err := strconv.Atoi(obsid)
if err != nil {
panic(err)
}
s.observers[id].addSample(event)

s.cnt++
if s.cnt > 63 {
s.OutputFormat()
s.cnt = 0
}
}

// Let's do smart things
func (s *Session) NewEvent(ev Event, obs string) {
if ev.Event == "measurement" {
s.addSample(ev, obs)
}
}
20 changes: 20 additions & 0 deletions go/aggregator/internal/aggregate/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package aggregate

// representation of spindump json event
type Event struct {
Type string
Event string
Addrs []string
Session string
Ts string
Who string
Left_rtt uint
Right_rtt uint
Full_rtt_initiator uint
Full_rtt_responder uint
ECT0 uint
ECT1 uint
CE uint
Packets uint
Bytes uint
}
19 changes: 19 additions & 0 deletions go/aggregator/internal/format/c3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package format

type LabeledRtt struct {
Label string
Rtt uint
}

type C3Sequence map[string][]uint

func (s* C3Sequence) Add (lrs []LabeledRtt) {
for _, lr := range lrs {
_, ok := (*s)[lr.Label]
if !ok {
(*s)[lr.Label] = []uint{lr.Rtt}
} else {
(*s)[lr.Label] = append((*s)[lr.Label], lr.Rtt)
}
}
}
102 changes: 102 additions & 0 deletions go/aggregator/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main
import (
"encoding/json"
"net/http"
"io/ioutil"
"log"
"flag"
"time"
"os"
"context"
"os/signal"

"github.com/gorilla/mux"

"github.com/EricssonResearch/spindump/go/aggregator/internal/aggregate"
)

var sessions map[string]*aggregate.Session

func getSessions(w http.ResponseWriter, _ *http.Request) {
var sids []aggregate.SessionId
for id, sess := range sessions {
sids = append(sids, aggregate.SessionId{Id: id, Type: sess.Type})
}
json.NewEncoder(w).Encode(sids)
}

func getSession(w http.ResponseWriter, r *http.Request) {
var sessId = mux.Vars(r)["session"]
sess, _ := sessions[sessId]
json.NewEncoder(w).Encode(sess.OutputFormat())
}

func addEvent(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
}

var sender = mux.Vars(r)["id"]

var event aggregate.Event
err = json.Unmarshal(body, &event)
if err != nil {
panic(err)
}

sess, ok := sessions[event.Session]
if !ok {
sess = aggregate.NewSession(event)
log.Printf("New session %s", event.Session)
sessions[event.Session] = sess
}
sess.NewEvent(event, sender)
}

func commonHeaders(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Access-Control-Allow-Origin","*")
next.ServeHTTP(w, r)
})
}

// todo: move data store and functionality to appropriate package and files
func main() {

sessions = make(map[string]*aggregate.Session)

sAddr := flag.String("bind", "0.0.0.0:5040", "Server address")
flag.Parse()

r := mux.NewRouter()
r.HandleFunc("/data/{id:[0-9]+}", addEvent).Methods("POST")
r.HandleFunc("/demo", getSessions).Methods("GET")
r.HandleFunc("/demo/{session}", getSession).Methods("GET")
r.Use(commonHeaders)

srv := &http.Server{
Addr: *sAddr,
WriteTimeout: time.Second * 1,
ReadTimeout: time.Second * 10,
Handler: r,
}

go func() {
if err := srv.ListenAndServe(); err != nil {
log.Println(err)
}
}()

c := make(chan os.Signal, 1)
// We'll accept graceful shutdowns when quit via SIGINT (Ctrl+C)
signal.Notify(c, os.Interrupt)

<-c

ctx, cancel := context.WithTimeout(context.Background(), time.Second * 1)
defer cancel()
srv.Shutdown(ctx)
log.Println("shutting down")
os.Exit(0)
}
Loading

0 comments on commit 1f57719

Please sign in to comment.