diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..4032ec6b --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +.git/ \ No newline at end of file diff --git a/Makefile b/Makefile index 381f3ccd..2805a819 100644 --- a/Makefile +++ b/Makefile @@ -65,7 +65,7 @@ $(DEB): tmp/build/$(SERVER_NAME)-linux-amd64 tmp/build/$(CLI_NAME)-linux-amd64 $(word 2,$^)=/usr/bin/$(CLI_NAME) \ ./share/toxiproxy.conf=/etc/init/toxiproxy.conf -docker: +docker: linux docker build --tag="shopify/toxiproxy:git" . docker-release: linux diff --git a/README.md b/README.md index fc7843f3..a37d07fe 100644 --- a/README.md +++ b/README.md @@ -466,6 +466,8 @@ All endpoints are JSON. - **POST /proxies/{proxy}/toxics/{toxic}** - Update an active toxic - **DELETE /proxies/{proxy}/toxics/{toxic}** - Remove an active toxic - **POST /reset** - Enable all proxies and remove all active toxics + - **GET /metrics** - Get metrics information (global) + - **GET /events** - Returns all available events history, with optional location (see bellow) - **GET /version** - Returns the server version number #### Populating Proxies @@ -478,6 +480,86 @@ A `/populate` call can be included for example at application start to ensure al exist. It is safe to make this call several times, since proxies will be untouched as long as their fields are consistent with the new data. +#### Metrics and event history + +Toxiproxy can keep metrics and record of recent events in order to enable easy visualization of real +time traffic, and enabling automated tests to verify the behaviour of the networking. This feature +makes Toxiproxy useful also for scenarios that do not involve toxics. Please note that since Toxiproxy +supports any kind of tcp stream, it does not count requests (which exists only in some protocols) +but tcp packets. + +The **metrics** endpoint gives global information about number of packets that passed through a specific +proxy. For example, we have two proxies, `Main DB` and `Distributed Cahce`, so a call to the +metrics endpoint will yield a response that looks like this: +```json +{ + "Distributed Cache": 103, + "Main DB": 51 +} +``` + +The **events** endpoint gives you recent events that happened, such as client connections and disconnections, +packets transferred and failures. The event history is limited in time and number (to prevent excessive memory consumption), +both can be configured when running the server and default to 10 seconds, 100,000 events. + +The available event types can be seen [here](metrics/event_types.go). + +For example, a call to the `events` endpoint will yield a response that looks like this: +```json +{ + "data": [ + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:09.914659+03:00", + "proxyName": "Distributed Cache", + "eventType": "Client Connected" + }, + { + "client": "", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:10.446332+03:00", + "proxyName": "Distributed Cache", + "eventType": "Message" + }, + { + "client": "", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:13.448622+03:00", + "proxyName": "Distributed Cache", + "eventType": "Message" + }, + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:15.452107+03:00", + "proxyName": "Distributed Cache", + "eventType": "Client Disconnected" + }, + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:19.914659+03:00", + "proxyName": "Distributed Cache", + "eventType": "Client Connected" + }, + { + "client": "[::1]:50189", + "target": "127.0.0.1:4000", + "timestamp": "2020-04-07T17:12:19.914812+03:00", + "proxyName": "Distributed Cache", + "eventType": "Upstream unavailable" + } + ], + "location": "a439j" +} +``` +Here we see a client that connected, sent two packets (good chance that it is also two requests, if this is HTTP) +and disconnected the tcp connection. Then it tried again, but the message could not be forwarded to the target. + +The `location` field can be used in consecutive calls, to get only unread messages. The next call in this example +would be `/events?afterLocation=a439j`. + ### CLI Example ```bash diff --git a/api.go b/api.go index 02edd9aa..e206bc4d 100644 --- a/api.go +++ b/api.go @@ -3,10 +3,12 @@ package toxiproxy import ( "encoding/json" "fmt" + "github.com/Shopify/toxiproxy/metrics" "log" "net" "net/http" "os" + "strconv" "strings" "github.com/Shopify/toxiproxy/toxics" @@ -72,6 +74,8 @@ func (server *ApiServer) Listen(host string, port string) { r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicUpdate).Methods("POST") r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicDelete).Methods("DELETE") + r.HandleFunc("/events", server.GetMetricEvents).Methods("GET") + r.HandleFunc("/metrics", server.GetMetrics).Methods("GET") r.HandleFunc("/version", server.Version).Methods("GET") http.Handle("/", StopBrowsersMiddleware(r)) @@ -108,6 +112,49 @@ func (server *ApiServer) ProxyIndex(response http.ResponseWriter, request *http. } } +func (server *ApiServer) GetMetricEvents(response http.ResponseWriter, request *http.Request) { + afterLocation := request.URL.Query().Get("afterLocation") + var result metrics.EventsAndLocation + if len(afterLocation) >= 1 { + locationNum, err := strconv.Atoi(afterLocation) + if err != nil { + apiError(response, newError("afterLocation must be a one returned from this api", http.StatusBadRequest)) + return + } + result = metrics.GetMetricEventsStartingFrom(locationNum) + } else { + result = metrics.GetMetricEvents() + } + + body, err := json.Marshal(result) + + if apiError(response, err) { + return + } + + response.Header().Set("Content-Type", "application/json") + _, err = response.Write(body) + if err != nil { + logrus.Warn("ProxyIndex: Failed to write response to client", err) + } +} + +func (server *ApiServer) GetMetrics(response http.ResponseWriter, request *http.Request) { + data := metrics.GetMetrics() + + body, err := json.Marshal(data) + + if apiError(response, err) { + return + } + + response.Header().Set("Content-Type", "application/json") + _, err = response.Write(body) + if err != nil { + logrus.Warn("ProxyIndex: Failed to write response to client", err) + } +} + func (server *ApiServer) ResetState(response http.ResponseWriter, request *http.Request) { proxies := server.Collection.Proxies() diff --git a/cmd/toxiproxy.go b/cmd/toxiproxy.go index 5fb5b8e0..5c98851f 100644 --- a/cmd/toxiproxy.go +++ b/cmd/toxiproxy.go @@ -2,6 +2,7 @@ package main import ( "flag" + "github.com/Shopify/toxiproxy/metrics" "math/rand" "os" "os/signal" @@ -14,17 +15,23 @@ import ( var host string var port string var config string +var metricsTimeToKeep string +var metricsMaxEvents int func init() { flag.StringVar(&host, "host", "localhost", "Host for toxiproxy's API to listen on") flag.StringVar(&port, "port", "8474", "Port for toxiproxy's API to listen on") flag.StringVar(&config, "config", "", "JSON file containing proxies to create on startup") + flag.StringVar(&metricsTimeToKeep, "metrics-time", "10s", "Oldest age of events to keep in toxiproxy metrics (e.g. 20s)") + flag.IntVar(&metricsMaxEvents, "metrics-max", 100000, "Max num of events to keep in toxiproxy events") seed := flag.Int64("seed", time.Now().UTC().UnixNano(), "Seed for randomizing toxics with") flag.Parse() rand.Seed(*seed) } func main() { + metrics.InitSettings(metricsTimeToKeep, metricsMaxEvents) + server := toxiproxy.NewServer() if len(config) > 0 { server.PopulateConfig(config) diff --git a/link.go b/link.go index 7a67ae16..59b52115 100644 --- a/link.go +++ b/link.go @@ -1,7 +1,10 @@ package toxiproxy import ( + "github.com/Shopify/toxiproxy/metrics" "io" + "net" + "time" "github.com/Shopify/toxiproxy/stream" "github.com/Shopify/toxiproxy/toxics" @@ -45,7 +48,13 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di next = make(chan *stream.StreamChunk) } - link.stubs[i] = toxics.NewToxicStub(last, next) + proxyName := "Unknown" + proxyUpstream := "Unknown" + if proxy != nil { + proxyName = proxy.Name + proxyUpstream = proxy.Upstream + } + link.stubs[i] = toxics.NewToxicStub(last, next, proxyName, proxyUpstream) last = next } link.output = stream.NewChanReader(last) @@ -53,15 +62,26 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di } // Start the link with the specified toxics -func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) { +func (link *ToxicLink) Start(name string, source net.Conn, dest io.WriteCloser) { + // assigned here so can be safely used in go routines + proxyName := link.proxy.Name + upstreamName := link.proxy.Upstream + go func() { bytes, err := io.Copy(link.input, source) if err != nil { logrus.WithFields(logrus.Fields{ - "name": link.proxy.Name, + "name": proxyName, "bytes": bytes, "err": err, }).Warn("Source terminated") + + metrics.RegisterEvent(metrics.Event{ + EventType: metrics.ClientDisconnected, + Client: source.RemoteAddr().String(), + Upstream: upstreamName, + ProxyName: proxyName, + Time: time.Now()}) } link.input.Close() }() @@ -76,7 +96,7 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) bytes, err := io.Copy(dest, link.output) if err != nil { logrus.WithFields(logrus.Fields{ - "name": link.proxy.Name, + "name": proxyName, "bytes": bytes, "err": err, }).Warn("Destination terminated") @@ -92,7 +112,15 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { i := len(link.stubs) newin := make(chan *stream.StreamChunk, toxic.BufferSize) - link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output)) + + proxyName := "Unknown" + proxyUpstream := "Unknown" + if link.proxy != nil { + proxyName = link.proxy.Name + proxyUpstream = link.proxy.Upstream + } + + link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output, proxyName, proxyUpstream)) // Interrupt the last toxic so that we don't have a race when moving channels if link.stubs[i-1].InterruptToxic() { diff --git a/metrics/event_types.go b/metrics/event_types.go new file mode 100644 index 00000000..bec75e05 --- /dev/null +++ b/metrics/event_types.go @@ -0,0 +1,6 @@ +package metrics + +const ClientConnected = "Client Connected" +const UpstreamUnavailable = "Upstream Unavailable" +const Message = "Message" +const ClientDisconnected = "Client Disconnected" diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..389ea723 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,157 @@ +package metrics + +import ( + "container/list" + "fmt" + "go/types" + "time" +) + +var timeToKeep time.Duration +var maxNumOfEvents = 1 // default + +var eventList *list.List +var lastId int + +var incomingEventsChannel = make(chan Event, 100) +var eventsRequests = make(chan int) +var eventsResponse = make(chan EventsAndLocation) +var metricsRequests = make(chan types.Nil) +var metricsResponse = make(chan map[string]uint64) + +type Event struct { + location int + Client string `json:"client"` + Upstream string `json:"target"` + Time time.Time `json:"timestamp"` + ProxyName string `json:"proxyName"` + EventType string `json:"eventType"` +} + +func (e Event) String() string { + return fmt.Sprintf("Event {location: %d, proxy: %s}", e.location, e.ProxyName) +} + +type EventsAndLocation struct { + Data []Event `json:"data"` + Location int `json:"location,string"` +} + +var messages map[string]uint64 + +// (re) initializes data. Except for first initialization, intended to be used only in tests. +// Not "thread-safe". +func resetData() { + eventList = list.New() + messages = make(map[string]uint64) + lastId = 0 +} + +func init() { + resetData() + go metricsRoutine() +} + +// Init the settings. This should be called before registering events +func InitSettings(maxTimeToKeep string, maxEvents int) { + var err error + timeToKeep, err = time.ParseDuration("-" + maxTimeToKeep) + + if err != nil { + panic("Max metrics time is not a duration: " + maxTimeToKeep) + } + + maxNumOfEvents = maxEvents +} + +// The main requests handling routine. The routine makes sure that only one request +// (either external or RegisterEvent calls) is handled simultaneously. +func metricsRoutine() { + for { + select { + case event := <-incomingEventsChannel: + registerEvent(event) + case <-metricsRequests: + metricsResponse <- getMetrics() + case location := <-eventsRequests: + eventsResponse <- getMetricEventsStartingFrom(location) + } + } +} + +// Report an events that should be collected by metrics. +// +// Note: this is an asynchronous operation, so calls to Get methods in this module +// might not reflect an event registration immediately +func RegisterEvent(event Event) { + incomingEventsChannel <- event +} + +func registerEvent(event Event) { + messages[event.ProxyName] += 1 + event.location = lastId + lastId++ + + eventList.PushBack(event) + + // Cleanup stale events - more than max number or max age: + + for eventList.Len() > maxNumOfEvents { + eventList.Remove(eventList.Front()) + } + + startTime := time.Now().Add(timeToKeep) + + for eventList.Len() > 0 && eventList.Front().Value.(Event).Time.Before(startTime) { + eventList.Remove(eventList.Front()) + } +} + +// Get general metrics (proxy name -> num of events related to the proxy) since launch +func GetMetrics() map[string]uint64 { + metricsRequests <- types.Nil{} + return <-metricsResponse +} + +func getMetrics() map[string]uint64 { + return messages +} + +// Get all available events in history. The result includes a location tha can be used +// to call GetMetricEventsStartingFrom in order get only unread events. +func GetMetricEvents() EventsAndLocation { + return GetMetricEventsStartingFrom(-1) +} + +// Get all unread events in history. The location parameter should be a result of a previous call. +// The result includes a location tha can be used in the following call in order get only unread events. +func GetMetricEventsStartingFrom(location int) EventsAndLocation { + eventsRequests <- location + return <-eventsResponse +} + +func getMetricEventsStartingFrom(location int) EventsAndLocation { + var e *list.Element + var skippedCount = 0 + + // skip seen elements + for e = eventList.Front(); e != nil && e.Value.(Event).location <= location; e = e.Next() { + skippedCount++ + } + var result []Event + for ; e != nil; e = e.Next() { + result = append(result, e.Value.(Event)) + } + + if result == nil { + result = []Event{} + } + var returnedLocation int + if eventList.Len() > 0 { + returnedLocation = eventList.Back().Value.(Event).location + } else { // no events exist + returnedLocation = -1 + } + + return EventsAndLocation{result, returnedLocation} +} diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go new file mode 100644 index 00000000..c8af4ad0 --- /dev/null +++ b/metrics/metrics_test.go @@ -0,0 +1,191 @@ +package metrics + +import ( + "testing" + "time" +) + +var duration3m, _ = time.ParseDuration("3m") + +func eventEquals(e1 Event, e2 Event) bool { + return e1.ProxyName == e2.ProxyName +} + +func TestRegisterEvent_Simple(t *testing.T) { + resetData() + InitSettings("10m", 1) + event := Event{Time: time.Now(), ProxyName: "e"} + + registerEvent(event) + + if eventList.Len() != 1 || !eventEquals(eventList.Front().Value.(Event), event) { + t.Error("eventList does not contain exactly the one register event") + } +} + +func TestRegisterEvent_AllowOne_AddTwo_LastOneRemains(t *testing.T) { + resetData() + InitSettings("10m", 1) + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + + registerEvent(event1) + registerEvent(event2) + + if eventList.Len() != 1 || !eventEquals(eventList.Front().Value.(Event), event2) { + t.Error("eventList does not contain exactly the last register event, len =", eventList.Len()) + } +} + +func TestRegisterEvent_AllowTwoMinutes_AddObsolete_EmptyList(t *testing.T) { + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now().Add(-duration3m), ProxyName: "e1"} + + registerEvent(event1) + + if eventList.Len() != 0 { + t.Error("eventList should be empty, len =", eventList.Len()) + } +} + +func TestRegisterEvent_AllowTwoMinutes_AddObsoleteAndNew_OnlyNewRemains(t *testing.T) { + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now().Add(-duration3m), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + + registerEvent(event1) + registerEvent(event2) + + if eventList.Len() != 1 || !eventEquals(eventList.Front().Value.(Event), event2) { + t.Error("eventList does not contain exactly the one registered event, len =", eventList.Len()) + } +} + +func TestGetMetricEvents_ReturnsAllEvents(t *testing.T) { + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + event3 := Event{Time: time.Now(), ProxyName: "e3"} + registerEvent(event1) + registerEvent(event2) + registerEvent(event3) + + events := GetMetricEvents() + + if len(events.Data) != 3 || + !eventEquals(events.Data[0], event1) || + !eventEquals(events.Data[1], event2) || + !eventEquals(events.Data[2], event3) { + t.Error("the events do not match exactly the registered events, events =", events.Data) + } +} + +func TestGetMetricEventsStartingFrom_ReturnsAllEventsAfterLocation(t *testing.T) { + // Arrange + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + event3 := Event{Time: time.Now(), ProxyName: "e3"} + event4 := Event{Time: time.Now(), ProxyName: "e4"} + + registerEvent(event1) + registerEvent(event2) + + events := GetMetricEvents() + + registerEvent(event3) + registerEvent(event4) + + // Act + events = GetMetricEventsStartingFrom(events.Location) + + // Assert + if len(events.Data) != 2 || + !eventEquals(events.Data[0], event3) || + !eventEquals(events.Data[1], event4) { + t.Error("the events do not match exactly the new registered events, events =", events.Data) + } +} + +func TestGetMetricEventsStartingFrom_NoNewEvents_EmptyList(t *testing.T) { + // Arrange + resetData() + InitSettings("2m", 10) + + event1 := Event{Time: time.Now(), ProxyName: "e1"} + event2 := Event{Time: time.Now(), ProxyName: "e2"} + + registerEvent(event1) + registerEvent(event2) + + events := GetMetricEvents() + + // Act + events = GetMetricEventsStartingFrom(events.Location) + + // Assert + if len(events.Data) != 0 { + t.Error("the event list is not empty, events =", events.Data) + } +} + +func TestGetMetrics_Simple(t *testing.T) { + resetData() + InitSettings("2m", 10) + + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy2"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + + metrics := GetMetrics() + + if len(metrics) != 3 || + metrics["proxy1"] != 3 || + metrics["proxy2"] != 1 || + metrics["proxy3"] != 2 { + t.Error("the metrics do not match exactly the registered events, metrics =", metrics) + } +} + +func TestGetMetrics_SomeEventsExpired_ExpiredEventsStillCount(t *testing.T) { + resetData() + InitSettings("2m", 3) + + registerEvent(Event{Time: time.Now().Add(-duration3m), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now().Add(-duration3m), ProxyName: "proxy2"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy1"}) + registerEvent(Event{Time: time.Now(), ProxyName: "proxy3"}) + + metrics := GetMetrics() + + if len(metrics) != 3 || + metrics["proxy1"] != 3 || + metrics["proxy2"] != 1 || + metrics["proxy3"] != 2 { + t.Error("the metrics do not match exactly the registered events, metrics =", metrics) + } +} + +func TestGetMetrics_NoEvents_EmptyMetrics(t *testing.T) { + resetData() + InitSettings("2m", 3) + + metrics := GetMetrics() + + if len(metrics) != 0 { + t.Error("the metrics are not empty, metrics =", metrics) + } +} diff --git a/proxy.go b/proxy.go index 1d165053..f5fcb6a9 100644 --- a/proxy.go +++ b/proxy.go @@ -2,7 +2,9 @@ package toxiproxy import ( "errors" + "github.com/Shopify/toxiproxy/metrics" "sync" + "time" "github.com/Shopify/toxiproxy/stream" "github.com/sirupsen/logrus" @@ -158,6 +160,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Info("Accepted client") + metrics.RegisterEvent(metrics.Event{EventType: metrics.ClientConnected, Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) upstream, err := net.Dial("tcp", proxy.Upstream) if err != nil { @@ -167,6 +170,7 @@ func (proxy *Proxy) server() { "proxy": proxy.Listen, "upstream": proxy.Upstream, }).Error("Unable to open connection to upstream") + metrics.RegisterEvent(metrics.Event{EventType: metrics.UpstreamUnavailable, Client: client.RemoteAddr().String(), Upstream: proxy.Upstream, ProxyName: proxy.Name, Time: time.Now()}) client.Close() continue } diff --git a/toxic_collection.go b/toxic_collection.go index 3aded164..63d43474 100644 --- a/toxic_collection.go +++ b/toxic_collection.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "net" "strings" "sync" @@ -26,9 +27,15 @@ type ToxicCollection struct { } func NewToxicCollection(proxy *Proxy) *ToxicCollection { + toxic := new(toxics.NoopToxic) + if proxy != nil { + toxic.ProxyName = proxy.Name + toxic.Upstream = proxy.Upstream + } + collection := &ToxicCollection{ noop: &toxics.ToxicWrapper{ - Toxic: new(toxics.NoopToxic), + Toxic: toxic, Type: "noop", }, proxy: proxy, @@ -84,11 +91,15 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er var buffer bytes.Buffer + toxic := new(toxics.NoopToxic) + toxic.ProxyName = c.proxy.Name + toxic.Upstream = c.proxy.Upstream + // Default to a downstream toxic with a toxicity of 1. wrapper := &toxics.ToxicWrapper{ Stream: "downstream", Toxicity: 1.0, - Toxic: new(toxics.NoopToxic), + Toxic: toxic, } err := json.NewDecoder(io.TeeReader(data, &buffer)).Decode(wrapper) @@ -169,7 +180,7 @@ func (c *ToxicCollection) RemoveToxic(name string) error { return ErrToxicNotFound } -func (c *ToxicCollection) StartLink(name string, input io.Reader, output io.WriteCloser, direction stream.Direction) { +func (c *ToxicCollection) StartLink(name string, input net.Conn, output io.WriteCloser, direction stream.Direction) { c.Lock() defer c.Unlock() diff --git a/toxics/limit_data_test.go b/toxics/limit_data_test.go index 50b3572d..1f1c2634 100644 --- a/toxics/limit_data_test.go +++ b/toxics/limit_data_test.go @@ -33,7 +33,7 @@ func checkRemainingChunks(t *testing.T, output chan *stream.StreamChunk) { func check(t *testing.T, toxic *toxics.LimitDataToxic, chunks [][]byte, expectedChunks [][]byte) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk, 100) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() go toxic.Pipe(stub) @@ -54,7 +54,7 @@ func TestLimitDataToxicMayBeRestarted(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk, 100) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() buf := buffer(90) @@ -85,7 +85,7 @@ func TestLimitDataToxicMayBeInterrupted(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() go func() { @@ -100,7 +100,7 @@ func TestLimitDataToxicNilShouldClosePipe(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") stub.State = toxic.NewState() go func() { diff --git a/toxics/noop.go b/toxics/noop.go index 08cdcde7..2bc95f54 100644 --- a/toxics/noop.go +++ b/toxics/noop.go @@ -1,7 +1,15 @@ package toxics -// The NoopToxic passes all data through without any toxic effects. -type NoopToxic struct{} +import ( + "github.com/Shopify/toxiproxy/metrics" + "time" +) + +// The NoopToxic passes all data through without any toxic effects, and collects metrics +type NoopToxic struct { + ProxyName string + Upstream string +} func (t *NoopToxic) Pipe(stub *ToxicStub) { for { @@ -13,11 +21,14 @@ func (t *NoopToxic) Pipe(stub *ToxicStub) { stub.Close() return } + metrics.RegisterEvent(metrics.Event{ProxyName: t.ProxyName, Upstream: t.Upstream, Time: time.Now(), EventType: metrics.Message}) stub.Output <- c } } } func init() { - Register("noop", new(NoopToxic)) + toxic := new(NoopToxic) + toxic.ProxyName = "fake proxy" + Register("noop", toxic) } diff --git a/toxics/slicer_test.go b/toxics/slicer_test.go index 6be1da19..74c56032 100644 --- a/toxics/slicer_test.go +++ b/toxics/slicer_test.go @@ -16,7 +16,7 @@ func TestSlicerToxic(t *testing.T) { input := make(chan *stream.StreamChunk) output := make(chan *stream.StreamChunk) - stub := toxics.NewToxicStub(input, output) + stub := toxics.NewToxicStub(input, output, "proxy", "upstream") done := make(chan bool) go func() { diff --git a/toxics/toxic.go b/toxics/toxic.go index bccd5f88..04c0d0d8 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -62,14 +62,18 @@ type ToxicStub struct { Interrupt chan struct{} running chan struct{} closed chan struct{} + proxyName string + upstream string } -func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk) *ToxicStub { +func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk, proxyName string, upstream string) *ToxicStub { return &ToxicStub{ Interrupt: make(chan struct{}), closed: make(chan struct{}), Input: input, Output: output, + proxyName: proxyName, + upstream: upstream, } } @@ -81,7 +85,10 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) { if rand.Float32() < toxic.Toxicity { toxic.Pipe(s) } else { - new(NoopToxic).Pipe(s) + noopToxic := new(NoopToxic) + noopToxic.ProxyName = s.proxyName + noopToxic.Upstream = s.upstream + noopToxic.Pipe(s) } }