From d097aacdb2677ed585748d503b2cecdbd2702e00 Mon Sep 17 00:00:00 2001 From: "alon.kashtan" Date: Mon, 13 Apr 2020 16:16:53 +0300 Subject: [PATCH 1/9] Add metrics collection and endpoint to Toxiproxy --- .dockerignore | 1 + Makefile | 2 +- README.md | 80 ++++++++++++++++++++++ api.go | 47 +++++++++++++ cmd/toxiproxy.go | 7 ++ link.go | 38 +++++++++-- metrics/metrics.go | 140 ++++++++++++++++++++++++++++++++++++++ proxy.go | 4 ++ toxic_collection.go | 17 ++++- toxics/limit_data_test.go | 8 +-- toxics/noop.go | 17 ++++- toxics/slicer_test.go | 2 +- toxics/toxic.go | 11 ++- 13 files changed, 355 insertions(+), 19 deletions(-) create mode 100644 .dockerignore create mode 100644 metrics/metrics.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..4032ec6b --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +.git/ \ No newline at end of file diff --git a/Makefile b/Makefile index 381f3ccd..2805a819 100644 --- a/Makefile +++ b/Makefile @@ -65,7 +65,7 @@ $(DEB): tmp/build/$(SERVER_NAME)-linux-amd64 tmp/build/$(CLI_NAME)-linux-amd64 $(word 2,$^)=/usr/bin/$(CLI_NAME) \ ./share/toxiproxy.conf=/etc/init/toxiproxy.conf -docker: +docker: linux docker build --tag="shopify/toxiproxy:git" . docker-release: linux diff --git a/README.md b/README.md index fc7843f3..08cd013a 100644 --- a/README.md +++ b/README.md @@ -466,6 +466,8 @@ All endpoints are JSON. - **POST /proxies/{proxy}/toxics/{toxic}** - Update an active toxic - **DELETE /proxies/{proxy}/toxics/{toxic}** - Remove an active toxic - **POST /reset** - Enable all proxies and remove all active toxics + - **GET /metrics** - Get metrics information (global) + - **GET /events** - Returns all available events history, with optional token (see bellow) - **GET /version** - Returns the server version number #### Populating Proxies @@ -478,6 +480,84 @@ A `/populate` call can be included for example at application start to ensure al exist. It is safe to make this call several times, since proxies will be untouched as long as their fields are consistent with the new data. +#### Metrics and event history + +Toxiproxy can keep metrics and record of recent events in order to enable easy visualization of real +time traffic, and enabling automated tests to verify the behaviour of the networking. This feature +makes Toxiproxy useful also for scenarios that do not involve toxics. Please note that since Toxiproxy +supports any kind of tcp stream, it does not count requests (which exists only in some protocols) +but tcp packets. + +The **metrics** endpoint gives global information about number of packets that passed through a specific +proxy. For example, we have two proxies, `Main DB` and `Distributed Cahce`, so a call to the +metrics endpoint will yield a response that looks like this: +```json +{ + "Distributed Cache": 103, + "Main DB": 51 +} +``` + +The **events** endpoint gives you recent events that happened, such as client connections and disconnections, +packets transferred and failures. The event history is limited in time and number (to prevent excessive memory consumption), +both can be configured when running the server and default to 10 seconds, 100,000 events. + +For example, a call to the `events` endpoint will yield a response that looks like this: +```json +{ + "data": [ + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:09.914659+03:00", + "proxyName": "Distributed Cache", + "eventType": "Client Connected" + }, + { + "client": "", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:10.446332+03:00", + "proxyName": "Distributed Cache", + "eventType": "Message" + }, + { + "client": "", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:13.448622+03:00", + "proxyName": "Distributed Cache", + "eventType": "Message" + }, + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:15.452107+03:00", + "proxyName": "Distributed Cache", + "eventType": "Client Disconnected" + }, + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:19.914659+03:00", + "proxyName": "Distributed Cache", + "eventType": "Client Connected" + }, + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:19.914812+03:00", + "proxyName": "Distributed Cache", + "eventType": "Upstream unavailable" + } + ], + "token": "a439j" +} +``` +Here we see a client that connected, sent two packets (good chance that it is also two requests, if this is HTTP) +and disconnected the tcp connection. Then it tried again, but the message could not be forwarded to the target. + +The `token` field can be used in consecutive calls, to get only unread messages. The next call in this example +would be `/events?token=a439j`. + ### CLI Example ```bash diff --git a/api.go b/api.go index 02edd9aa..7891146a 100644 --- a/api.go +++ b/api.go @@ -3,10 +3,12 @@ package toxiproxy import ( "encoding/json" "fmt" + "github.com/Shopify/toxiproxy/metrics" "log" "net" "net/http" "os" + "strconv" "strings" "github.com/Shopify/toxiproxy/toxics" @@ -72,6 +74,8 @@ func (server *ApiServer) Listen(host string, port string) { r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicUpdate).Methods("POST") r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicDelete).Methods("DELETE") + r.HandleFunc("/events", server.GetMetricEvents).Methods("GET") + r.HandleFunc("/metrics", server.GetMetrics).Methods("GET") r.HandleFunc("/version", server.Version).Methods("GET") http.Handle("/", StopBrowsersMiddleware(r)) @@ -108,6 +112,49 @@ func (server *ApiServer) ProxyIndex(response http.ResponseWriter, request *http. } } +func (server *ApiServer) GetMetricEvents(response http.ResponseWriter, request *http.Request) { + token := request.URL.Query().Get("token") + var result metrics.EventsAndToken + if len(token) >= 1 { + tokenNum, err := strconv.Atoi(token) + if err != nil { + apiError(response, newError("token must be a one returned from this api", http.StatusBadRequest)) + return + } + result = metrics.GetMetricEventsStartingFrom(tokenNum) + } else { + result = metrics.GetMetricEvents() + } + + body, err := json.Marshal(result) + + if apiError(response, err) { + return + } + + response.Header().Set("Content-Type", "application/json") + _, err = response.Write(body) + if err != nil { + logrus.Warn("ProxyIndex: Failed to write response to client", err) + } +} + +func (server *ApiServer) GetMetrics(response http.ResponseWriter, request *http.Request) { + data := metrics.GetMetrics() + + body, err := json.Marshal(data) + + if apiError(response, err) { + return + } + + response.Header().Set("Content-Type", "application/json") + _, err = response.Write(body) + if err != nil { + logrus.Warn("ProxyIndex: Failed to write response to client", err) + } +} + func (server *ApiServer) ResetState(response http.ResponseWriter, request *http.Request) { proxies := server.Collection.Proxies() diff --git a/cmd/toxiproxy.go b/cmd/toxiproxy.go index 5fb5b8e0..5c98851f 100644 --- a/cmd/toxiproxy.go +++ b/cmd/toxiproxy.go @@ -2,6 +2,7 @@ package main import ( "flag" + "github.com/Shopify/toxiproxy/metrics" "math/rand" "os" "os/signal" @@ -14,17 +15,23 @@ import ( var host string var port string var config string +var metricsTimeToKeep string +var metricsMaxEvents int func init() { flag.StringVar(&host, "host", "localhost", "Host for toxiproxy's API to listen on") flag.StringVar(&port, "port", "8474", "Port for toxiproxy's API to listen on") flag.StringVar(&config, "config", "", "JSON file containing proxies to create on startup") + flag.StringVar(&metricsTimeToKeep, "metrics-time", "10s", "Oldest age of events to keep in toxiproxy metrics (e.g. 20s)") + flag.IntVar(&metricsMaxEvents, "metrics-max", 100000, "Max num of events to keep in toxiproxy events") seed := flag.Int64("seed", time.Now().UTC().UnixNano(), "Seed for randomizing toxics with") flag.Parse() rand.Seed(*seed) } func main() { + metrics.InitSettings(metricsTimeToKeep, metricsMaxEvents) + server := toxiproxy.NewServer() if len(config) > 0 { server.PopulateConfig(config) diff --git a/link.go b/link.go index 7a67ae16..a3f4576c 100644 --- a/link.go +++ b/link.go @@ -1,7 +1,10 @@ package toxiproxy import ( + "github.com/Shopify/toxiproxy/metrics" "io" + "net" + "time" "github.com/Shopify/toxiproxy/stream" "github.com/Shopify/toxiproxy/toxics" @@ -45,7 +48,13 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di next = make(chan *stream.StreamChunk) } - link.stubs[i] = toxics.NewToxicStub(last, next) + proxyName := "Unknown" + proxyUpstream := "Unknown" + if proxy != nil { + proxyName = proxy.Name + proxyUpstream = proxy.Upstream + } + link.stubs[i] = toxics.NewToxicStub(last, next, proxyName, proxyUpstream) last = next } link.output = stream.NewChanReader(last) @@ -53,15 +62,26 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di } // Start the link with the specified toxics -func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) { +func (link *ToxicLink) Start(name string, source net.Conn, dest io.WriteCloser) { + // assigned here so can be safely used in go routines + proxyName := link.proxy.Name + upstreamName := link.proxy.Upstream + go func() { bytes, err := io.Copy(link.input, source) if err != nil { logrus.WithFields(logrus.Fields{ - "name": link.proxy.Name, + "name": proxyName, "bytes": bytes, "err": err, }).Warn("Source terminated") + + metrics.RegisterEvent(metrics.Event{ + EventType: "Client Disconnected", + Client: source.RemoteAddr().String(), + Upstream: upstreamName, + ProxyName: proxyName, + Time: time.Now()}) } link.input.Close() }() @@ -76,7 +96,7 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) bytes, err := io.Copy(dest, link.output) if err != nil { logrus.WithFields(logrus.Fields{ - "name": link.proxy.Name, + "name": proxyName, "bytes": bytes, "err": err, }).Warn("Destination terminated") @@ -92,7 +112,15 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { i := len(link.stubs) newin := make(chan *stream.StreamChunk, toxic.BufferSize) - link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output)) + + proxyName := "Unknown" + proxyUpstream := "Unknown" + if link.proxy != nil { + proxyName = link.proxy.Name + proxyUpstream = link.proxy.Upstream + } + + link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output, proxyName, proxyUpstream)) // Interrupt the last toxic so that we don't have a race when moving channels if link.stubs[i-1].InterruptToxic() { diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..c3f981e6 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,140 @@ +package metrics + +import ( + "container/list" + "go/types" + "time" +) + +var timeToKeep time.Duration +var maxNumOfEvents = 1 // default + +var eventList = list.New() +var lastId = 0 + +var incomingEventsChannel = make(chan Event, 100) +var eventsRequests = make(chan int) +var eventsResponse = make(chan EventsAndToken) +var metricsRequests = make(chan types.Nil) +var metricsResponse = make(chan map[string]uint64) + +type Event struct { + token int + Client string `json:"client"` + Upstream string `json:"target"` + Time time.Time `json:"timestamp"` + ProxyName string `json:"proxyName"` + EventType string `json:"eventType"` +} + +type EventsAndToken struct { + Data []Event `json:"data"` + Token int `json:"token,string"` +} + +var messages = make(map[string]uint64) + +func init() { + go metricsRoutine() +} + +// Init the settings. This should be called before registering events +func InitSettings(maxTimeToKeep string, maxEvents int) { + var err error + timeToKeep, err = time.ParseDuration("-" + maxTimeToKeep) + + if err != nil { + panic("Max metrics time is not a duration: " + maxTimeToKeep) + } + + maxNumOfEvents = maxEvents +} + +// The main requests handling routine. The routine makes sure that only one request +// (either external or RegisterEvent calls) is handled simultaneously. +func metricsRoutine() { + for { + select { + case event := <-incomingEventsChannel: + registerEvent(event) + case <-metricsRequests: + metricsResponse <- getMetrics() + case token := <-eventsRequests: + eventsResponse <- getMetricEventsStartingFrom(token) + } + } +} + +// Report an events that should be collected by metrics. +func RegisterEvent(event Event) { + incomingEventsChannel <- event +} + +func registerEvent(event Event) { + messages[event.ProxyName] += 1 + event.token = lastId + lastId++ + + eventList.PushBack(event) + + // Cleanup stale events - more than max number or max age: + + for eventList.Len() > maxNumOfEvents { + eventList.Remove(eventList.Front()) + } + + startTime := time.Now().Add(timeToKeep) + + for eventList.Len() > 0 && eventList.Front().Value.(Event).Time.Before(startTime) { + eventList.Remove(eventList.Front()) + } +} + +// Get general metrics (proxy name -> num of events related to the proxy) since launch +func GetMetrics() map[string]uint64 { + metricsRequests <- types.Nil{} + return <-metricsResponse +} + +func getMetrics() map[string]uint64 { + return messages +} + +// Get all available events in history. The result includes a token tha can be used +// to call GetMetricEventsStartingFrom in order get only unread events. +func GetMetricEvents() EventsAndToken { + return GetMetricEventsStartingFrom(-1) +} + +// Get all unread events in history. The token parameter should be a result of a previous call. +// The result includes a token tha can be used in the following call in order get only unread events. +func GetMetricEventsStartingFrom(token int) EventsAndToken { + eventsRequests <- token + return <-eventsResponse +} + +func getMetricEventsStartingFrom(token int) EventsAndToken { + var e *list.Element + var skippedCount = 0 + + // skip seen elements + for e = eventList.Front(); e != nil && e.Value.(Event).token <= token; e = e.Next() { + skippedCount++ + } + var result []Event + for ; e != nil; e = e.Next() { + result = append(result, e.Value.(Event)) + } + + if result == nil { + result = []Event{} + } + var returnedToken int + if eventList.Len() > 0 { + returnedToken = eventList.Back().Value.(Event).token + } else { // no events exist + returnedToken = -1 + } + + return EventsAndToken{result, returnedToken} +} diff --git a/proxy.go b/proxy.go index 1d165053..0ae20e71 100644 --- a/proxy.go +++ b/proxy.go @@ -2,7 +2,9 @@ package toxiproxy import ( "errors" + "github.com/Shopify/toxiproxy/metrics" "sync" + "time" "github.com/Shopify/toxiproxy/stream" "github.com/sirupsen/logrus" @@ -158,6 +160,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Info("Accepted client") + metrics.RegisterEvent(metrics.Event{EventType: "Client Connected", Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) upstream, err := net.Dial("tcp", proxy.Upstream) if err != nil { @@ -167,6 +170,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Error("Unable to open connection to upstream") + metrics.RegisterEvent(metrics.Event{EventType: "Upstream unavailable", Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) client.Close() continue } diff --git a/toxic_collection.go b/toxic_collection.go index 3aded164..63d43474 100644 --- a/toxic_collection.go +++ b/toxic_collection.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "net" "strings" "sync" @@ -26,9 +27,15 @@ type ToxicCollection struct { } func NewToxicCollection(proxy *Proxy) *ToxicCollection { + toxic := new(toxics.NoopToxic) + if proxy != nil { + toxic.ProxyName = proxy.Name + toxic.Upstream = proxy.Upstream + } + collection := &ToxicCollection{ noop: &toxics.ToxicWrapper{ - Toxic: new(toxics.NoopToxic), + Toxic: toxic, Type: "noop", }, proxy: proxy, @@ -84,11 +91,15 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er var buffer bytes.Buffer + toxic := new(toxics.NoopToxic) + toxic.ProxyName = c.proxy.Name + toxic.Upstream = c.proxy.Upstream + // Default to a downstream toxic with a toxicity of 1. wrapper := &toxics.ToxicWrapper{ Stream: "downstream", Toxicity: 1.0, - Toxic: new(toxics.NoopToxic), + Toxic: toxic, } err := json.NewDecoder(io.TeeReader(data, &buffer)).Decode(wrapper) @@ -169,7 +180,7 @@ func (c *ToxicCollection) RemoveToxic(name string) error { return ErrToxicNotFound } -func (c *ToxicCollection) StartLink(name string, input io.Reader, output io.WriteCloser, direction stream.Direction) { +func (c *ToxicCollection) StartLink(name string, input net.Conn, output io.WriteCloser, direction stream.Direction) { c.Lock() defer c.Unlock() diff --git a/toxics/limit_data_test.go b/toxics/limit_data_test.go index 50b3572d..1f1c2634 100644 --- a/toxics/limit_data_test.go +++ b/toxics/limit_data_test.go @@ -33,7 +33,7 @@ func checkRemainingChunks(t *testing.T, output chan *stream.StreamChunk) { func check(t *testing.T, toxic *toxics.LimitDataToxic, chunks [][]byte, expectedChunks [][]byte) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk, 100) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() go toxic.Pipe(stub) @@ -54,7 +54,7 @@ func TestLimitDataToxicMayBeRestarted(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk, 100) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() buf := buffer(90) @@ -85,7 +85,7 @@ func TestLimitDataToxicMayBeInterrupted(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() go func() { @@ -100,7 +100,7 @@ func TestLimitDataToxicNilShouldClosePipe(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() go func() { diff --git a/toxics/noop.go b/toxics/noop.go index 08cdcde7..3eb80203 100644 --- a/toxics/noop.go +++ b/toxics/noop.go @@ -1,7 +1,15 @@ package toxics -// The NoopToxic passes all data through without any toxic effects. -type NoopToxic struct{} +import ( + "github.com/Shopify/toxiproxy/metrics" + "time" +) + +// The NoopToxic passes all data through without any toxic effects, and collects metrics +type NoopToxic struct { + ProxyName string + Upstream string +} func (t *NoopToxic) Pipe(stub *ToxicStub) { for { @@ -13,11 +21,14 @@ func (t *NoopToxic) Pipe(stub *ToxicStub) { stub.Close() return } + metrics.RegisterEvent(metrics.Event{ProxyName: t.ProxyName, Upstream: t.Upstream, Time: time.Now(), EventType: "Message"}) stub.Output <- c } } } func init() { - Register("noop", new(NoopToxic)) + toxic := new(NoopToxic) + toxic.ProxyName = "fake proxy" + Register("noop", toxic) } diff --git a/toxics/slicer_test.go b/toxics/slicer_test.go index 6be1da19..74c56032 100644 --- a/toxics/slicer_test.go +++ b/toxics/slicer_test.go @@ -16,7 +16,7 @@ func TestSlicerToxic(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") done := make(chan bool) go func() { diff --git a/toxics/toxic.go b/toxics/toxic.go index bccd5f88..04c0d0d8 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -62,14 +62,18 @@ type ToxicStub struct { Interrupt chan struct{} running chan struct{} closed chan struct{} + proxyName string + upstream string } -func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk) *ToxicStub { +func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk, proxyName string, upstream string) *ToxicStub { return &ToxicStub{ Interrupt: make(chan struct{}), closed: make(chan struct{}), Input: input, Output: output, + proxyName: proxyName, + upstream: upstream, } } @@ -81,7 +85,10 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) { if rand.Float32() < toxic.Toxicity { toxic.Pipe(s) } else { - new(NoopToxic).Pipe(s) + noopToxic := new(NoopToxic) + noopToxic.ProxyName = s.proxyName + noopToxic.Upstream = s.upstream + noopToxic.Pipe(s) } } From a8e9c30041cea709fd1a576de40a6bfcd97330f2 Mon Sep 17 00:00:00 2001 From: "alon.kashtan" Date: Mon, 20 Apr 2020 17:01:41 +0300 Subject: [PATCH 2/9] Add metrics unit tests --- metrics/metrics.go | 23 ++++- metrics/metrics_test.go | 191 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+), 3 deletions(-) create mode 100644 metrics/metrics_test.go diff --git a/metrics/metrics.go b/metrics/metrics.go index c3f981e6..5ac46bfc 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "container/list" + "fmt" "go/types" "time" ) @@ -9,8 +10,8 @@ import ( var timeToKeep time.Duration var maxNumOfEvents = 1 // default -var eventList = list.New() -var lastId = 0 +var eventList *list.List +var lastId int var incomingEventsChannel = make(chan Event, 100) var eventsRequests = make(chan int) @@ -27,14 +28,27 @@ type Event struct { EventType string `json:"eventType"` } +func (e Event) String() string { + return fmt.Sprintf("Event {token: %d, proxy: %s}", e.token, e.ProxyName) +} + type EventsAndToken struct { Data []Event `json:"data"` Token int `json:"token,string"` } -var messages = make(map[string]uint64) +var messages map[string]uint64 + +// (re) initializes data. Except for first initialization, intended to be used only in tests. +// Not "thread-safe". +func resetData() { + eventList = list.New() + messages = make(map[string]uint64) + lastId = 0 +} func init() { + resetData() go metricsRoutine() } @@ -66,6 +80,9 @@ func metricsRoutine() { } // Report an events that should be collected by metrics. +// +// Note: this is an asynchronous operation, so calls to Get methods in this module +// might not reflect an event registration immediately func RegisterEvent(event Event) { incomingEventsChannel <- event } diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go new file mode 100644 index 00000000..e9977fcb --- /dev/null +++ b/metrics/metrics_test.go @@ -0,0 +1,191 @@ +package metrics + +import ( + "testing" + "time" +) + +var duration3m, _ = time.ParseDuration("3m") + +func eventEquals(e1 Event, e2 Event) bool { + return e1.ProxyName == e2.ProxyName +} + +func TestRegisterEvent_Simple(t *testing.T) { + resetData() + InitSettings("10m", 1) + event := Event{Time: time.Now(), ProxyName: "e"} + + registerEvent(event) + + if eventList.Len() != 1 || !eventEquals(eventList.Front().Value.(Event), event) { + t.Error("eventList does not contain exactly the one register event") + } +} + +func TestRegisterEvent_AllowOne_AddTwo_LastOneRemains(t *testing.T) { + resetData() + InitSettings("10m", 1) + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + + registerEvent(event1) + registerEvent(event2) + + if eventList.Len() != 1 || !eventEquals(eventList.Front().Value.(Event), event2) { + t.Error("eventList does not contain exactly the last register event, len =", eventList.Len()) + } +} + +func TestRegisterEvent_AllowTwoMinutes_AddObsolete_EmptyList(t *testing.T) { + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now().Add(-duration3m), ProxyName: "e1"} + + registerEvent(event1) + + if eventList.Len() != 0 { + t.Error("eventList should be empty, len =", eventList.Len()) + } +} + +func TestRegisterEvent_AllowTwoMinutes_AddObsoleteAndNew_OnlyNewRemains(t *testing.T) { + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now().Add(-duration3m), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + + registerEvent(event1) + registerEvent(event2) + + if eventList.Len() != 1 || !eventEquals(eventList.Front().Value.(Event), event2) { + t.Error("eventList does not contain exactly the one registered event, len =", eventList.Len()) + } +} + +func TestGetMetricEvents_ReturnsAllEvents(t *testing.T) { + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + event3 := Event{Time: time.Now(), ProxyName: "e3"} + registerEvent(event1) + registerEvent(event2) + registerEvent(event3) + + events := GetMetricEvents() + + if len(events.Data) != 3 || + !eventEquals(events.Data[0], event1) || + !eventEquals(events.Data[1], event2) || + !eventEquals(events.Data[2], event3) { + t.Error("the events do not match exactly the registered events, events =", events.Data) + } +} + +func TestGetMetricEventsStartingFrom_ReturnsAllEventsAfterToken(t *testing.T) { + // Arrange + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + event3 := Event{Time: time.Now(), ProxyName: "e3"} + event4 := Event{Time: time.Now(), ProxyName: "e4"} + + registerEvent(event1) + registerEvent(event2) + + events := GetMetricEvents() + + registerEvent(event3) + registerEvent(event4) + + // Act + events = GetMetricEventsStartingFrom(events.Token) + + // Assert + if len(events.Data) != 2 || + !eventEquals(events.Data[0], event3) || + !eventEquals(events.Data[1], event4) { + t.Error("the events do not match exactly the new registered events, events =", events.Data) + } +} + +func TestGetMetricEventsStartingFrom_NoNewEvents_EmptyList(t *testing.T) { + // Arrange + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + + registerEvent(event1) + registerEvent(event2) + + events := GetMetricEvents() + + // Act + events = GetMetricEventsStartingFrom(events.Token) + + // Assert + if len(events.Data) != 0 { + t.Error("the event list is not empty, events =", events.Data) + } +} + +func TestGetMetrics_Simple(t *testing.T) { + resetData() + InitSettings("2m", 10) + + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy2"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + + metrics := GetMetrics() + + if len(metrics) != 3 || + metrics["proxy1"] != 3 || + metrics["proxy2"] != 1 || + metrics["proxy3"] != 2 { + t.Error("the metrics do not match exactly the registered events, metrics =", metrics) + } +} + +func TestGetMetrics_SomeEventsExpired_ExpiredEventsStillCount(t *testing.T) { + resetData() + InitSettings("2m", 3) + + registerEvent(Event{Time: time.Now().Add(-duration3m), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now().Add(-duration3m), ProxyName: "proxy2"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + + metrics := GetMetrics() + + if len(metrics) != 3 || + metrics["proxy1"] != 3 || + metrics["proxy2"] != 1 || + metrics["proxy3"] != 2 { + t.Error("the metrics do not match exactly the registered events, metrics =", metrics) + } +} + +func TestGetMetrics_NoEvents_EmptyMetrics(t *testing.T) { + resetData() + InitSettings("2m", 3) + + metrics := GetMetrics() + + if len(metrics) != 0 { + t.Error("the metrics are not empty, metrics =", metrics) + } +} From d88a0d751b2cbeb48b2898a39498fe5f17f3526c Mon Sep 17 00:00:00 2001 From: "alon.kashtan" Date: Tue, 21 Apr 2020 12:34:21 +0300 Subject: [PATCH 3/9] Change "token" to "location" --- README.md | 8 ++++---- api.go | 12 ++++++------ metrics/metrics.go | 42 ++++++++++++++++++++--------------------- metrics/metrics_test.go | 6 +++--- 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 08cd013a..5b8f0faf 100644 --- a/README.md +++ b/README.md @@ -467,7 +467,7 @@ All endpoints are JSON. - **DELETE /proxies/{proxy}/toxics/{toxic}** - Remove an active toxic - **POST /reset** - Enable all proxies and remove all active toxics - **GET /metrics** - Get metrics information (global) - - **GET /events** - Returns all available events history, with optional token (see bellow) + - **GET /events** - Returns all available events history, with optional location (see bellow) - **GET /version** - Returns the server version number #### Populating Proxies @@ -549,14 +549,14 @@ For example, a call to the `events` endpoint will yield a response that looks li "eventType": "Upstream unavailable" } ], - "token": "a439j" + "location": "a439j" } ``` Here we see a client that connected, sent two packets (good chance that it is also two requests, if this is HTTP) and disconnected the tcp connection. Then it tried again, but the message could not be forwarded to the target. -The `token` field can be used in consecutive calls, to get only unread messages. The next call in this example -would be `/events?token=a439j`. +The `location` field can be used in consecutive calls, to get only unread messages. The next call in this example +would be `/events?afterLocation=a439j`. ### CLI Example diff --git a/api.go b/api.go index 7891146a..e206bc4d 100644 --- a/api.go +++ b/api.go @@ -113,15 +113,15 @@ func (server *ApiServer) ProxyIndex(response http.ResponseWriter, request *http. } func (server *ApiServer) GetMetricEvents(response http.ResponseWriter, request *http.Request) { - token := request.URL.Query().Get("token") - var result metrics.EventsAndToken - if len(token) >= 1 { - tokenNum, err := strconv.Atoi(token) + afterLocation := request.URL.Query().Get("afterLocation") + var result metrics.EventsAndLocation + if len(afterLocation) >= 1 { + locationNum, err := strconv.Atoi(afterLocation) if err != nil { - apiError(response, newError("token must be a one returned from this api", http.StatusBadRequest)) + apiError(response, newError("afterLocation must be a one returned from this api", http.StatusBadRequest)) return } - result = metrics.GetMetricEventsStartingFrom(tokenNum) + result = metrics.GetMetricEventsStartingFrom(locationNum) } else { result = metrics.GetMetricEvents() } diff --git a/metrics/metrics.go b/metrics/metrics.go index 5ac46bfc..389ea723 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -15,12 +15,12 @@ var lastId int var incomingEventsChannel = make(chan Event, 100) var eventsRequests = make(chan int) -var eventsResponse = make(chan EventsAndToken) +var eventsResponse = make(chan EventsAndLocation) var metricsRequests = make(chan types.Nil) var metricsResponse = make(chan map[string]uint64) type Event struct { - token int + location int Client string `json:"client"` Upstream string `json:"target"` Time time.Time `json:"timestamp"` @@ -29,12 +29,12 @@ type Event struct { } func (e Event) String() string { - return fmt.Sprintf("Event {token: %d, proxy: %s}", e.token, e.ProxyName) + return fmt.Sprintf("Event {location: %d, proxy: %s}", e.location, e.ProxyName) } -type EventsAndToken struct { - Data []Event `json:"data"` - Token int `json:"token,string"` +type EventsAndLocation struct { + Data []Event `json:"data"` + Location int `json:"location,string"` } var messages map[string]uint64 @@ -73,8 +73,8 @@ func metricsRoutine() { registerEvent(event) case <-metricsRequests: metricsResponse <- getMetrics() - case token := <-eventsRequests: - eventsResponse <- getMetricEventsStartingFrom(token) + case location := <-eventsRequests: + eventsResponse <- getMetricEventsStartingFrom(location) } } } @@ -89,7 +89,7 @@ func RegisterEvent(event Event) { func registerEvent(event Event) { messages[event.ProxyName] += 1 - event.token = lastId + event.location = lastId lastId++ eventList.PushBack(event) @@ -117,25 +117,25 @@ func getMetrics() map[string]uint64 { return messages } -// Get all available events in history. The result includes a token tha can be used +// Get all available events in history. The result includes a location tha can be used // to call GetMetricEventsStartingFrom in order get only unread events. -func GetMetricEvents() EventsAndToken { +func GetMetricEvents() EventsAndLocation { return GetMetricEventsStartingFrom(-1) } -// Get all unread events in history. The token parameter should be a result of a previous call. -// The result includes a token tha can be used in the following call in order get only unread events. -func GetMetricEventsStartingFrom(token int) EventsAndToken { - eventsRequests <- token +// Get all unread events in history. The location parameter should be a result of a previous call. +// The result includes a location tha can be used in the following call in order get only unread events. +func GetMetricEventsStartingFrom(location int) EventsAndLocation { + eventsRequests <- location return <-eventsResponse } -func getMetricEventsStartingFrom(token int) EventsAndToken { +func getMetricEventsStartingFrom(location int) EventsAndLocation { var e *list.Element var skippedCount = 0 // skip seen elements - for e = eventList.Front(); e != nil && e.Value.(Event).token <= token; e = e.Next() { + for e = eventList.Front(); e != nil && e.Value.(Event).location <= location; e = e.Next() { skippedCount++ } var result []Event @@ -146,12 +146,12 @@ func getMetricEventsStartingFrom(token int) EventsAndToken { if result == nil { result = []Event{} } - var returnedToken int + var returnedLocation int if eventList.Len() > 0 { - returnedToken = eventList.Back().Value.(Event).token + returnedLocation = eventList.Back().Value.(Event).location } else { // no events exist - returnedToken = -1 + returnedLocation = -1 } - return EventsAndToken{result, returnedToken} + return EventsAndLocation{result, returnedLocation} } diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index e9977fcb..c8af4ad0 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -86,7 +86,7 @@ func TestGetMetricEvents_ReturnsAllEvents(t *testing.T) { } } -func TestGetMetricEventsStartingFrom_ReturnsAllEventsAfterToken(t *testing.T) { +func TestGetMetricEventsStartingFrom_ReturnsAllEventsAfterLocation(t *testing.T) { // Arrange resetData() InitSettings("2m", 10) @@ -105,7 +105,7 @@ func TestGetMetricEventsStartingFrom_ReturnsAllEventsAfterToken(t *testing.T) { registerEvent(event4) // Act - events = GetMetricEventsStartingFrom(events.Token) + events = GetMetricEventsStartingFrom(events.Location) // Assert if len(events.Data) != 2 || @@ -129,7 +129,7 @@ func TestGetMetricEventsStartingFrom_NoNewEvents_EmptyList(t *testing.T) { events := GetMetricEvents() // Act - events = GetMetricEventsStartingFrom(events.Token) + events = GetMetricEventsStartingFrom(events.Location) // Assert if len(events.Data) != 0 { From e4a44161178a08f9f085e9389286f8824ccd2c58 Mon Sep 17 00:00:00 2001 From: "alon.kashtan" Date: Tue, 21 Apr 2020 16:03:04 +0300 Subject: [PATCH 4/9] Move event types to constants --- README.md | 2 ++ link.go | 2 +- metrics/event_types.go | 6 ++++++ proxy.go | 4 ++-- toxics/noop.go | 2 +- 5 files changed, 12 insertions(+), 4 deletions(-) create mode 100644 metrics/event_types.go diff --git a/README.md b/README.md index 5b8f0faf..a37d07fe 100644 --- a/README.md +++ b/README.md @@ -502,6 +502,8 @@ The **events** endpoint gives you recent events that happened, such as client co packets transferred and failures. The event history is limited in time and number (to prevent excessive memory consumption), both can be configured when running the server and default to 10 seconds, 100,000 events. +The available event types can be seen [here](metrics/event_types.go). + For example, a call to the `events` endpoint will yield a response that looks like this: ```json { diff --git a/link.go b/link.go index a3f4576c..59b52115 100644 --- a/link.go +++ b/link.go @@ -77,7 +77,7 @@ func (link *ToxicLink) Start(name string, source net.Conn, dest io.WriteCloser) }).Warn("Source terminated") metrics.RegisterEvent(metrics.Event{ - EventType: "Client Disconnected", + EventType: metrics.ClientDisconnected, Client: source.RemoteAddr().String(), Upstream: upstreamName, ProxyName: proxyName, diff --git a/metrics/event_types.go b/metrics/event_types.go new file mode 100644 index 00000000..bec75e05 --- /dev/null +++ b/metrics/event_types.go @@ -0,0 +1,6 @@ +package metrics + +const ClientConnected = "Client Connected" +const UpstreamUnavailable = "Upstream Unavailable" +const Message = "Message" +const ClientDisconnected = "Client Disconnected" diff --git a/proxy.go b/proxy.go index 0ae20e71..f5fcb6a9 100644 --- a/proxy.go +++ b/proxy.go @@ -160,7 +160,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Info("Accepted client") - metrics.RegisterEvent(metrics.Event{EventType: "Client Connected", Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) + metrics.RegisterEvent(metrics.Event{EventType: metrics.ClientConnected, Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) upstream, err := net.Dial("tcp", proxy.Upstream) if err != nil { @@ -170,7 +170,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Error("Unable to open connection to upstream") - metrics.RegisterEvent(metrics.Event{EventType: "Upstream unavailable", Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) + metrics.RegisterEvent(metrics.Event{EventType: metrics.UpstreamUnavailable, Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) client.Close() continue } diff --git a/toxics/noop.go b/toxics/noop.go index 3eb80203..2bc95f54 100644 --- a/toxics/noop.go +++ b/toxics/noop.go @@ -21,7 +21,7 @@ func (t *NoopToxic) Pipe(stub *ToxicStub) { stub.Close() return } - metrics.RegisterEvent(metrics.Event{ProxyName: t.ProxyName, Upstream: t.Upstream, Time: time.Now(), EventType: "Message"}) + metrics.RegisterEvent(metrics.Event{ProxyName: t.ProxyName, Upstream: t.Upstream, Time: time.Now(), EventType: metrics.Message}) stub.Output <- c } } From a365499cf9bf8640bfc32ccb54dd8d0943aaf5f8 Mon Sep 17 00:00:00 2001 From: nothinux Date: Mon, 20 May 2019 16:20:53 +0700 Subject: [PATCH 5/9] update docs --- client/README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/README.md b/client/README.md index a078f872..d498b02f 100644 --- a/client/README.md +++ b/client/README.md @@ -39,7 +39,10 @@ client := toxiproxy.NewClient("localhost:8474") You can then create a new proxy using the client: ```go -proxy := client.CreateProxy("redis", "localhost:26379", "localhost:6379") +proxy, err := client.CreateProxy("redis", "localhost:26379", "localhost:6379") +if err != nil { + panic(err) +} ``` For large amounts of proxies, they can also be created using a configuration file: From 4d73fbe888f965d6901ac911d85f8acade1e584d Mon Sep 17 00:00:00 2001 From: Alon Kashtan Date: Thu, 30 Apr 2020 10:59:41 +0300 Subject: [PATCH 6/9] Add metrics collection and endpoint to Toxiproxy - Correct author --- .dockerignore | 1 + Makefile | 2 +- README.md | 80 ++++++++++++++++++++++ api.go | 47 +++++++++++++ cmd/toxiproxy.go | 7 ++ link.go | 38 +++++++++-- metrics/metrics.go | 140 ++++++++++++++++++++++++++++++++++++++ proxy.go | 4 ++ toxic_collection.go | 17 ++++- toxics/limit_data_test.go | 8 +-- toxics/noop.go | 17 ++++- toxics/slicer_test.go | 2 +- toxics/toxic.go | 11 ++- 13 files changed, 355 insertions(+), 19 deletions(-) create mode 100644 .dockerignore create mode 100644 metrics/metrics.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..4032ec6b --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +.git/ \ No newline at end of file diff --git a/Makefile b/Makefile index 381f3ccd..2805a819 100644 --- a/Makefile +++ b/Makefile @@ -65,7 +65,7 @@ $(DEB): tmp/build/$(SERVER_NAME)-linux-amd64 tmp/build/$(CLI_NAME)-linux-amd64 $(word 2,$^)=/usr/bin/$(CLI_NAME) \ ./share/toxiproxy.conf=/etc/init/toxiproxy.conf -docker: +docker: linux docker build --tag="shopify/toxiproxy:git" . docker-release: linux diff --git a/README.md b/README.md index fc7843f3..08cd013a 100644 --- a/README.md +++ b/README.md @@ -466,6 +466,8 @@ All endpoints are JSON. - **POST /proxies/{proxy}/toxics/{toxic}** - Update an active toxic - **DELETE /proxies/{proxy}/toxics/{toxic}** - Remove an active toxic - **POST /reset** - Enable all proxies and remove all active toxics + - **GET /metrics** - Get metrics information (global) + - **GET /events** - Returns all available events history, with optional token (see bellow) - **GET /version** - Returns the server version number #### Populating Proxies @@ -478,6 +480,84 @@ A `/populate` call can be included for example at application start to ensure al exist. It is safe to make this call several times, since proxies will be untouched as long as their fields are consistent with the new data. +#### Metrics and event history + +Toxiproxy can keep metrics and record of recent events in order to enable easy visualization of real +time traffic, and enabling automated tests to verify the behaviour of the networking. This feature +makes Toxiproxy useful also for scenarios that do not involve toxics. Please note that since Toxiproxy +supports any kind of tcp stream, it does not count requests (which exists only in some protocols) +but tcp packets. + +The **metrics** endpoint gives global information about number of packets that passed through a specific +proxy. For example, we have two proxies, `Main DB` and `Distributed Cahce`, so a call to the +metrics endpoint will yield a response that looks like this: +```json +{ + "Distributed Cache": 103, + "Main DB": 51 +} +``` + +The **events** endpoint gives you recent events that happened, such as client connections and disconnections, +packets transferred and failures. The event history is limited in time and number (to prevent excessive memory consumption), +both can be configured when running the server and default to 10 seconds, 100,000 events. + +For example, a call to the `events` endpoint will yield a response that looks like this: +```json +{ + "data": [ + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:09.914659+03:00", + "proxyName": "Distributed Cache", + "eventType": "Client Connected" + }, + { + "client": "", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:10.446332+03:00", + "proxyName": "Distributed Cache", + "eventType": "Message" + }, + { + "client": "", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:13.448622+03:00", + "proxyName": "Distributed Cache", + "eventType": "Message" + }, + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:15.452107+03:00", + "proxyName": "Distributed Cache", + "eventType": "Client Disconnected" + }, + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:19.914659+03:00", + "proxyName": "Distributed Cache", + "eventType": "Client Connected" + }, + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:19.914812+03:00", + "proxyName": "Distributed Cache", + "eventType": "Upstream unavailable" + } + ], + "token": "a439j" +} +``` +Here we see a client that connected, sent two packets (good chance that it is also two requests, if this is HTTP) +and disconnected the tcp connection. Then it tried again, but the message could not be forwarded to the target. + +The `token` field can be used in consecutive calls, to get only unread messages. The next call in this example +would be `/events?token=a439j`. + ### CLI Example ```bash diff --git a/api.go b/api.go index 02edd9aa..7891146a 100644 --- a/api.go +++ b/api.go @@ -3,10 +3,12 @@ package toxiproxy import ( "encoding/json" "fmt" + "github.com/Shopify/toxiproxy/metrics" "log" "net" "net/http" "os" + "strconv" "strings" "github.com/Shopify/toxiproxy/toxics" @@ -72,6 +74,8 @@ func (server *ApiServer) Listen(host string, port string) { r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicUpdate).Methods("POST") r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicDelete).Methods("DELETE") + r.HandleFunc("/events", server.GetMetricEvents).Methods("GET") + r.HandleFunc("/metrics", server.GetMetrics).Methods("GET") r.HandleFunc("/version", server.Version).Methods("GET") http.Handle("/", StopBrowsersMiddleware(r)) @@ -108,6 +112,49 @@ func (server *ApiServer) ProxyIndex(response http.ResponseWriter, request *http. } } +func (server *ApiServer) GetMetricEvents(response http.ResponseWriter, request *http.Request) { + token := request.URL.Query().Get("token") + var result metrics.EventsAndToken + if len(token) >= 1 { + tokenNum, err := strconv.Atoi(token) + if err != nil { + apiError(response, newError("token must be a one returned from this api", http.StatusBadRequest)) + return + } + result = metrics.GetMetricEventsStartingFrom(tokenNum) + } else { + result = metrics.GetMetricEvents() + } + + body, err := json.Marshal(result) + + if apiError(response, err) { + return + } + + response.Header().Set("Content-Type", "application/json") + _, err = response.Write(body) + if err != nil { + logrus.Warn("ProxyIndex: Failed to write response to client", err) + } +} + +func (server *ApiServer) GetMetrics(response http.ResponseWriter, request *http.Request) { + data := metrics.GetMetrics() + + body, err := json.Marshal(data) + + if apiError(response, err) { + return + } + + response.Header().Set("Content-Type", "application/json") + _, err = response.Write(body) + if err != nil { + logrus.Warn("ProxyIndex: Failed to write response to client", err) + } +} + func (server *ApiServer) ResetState(response http.ResponseWriter, request *http.Request) { proxies := server.Collection.Proxies() diff --git a/cmd/toxiproxy.go b/cmd/toxiproxy.go index 5fb5b8e0..5c98851f 100644 --- a/cmd/toxiproxy.go +++ b/cmd/toxiproxy.go @@ -2,6 +2,7 @@ package main import ( "flag" + "github.com/Shopify/toxiproxy/metrics" "math/rand" "os" "os/signal" @@ -14,17 +15,23 @@ import ( var host string var port string var config string +var metricsTimeToKeep string +var metricsMaxEvents int func init() { flag.StringVar(&host, "host", "localhost", "Host for toxiproxy's API to listen on") flag.StringVar(&port, "port", "8474", "Port for toxiproxy's API to listen on") flag.StringVar(&config, "config", "", "JSON file containing proxies to create on startup") + flag.StringVar(&metricsTimeToKeep, "metrics-time", "10s", "Oldest age of events to keep in toxiproxy metrics (e.g. 20s)") + flag.IntVar(&metricsMaxEvents, "metrics-max", 100000, "Max num of events to keep in toxiproxy events") seed := flag.Int64("seed", time.Now().UTC().UnixNano(), "Seed for randomizing toxics with") flag.Parse() rand.Seed(*seed) } func main() { + metrics.InitSettings(metricsTimeToKeep, metricsMaxEvents) + server := toxiproxy.NewServer() if len(config) > 0 { server.PopulateConfig(config) diff --git a/link.go b/link.go index 7a67ae16..a3f4576c 100644 --- a/link.go +++ b/link.go @@ -1,7 +1,10 @@ package toxiproxy import ( + "github.com/Shopify/toxiproxy/metrics" "io" + "net" + "time" "github.com/Shopify/toxiproxy/stream" "github.com/Shopify/toxiproxy/toxics" @@ -45,7 +48,13 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di next = make(chan *stream.StreamChunk) } - link.stubs[i] = toxics.NewToxicStub(last, next) + proxyName := "Unknown" + proxyUpstream := "Unknown" + if proxy != nil { + proxyName = proxy.Name + proxyUpstream = proxy.Upstream + } + link.stubs[i] = toxics.NewToxicStub(last, next, proxyName, proxyUpstream) last = next } link.output = stream.NewChanReader(last) @@ -53,15 +62,26 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di } // Start the link with the specified toxics -func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) { +func (link *ToxicLink) Start(name string, source net.Conn, dest io.WriteCloser) { + // assigned here so can be safely used in go routines + proxyName := link.proxy.Name + upstreamName := link.proxy.Upstream + go func() { bytes, err := io.Copy(link.input, source) if err != nil { logrus.WithFields(logrus.Fields{ - "name": link.proxy.Name, + "name": proxyName, "bytes": bytes, "err": err, }).Warn("Source terminated") + + metrics.RegisterEvent(metrics.Event{ + EventType: "Client Disconnected", + Client: source.RemoteAddr().String(), + Upstream: upstreamName, + ProxyName: proxyName, + Time: time.Now()}) } link.input.Close() }() @@ -76,7 +96,7 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) bytes, err := io.Copy(dest, link.output) if err != nil { logrus.WithFields(logrus.Fields{ - "name": link.proxy.Name, + "name": proxyName, "bytes": bytes, "err": err, }).Warn("Destination terminated") @@ -92,7 +112,15 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { i := len(link.stubs) newin := make(chan *stream.StreamChunk, toxic.BufferSize) - link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output)) + + proxyName := "Unknown" + proxyUpstream := "Unknown" + if link.proxy != nil { + proxyName = link.proxy.Name + proxyUpstream = link.proxy.Upstream + } + + link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output, proxyName, proxyUpstream)) // Interrupt the last toxic so that we don't have a race when moving channels if link.stubs[i-1].InterruptToxic() { diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..c3f981e6 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,140 @@ +package metrics + +import ( + "container/list" + "go/types" + "time" +) + +var timeToKeep time.Duration +var maxNumOfEvents = 1 // default + +var eventList = list.New() +var lastId = 0 + +var incomingEventsChannel = make(chan Event, 100) +var eventsRequests = make(chan int) +var eventsResponse = make(chan EventsAndToken) +var metricsRequests = make(chan types.Nil) +var metricsResponse = make(chan map[string]uint64) + +type Event struct { + token int + Client string `json:"client"` + Upstream string `json:"target"` + Time time.Time `json:"timestamp"` + ProxyName string `json:"proxyName"` + EventType string `json:"eventType"` +} + +type EventsAndToken struct { + Data []Event `json:"data"` + Token int `json:"token,string"` +} + +var messages = make(map[string]uint64) + +func init() { + go metricsRoutine() +} + +// Init the settings. This should be called before registering events +func InitSettings(maxTimeToKeep string, maxEvents int) { + var err error + timeToKeep, err = time.ParseDuration("-" + maxTimeToKeep) + + if err != nil { + panic("Max metrics time is not a duration: " + maxTimeToKeep) + } + + maxNumOfEvents = maxEvents +} + +// The main requests handling routine. The routine makes sure that only one request +// (either external or RegisterEvent calls) is handled simultaneously. +func metricsRoutine() { + for { + select { + case event := <-incomingEventsChannel: + registerEvent(event) + case <-metricsRequests: + metricsResponse <- getMetrics() + case token := <-eventsRequests: + eventsResponse <- getMetricEventsStartingFrom(token) + } + } +} + +// Report an events that should be collected by metrics. +func RegisterEvent(event Event) { + incomingEventsChannel <- event +} + +func registerEvent(event Event) { + messages[event.ProxyName] += 1 + event.token = lastId + lastId++ + + eventList.PushBack(event) + + // Cleanup stale events - more than max number or max age: + + for eventList.Len() > maxNumOfEvents { + eventList.Remove(eventList.Front()) + } + + startTime := time.Now().Add(timeToKeep) + + for eventList.Len() > 0 && eventList.Front().Value.(Event).Time.Before(startTime) { + eventList.Remove(eventList.Front()) + } +} + +// Get general metrics (proxy name -> num of events related to the proxy) since launch +func GetMetrics() map[string]uint64 { + metricsRequests <- types.Nil{} + return <-metricsResponse +} + +func getMetrics() map[string]uint64 { + return messages +} + +// Get all available events in history. The result includes a token tha can be used +// to call GetMetricEventsStartingFrom in order get only unread events. +func GetMetricEvents() EventsAndToken { + return GetMetricEventsStartingFrom(-1) +} + +// Get all unread events in history. The token parameter should be a result of a previous call. +// The result includes a token tha can be used in the following call in order get only unread events. +func GetMetricEventsStartingFrom(token int) EventsAndToken { + eventsRequests <- token + return <-eventsResponse +} + +func getMetricEventsStartingFrom(token int) EventsAndToken { + var e *list.Element + var skippedCount = 0 + + // skip seen elements + for e = eventList.Front(); e != nil && e.Value.(Event).token <= token; e = e.Next() { + skippedCount++ + } + var result []Event + for ; e != nil; e = e.Next() { + result = append(result, e.Value.(Event)) + } + + if result == nil { + result = []Event{} + } + var returnedToken int + if eventList.Len() > 0 { + returnedToken = eventList.Back().Value.(Event).token + } else { // no events exist + returnedToken = -1 + } + + return EventsAndToken{result, returnedToken} +} diff --git a/proxy.go b/proxy.go index 1d165053..0ae20e71 100644 --- a/proxy.go +++ b/proxy.go @@ -2,7 +2,9 @@ package toxiproxy import ( "errors" + "github.com/Shopify/toxiproxy/metrics" "sync" + "time" "github.com/Shopify/toxiproxy/stream" "github.com/sirupsen/logrus" @@ -158,6 +160,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Info("Accepted client") + metrics.RegisterEvent(metrics.Event{EventType: "Client Connected", Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) upstream, err := net.Dial("tcp", proxy.Upstream) if err != nil { @@ -167,6 +170,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Error("Unable to open connection to upstream") + metrics.RegisterEvent(metrics.Event{EventType: "Upstream unavailable", Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) client.Close() continue } diff --git a/toxic_collection.go b/toxic_collection.go index 3aded164..63d43474 100644 --- a/toxic_collection.go +++ b/toxic_collection.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "net" "strings" "sync" @@ -26,9 +27,15 @@ type ToxicCollection struct { } func NewToxicCollection(proxy *Proxy) *ToxicCollection { + toxic := new(toxics.NoopToxic) + if proxy != nil { + toxic.ProxyName = proxy.Name + toxic.Upstream = proxy.Upstream + } + collection := &ToxicCollection{ noop: &toxics.ToxicWrapper{ - Toxic: new(toxics.NoopToxic), + Toxic: toxic, Type: "noop", }, proxy: proxy, @@ -84,11 +91,15 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er var buffer bytes.Buffer + toxic := new(toxics.NoopToxic) + toxic.ProxyName = c.proxy.Name + toxic.Upstream = c.proxy.Upstream + // Default to a downstream toxic with a toxicity of 1. wrapper := &toxics.ToxicWrapper{ Stream: "downstream", Toxicity: 1.0, - Toxic: new(toxics.NoopToxic), + Toxic: toxic, } err := json.NewDecoder(io.TeeReader(data, &buffer)).Decode(wrapper) @@ -169,7 +180,7 @@ func (c *ToxicCollection) RemoveToxic(name string) error { return ErrToxicNotFound } -func (c *ToxicCollection) StartLink(name string, input io.Reader, output io.WriteCloser, direction stream.Direction) { +func (c *ToxicCollection) StartLink(name string, input net.Conn, output io.WriteCloser, direction stream.Direction) { c.Lock() defer c.Unlock() diff --git a/toxics/limit_data_test.go b/toxics/limit_data_test.go index 50b3572d..1f1c2634 100644 --- a/toxics/limit_data_test.go +++ b/toxics/limit_data_test.go @@ -33,7 +33,7 @@ func checkRemainingChunks(t *testing.T, output chan *stream.StreamChunk) { func check(t *testing.T, toxic *toxics.LimitDataToxic, chunks [][]byte, expectedChunks [][]byte) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk, 100) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() go toxic.Pipe(stub) @@ -54,7 +54,7 @@ func TestLimitDataToxicMayBeRestarted(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk, 100) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() buf := buffer(90) @@ -85,7 +85,7 @@ func TestLimitDataToxicMayBeInterrupted(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() go func() { @@ -100,7 +100,7 @@ func TestLimitDataToxicNilShouldClosePipe(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() go func() { diff --git a/toxics/noop.go b/toxics/noop.go index 08cdcde7..3eb80203 100644 --- a/toxics/noop.go +++ b/toxics/noop.go @@ -1,7 +1,15 @@ package toxics -// The NoopToxic passes all data through without any toxic effects. -type NoopToxic struct{} +import ( + "github.com/Shopify/toxiproxy/metrics" + "time" +) + +// The NoopToxic passes all data through without any toxic effects, and collects metrics +type NoopToxic struct { + ProxyName string + Upstream string +} func (t *NoopToxic) Pipe(stub *ToxicStub) { for { @@ -13,11 +21,14 @@ func (t *NoopToxic) Pipe(stub *ToxicStub) { stub.Close() return } + metrics.RegisterEvent(metrics.Event{ProxyName: t.ProxyName, Upstream: t.Upstream, Time: time.Now(), EventType: "Message"}) stub.Output <- c } } } func init() { - Register("noop", new(NoopToxic)) + toxic := new(NoopToxic) + toxic.ProxyName = "fake proxy" + Register("noop", toxic) } diff --git a/toxics/slicer_test.go b/toxics/slicer_test.go index 6be1da19..74c56032 100644 --- a/toxics/slicer_test.go +++ b/toxics/slicer_test.go @@ -16,7 +16,7 @@ func TestSlicerToxic(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") done := make(chan bool) go func() { diff --git a/toxics/toxic.go b/toxics/toxic.go index bccd5f88..04c0d0d8 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -62,14 +62,18 @@ type ToxicStub struct { Interrupt chan struct{} running chan struct{} closed chan struct{} + proxyName string + upstream string } -func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk) *ToxicStub { +func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk, proxyName string, upstream string) *ToxicStub { return &ToxicStub{ Interrupt: make(chan struct{}), closed: make(chan struct{}), Input: input, Output: output, + proxyName: proxyName, + upstream: upstream, } } @@ -81,7 +85,10 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) { if rand.Float32() < toxic.Toxicity { toxic.Pipe(s) } else { - new(NoopToxic).Pipe(s) + noopToxic := new(NoopToxic) + noopToxic.ProxyName = s.proxyName + noopToxic.Upstream = s.upstream + noopToxic.Pipe(s) } } From 9005f07085aaa678a36e266389054f8572a86c27 Mon Sep 17 00:00:00 2001 From: Alon Kashtan Date: Thu, 30 Apr 2020 11:00:02 +0300 Subject: [PATCH 7/9] Add metrics unit tests - correct author --- metrics/metrics.go | 23 ++++- metrics/metrics_test.go | 191 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+), 3 deletions(-) create mode 100644 metrics/metrics_test.go diff --git a/metrics/metrics.go b/metrics/metrics.go index c3f981e6..5ac46bfc 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "container/list" + "fmt" "go/types" "time" ) @@ -9,8 +10,8 @@ import ( var timeToKeep time.Duration var maxNumOfEvents = 1 // default -var eventList = list.New() -var lastId = 0 +var eventList *list.List +var lastId int var incomingEventsChannel = make(chan Event, 100) var eventsRequests = make(chan int) @@ -27,14 +28,27 @@ type Event struct { EventType string `json:"eventType"` } +func (e Event) String() string { + return fmt.Sprintf("Event {token: %d, proxy: %s}", e.token, e.ProxyName) +} + type EventsAndToken struct { Data []Event `json:"data"` Token int `json:"token,string"` } -var messages = make(map[string]uint64) +var messages map[string]uint64 + +// (re) initializes data. Except for first initialization, intended to be used only in tests. +// Not "thread-safe". +func resetData() { + eventList = list.New() + messages = make(map[string]uint64) + lastId = 0 +} func init() { + resetData() go metricsRoutine() } @@ -66,6 +80,9 @@ func metricsRoutine() { } // Report an events that should be collected by metrics. +// +// Note: this is an asynchronous operation, so calls to Get methods in this module +// might not reflect an event registration immediately func RegisterEvent(event Event) { incomingEventsChannel <- event } diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go new file mode 100644 index 00000000..e9977fcb --- /dev/null +++ b/metrics/metrics_test.go @@ -0,0 +1,191 @@ +package metrics + +import ( + "testing" + "time" +) + +var duration3m, _ = time.ParseDuration("3m") + +func eventEquals(e1 Event, e2 Event) bool { + return e1.ProxyName == e2.ProxyName +} + +func TestRegisterEvent_Simple(t *testing.T) { + resetData() + InitSettings("10m", 1) + event := Event{Time: time.Now(), ProxyName: "e"} + + registerEvent(event) + + if eventList.Len() != 1 || !eventEquals(eventList.Front().Value.(Event), event) { + t.Error("eventList does not contain exactly the one register event") + } +} + +func TestRegisterEvent_AllowOne_AddTwo_LastOneRemains(t *testing.T) { + resetData() + InitSettings("10m", 1) + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + + registerEvent(event1) + registerEvent(event2) + + if eventList.Len() != 1 || !eventEquals(eventList.Front().Value.(Event), event2) { + t.Error("eventList does not contain exactly the last register event, len =", eventList.Len()) + } +} + +func TestRegisterEvent_AllowTwoMinutes_AddObsolete_EmptyList(t *testing.T) { + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now().Add(-duration3m), ProxyName: "e1"} + + registerEvent(event1) + + if eventList.Len() != 0 { + t.Error("eventList should be empty, len =", eventList.Len()) + } +} + +func TestRegisterEvent_AllowTwoMinutes_AddObsoleteAndNew_OnlyNewRemains(t *testing.T) { + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now().Add(-duration3m), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + + registerEvent(event1) + registerEvent(event2) + + if eventList.Len() != 1 || !eventEquals(eventList.Front().Value.(Event), event2) { + t.Error("eventList does not contain exactly the one registered event, len =", eventList.Len()) + } +} + +func TestGetMetricEvents_ReturnsAllEvents(t *testing.T) { + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + event3 := Event{Time: time.Now(), ProxyName: "e3"} + registerEvent(event1) + registerEvent(event2) + registerEvent(event3) + + events := GetMetricEvents() + + if len(events.Data) != 3 || + !eventEquals(events.Data[0], event1) || + !eventEquals(events.Data[1], event2) || + !eventEquals(events.Data[2], event3) { + t.Error("the events do not match exactly the registered events, events =", events.Data) + } +} + +func TestGetMetricEventsStartingFrom_ReturnsAllEventsAfterToken(t *testing.T) { + // Arrange + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + event3 := Event{Time: time.Now(), ProxyName: "e3"} + event4 := Event{Time: time.Now(), ProxyName: "e4"} + + registerEvent(event1) + registerEvent(event2) + + events := GetMetricEvents() + + registerEvent(event3) + registerEvent(event4) + + // Act + events = GetMetricEventsStartingFrom(events.Token) + + // Assert + if len(events.Data) != 2 || + !eventEquals(events.Data[0], event3) || + !eventEquals(events.Data[1], event4) { + t.Error("the events do not match exactly the new registered events, events =", events.Data) + } +} + +func TestGetMetricEventsStartingFrom_NoNewEvents_EmptyList(t *testing.T) { + // Arrange + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + + registerEvent(event1) + registerEvent(event2) + + events := GetMetricEvents() + + // Act + events = GetMetricEventsStartingFrom(events.Token) + + // Assert + if len(events.Data) != 0 { + t.Error("the event list is not empty, events =", events.Data) + } +} + +func TestGetMetrics_Simple(t *testing.T) { + resetData() + InitSettings("2m", 10) + + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy2"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + + metrics := GetMetrics() + + if len(metrics) != 3 || + metrics["proxy1"] != 3 || + metrics["proxy2"] != 1 || + metrics["proxy3"] != 2 { + t.Error("the metrics do not match exactly the registered events, metrics =", metrics) + } +} + +func TestGetMetrics_SomeEventsExpired_ExpiredEventsStillCount(t *testing.T) { + resetData() + InitSettings("2m", 3) + + registerEvent(Event{Time: time.Now().Add(-duration3m), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now().Add(-duration3m), ProxyName: "proxy2"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + + metrics := GetMetrics() + + if len(metrics) != 3 || + metrics["proxy1"] != 3 || + metrics["proxy2"] != 1 || + metrics["proxy3"] != 2 { + t.Error("the metrics do not match exactly the registered events, metrics =", metrics) + } +} + +func TestGetMetrics_NoEvents_EmptyMetrics(t *testing.T) { + resetData() + InitSettings("2m", 3) + + metrics := GetMetrics() + + if len(metrics) != 0 { + t.Error("the metrics are not empty, metrics =", metrics) + } +} From 41ad50ee76c338b87dc10d6d5437ab9d15491dd1 Mon Sep 17 00:00:00 2001 From: Alon Kashtan Date: Thu, 30 Apr 2020 11:00:41 +0300 Subject: [PATCH 8/9] Change "token" to "location" - correct author --- README.md | 8 ++++---- api.go | 12 ++++++------ metrics/metrics.go | 42 ++++++++++++++++++++--------------------- metrics/metrics_test.go | 6 +++--- 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 08cd013a..5b8f0faf 100644 --- a/README.md +++ b/README.md @@ -467,7 +467,7 @@ All endpoints are JSON. - **DELETE /proxies/{proxy}/toxics/{toxic}** - Remove an active toxic - **POST /reset** - Enable all proxies and remove all active toxics - **GET /metrics** - Get metrics information (global) - - **GET /events** - Returns all available events history, with optional token (see bellow) + - **GET /events** - Returns all available events history, with optional location (see bellow) - **GET /version** - Returns the server version number #### Populating Proxies @@ -549,14 +549,14 @@ For example, a call to the `events` endpoint will yield a response that looks li "eventType": "Upstream unavailable" } ], - "token": "a439j" + "location": "a439j" } ``` Here we see a client that connected, sent two packets (good chance that it is also two requests, if this is HTTP) and disconnected the tcp connection. Then it tried again, but the message could not be forwarded to the target. -The `token` field can be used in consecutive calls, to get only unread messages. The next call in this example -would be `/events?token=a439j`. +The `location` field can be used in consecutive calls, to get only unread messages. The next call in this example +would be `/events?afterLocation=a439j`. ### CLI Example diff --git a/api.go b/api.go index 7891146a..e206bc4d 100644 --- a/api.go +++ b/api.go @@ -113,15 +113,15 @@ func (server *ApiServer) ProxyIndex(response http.ResponseWriter, request *http. } func (server *ApiServer) GetMetricEvents(response http.ResponseWriter, request *http.Request) { - token := request.URL.Query().Get("token") - var result metrics.EventsAndToken - if len(token) >= 1 { - tokenNum, err := strconv.Atoi(token) + afterLocation := request.URL.Query().Get("afterLocation") + var result metrics.EventsAndLocation + if len(afterLocation) >= 1 { + locationNum, err := strconv.Atoi(afterLocation) if err != nil { - apiError(response, newError("token must be a one returned from this api", http.StatusBadRequest)) + apiError(response, newError("afterLocation must be a one returned from this api", http.StatusBadRequest)) return } - result = metrics.GetMetricEventsStartingFrom(tokenNum) + result = metrics.GetMetricEventsStartingFrom(locationNum) } else { result = metrics.GetMetricEvents() } diff --git a/metrics/metrics.go b/metrics/metrics.go index 5ac46bfc..389ea723 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -15,12 +15,12 @@ var lastId int var incomingEventsChannel = make(chan Event, 100) var eventsRequests = make(chan int) -var eventsResponse = make(chan EventsAndToken) +var eventsResponse = make(chan EventsAndLocation) var metricsRequests = make(chan types.Nil) var metricsResponse = make(chan map[string]uint64) type Event struct { - token int + location int Client string `json:"client"` Upstream string `json:"target"` Time time.Time `json:"timestamp"` @@ -29,12 +29,12 @@ type Event struct { } func (e Event) String() string { - return fmt.Sprintf("Event {token: %d, proxy: %s}", e.token, e.ProxyName) + return fmt.Sprintf("Event {location: %d, proxy: %s}", e.location, e.ProxyName) } -type EventsAndToken struct { - Data []Event `json:"data"` - Token int `json:"token,string"` +type EventsAndLocation struct { + Data []Event `json:"data"` + Location int `json:"location,string"` } var messages map[string]uint64 @@ -73,8 +73,8 @@ func metricsRoutine() { registerEvent(event) case <-metricsRequests: metricsResponse <- getMetrics() - case token := <-eventsRequests: - eventsResponse <- getMetricEventsStartingFrom(token) + case location := <-eventsRequests: + eventsResponse <- getMetricEventsStartingFrom(location) } } } @@ -89,7 +89,7 @@ func RegisterEvent(event Event) { func registerEvent(event Event) { messages[event.ProxyName] += 1 - event.token = lastId + event.location = lastId lastId++ eventList.PushBack(event) @@ -117,25 +117,25 @@ func getMetrics() map[string]uint64 { return messages } -// Get all available events in history. The result includes a token tha can be used +// Get all available events in history. The result includes a location tha can be used // to call GetMetricEventsStartingFrom in order get only unread events. -func GetMetricEvents() EventsAndToken { +func GetMetricEvents() EventsAndLocation { return GetMetricEventsStartingFrom(-1) } -// Get all unread events in history. The token parameter should be a result of a previous call. -// The result includes a token tha can be used in the following call in order get only unread events. -func GetMetricEventsStartingFrom(token int) EventsAndToken { - eventsRequests <- token +// Get all unread events in history. The location parameter should be a result of a previous call. +// The result includes a location tha can be used in the following call in order get only unread events. +func GetMetricEventsStartingFrom(location int) EventsAndLocation { + eventsRequests <- location return <-eventsResponse } -func getMetricEventsStartingFrom(token int) EventsAndToken { +func getMetricEventsStartingFrom(location int) EventsAndLocation { var e *list.Element var skippedCount = 0 // skip seen elements - for e = eventList.Front(); e != nil && e.Value.(Event).token <= token; e = e.Next() { + for e = eventList.Front(); e != nil && e.Value.(Event).location <= location; e = e.Next() { skippedCount++ } var result []Event @@ -146,12 +146,12 @@ func getMetricEventsStartingFrom(token int) EventsAndToken { if result == nil { result = []Event{} } - var returnedToken int + var returnedLocation int if eventList.Len() > 0 { - returnedToken = eventList.Back().Value.(Event).token + returnedLocation = eventList.Back().Value.(Event).location } else { // no events exist - returnedToken = -1 + returnedLocation = -1 } - return EventsAndToken{result, returnedToken} + return EventsAndLocation{result, returnedLocation} } diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index e9977fcb..c8af4ad0 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -86,7 +86,7 @@ func TestGetMetricEvents_ReturnsAllEvents(t *testing.T) { } } -func TestGetMetricEventsStartingFrom_ReturnsAllEventsAfterToken(t *testing.T) { +func TestGetMetricEventsStartingFrom_ReturnsAllEventsAfterLocation(t *testing.T) { // Arrange resetData() InitSettings("2m", 10) @@ -105,7 +105,7 @@ func TestGetMetricEventsStartingFrom_ReturnsAllEventsAfterToken(t *testing.T) { registerEvent(event4) // Act - events = GetMetricEventsStartingFrom(events.Token) + events = GetMetricEventsStartingFrom(events.Location) // Assert if len(events.Data) != 2 || @@ -129,7 +129,7 @@ func TestGetMetricEventsStartingFrom_NoNewEvents_EmptyList(t *testing.T) { events := GetMetricEvents() // Act - events = GetMetricEventsStartingFrom(events.Token) + events = GetMetricEventsStartingFrom(events.Location) // Assert if len(events.Data) != 0 { From e5b4cf744d2015fb2432c1135ed3de3ff35a9f88 Mon Sep 17 00:00:00 2001 From: Alon Kashtan Date: Thu, 30 Apr 2020 11:01:13 +0300 Subject: [PATCH 9/9] Move event types to constants - correct author --- README.md | 2 ++ link.go | 2 +- metrics/event_types.go | 6 ++++++ proxy.go | 4 ++-- toxics/noop.go | 2 +- 5 files changed, 12 insertions(+), 4 deletions(-) create mode 100644 metrics/event_types.go diff --git a/README.md b/README.md index 5b8f0faf..a37d07fe 100644 --- a/README.md +++ b/README.md @@ -502,6 +502,8 @@ The **events** endpoint gives you recent events that happened, such as client co packets transferred and failures. The event history is limited in time and number (to prevent excessive memory consumption), both can be configured when running the server and default to 10 seconds, 100,000 events. +The available event types can be seen [here](metrics/event_types.go). + For example, a call to the `events` endpoint will yield a response that looks like this: ```json { diff --git a/link.go b/link.go index a3f4576c..59b52115 100644 --- a/link.go +++ b/link.go @@ -77,7 +77,7 @@ func (link *ToxicLink) Start(name string, source net.Conn, dest io.WriteCloser) }).Warn("Source terminated") metrics.RegisterEvent(metrics.Event{ - EventType: "Client Disconnected", + EventType: metrics.ClientDisconnected, Client: source.RemoteAddr().String(), Upstream: upstreamName, ProxyName: proxyName, diff --git a/metrics/event_types.go b/metrics/event_types.go new file mode 100644 index 00000000..bec75e05 --- /dev/null +++ b/metrics/event_types.go @@ -0,0 +1,6 @@ +package metrics + +const ClientConnected = "Client Connected" +const UpstreamUnavailable = "Upstream Unavailable" +const Message = "Message" +const ClientDisconnected = "Client Disconnected" diff --git a/proxy.go b/proxy.go index 0ae20e71..f5fcb6a9 100644 --- a/proxy.go +++ b/proxy.go @@ -160,7 +160,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Info("Accepted client") - metrics.RegisterEvent(metrics.Event{EventType: "Client Connected", Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) + metrics.RegisterEvent(metrics.Event{EventType: metrics.ClientConnected, Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) upstream, err := net.Dial("tcp", proxy.Upstream) if err != nil { @@ -170,7 +170,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Error("Unable to open connection to upstream") - metrics.RegisterEvent(metrics.Event{EventType: "Upstream unavailable", Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) + metrics.RegisterEvent(metrics.Event{EventType: metrics.UpstreamUnavailable, Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) client.Close() continue } diff --git a/toxics/noop.go b/toxics/noop.go index 3eb80203..2bc95f54 100644 --- a/toxics/noop.go +++ b/toxics/noop.go @@ -21,7 +21,7 @@ func (t *NoopToxic) Pipe(stub *ToxicStub) { stub.Close() return } - metrics.RegisterEvent(metrics.Event{ProxyName: t.ProxyName, Upstream: t.Upstream, Time: time.Now(), EventType: "Message"}) + metrics.RegisterEvent(metrics.Event{ProxyName: t.ProxyName, Upstream: t.Upstream, Time: time.Now(), EventType: metrics.Message}) stub.Output <- c } }