diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 50c078c060..ef08010252 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -42,6 +42,9 @@ type payload struct { // buf holds the sequence of msgpack-encoded items. buf bytes.Buffer + + // closed specifies the notification channel for each Close call. + closed chan struct{} } var _ io.Reader = (*payload)(nil) @@ -51,6 +54,7 @@ func newPayload() *payload { p := &payload{ header: make([]byte, 8), off: 8, + closed: make(chan struct{}, 1), } return p } @@ -81,6 +85,11 @@ func (p *payload) reset() { p.off = 8 p.count = 0 p.buf.Reset() + select { + case <-p.closed: + // ensure there is room + default: + } } // https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family @@ -109,6 +118,20 @@ func (p *payload) updateHeader() { } } +// Close implements io.Closer +func (p *payload) Close() error { + select { + case p.closed <- struct{}{}: + default: + // ignore subsequent Close calls + } + return nil +} + +// waitClose blocks until the first Close call occurs since the payload +// was constructed or the last reset happened. +func (p *payload) waitClose() { <-p.closed } + // Read implements io.Reader. It reads from the msgpack-encoded stream. func (p *payload) Read(b []byte) (n int, err error) { if p.off < len(p.header) { diff --git a/ddtrace/tracer/payload_test.go b/ddtrace/tracer/payload_test.go index 0dad3e0113..f8b073a3f1 100644 --- a/ddtrace/tracer/payload_test.go +++ b/ddtrace/tracer/payload_test.go @@ -18,12 +18,11 @@ import ( var fixedTime = now() -func newSpanList(count int) spanList { - n := count%5 + 1 // max trace size 5 +func newSpanList(n int) spanList { itoa := map[int]string{0: "0", 1: "1", 2: "2", 3: "3", 4: "4", 5: "5"} list := make([]*span, n) for i := 0; i < n; i++ { - list[i] = newBasicSpan("span.list." + itoa[i]) + list[i] = newBasicSpan("span.list." + itoa[i%5+1]) list[i].Start = fixedTime } return list @@ -41,7 +40,7 @@ func TestPayloadIntegrity(t *testing.T) { p.reset() lists := make(spanLists, n) for i := 0; i < n; i++ { - list := newSpanList(i) + list := newSpanList(i%5 + 1) lists[i] = list p.push(list) } @@ -67,7 +66,7 @@ func TestPayloadDecode(t *testing.T) { t.Run(strconv.Itoa(n), func(t *testing.T) { p.reset() for i := 0; i < n; i++ { - p.push(newSpanList(i)) + p.push(newSpanList(i%5 + 1)) } var got spanLists err := msgp.Decode(p, &got) diff --git a/ddtrace/tracer/transport.go b/ddtrace/tracer/transport.go index 0f804ff65f..9334ca3300 100644 --- a/ddtrace/tracer/transport.go +++ b/ddtrace/tracer/transport.go @@ -133,7 +133,6 @@ func readContainerID(r io.Reader) (id string, ok bool) { } func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) { - // prepare the client and send the payload req, err := http.NewRequest("POST", t.traceURL, p) if err != nil { return nil, fmt.Errorf("cannot create http request: %v", err) @@ -147,6 +146,7 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) { if err != nil { return nil, err } + p.waitClose() if code := response.StatusCode; code >= 400 { // error, check the body for context information and // return a nice error. diff --git a/ddtrace/tracer/transport_test.go b/ddtrace/tracer/transport_test.go index 6645a0641f..8f626d3527 100644 --- a/ddtrace/tracer/transport_test.go +++ b/ddtrace/tracer/transport_test.go @@ -6,6 +6,7 @@ package tracer import ( + "context" "fmt" "io/ioutil" "net" @@ -16,6 +17,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -270,3 +272,37 @@ func TestCustomTransport(t *testing.T) { // make sure our custom round tripper was used assert.Len(customRoundTripper.reqs, 1) } + +// TestTransportHTTPRace defines a regression tests where the request body was being +// read even after http.Client.Do returns. See golang/go#33244 +func TestTransportHTTPRace(t *testing.T) { + srv := http.Server{ + Addr: "127.0.0.1:8889", + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r.Body.Read(make([]byte, 4096)) + w.WriteHeader(http.StatusRequestEntityTooLarge) + }), + } + done := make(chan struct{}) + go func() { + srv.ListenAndServe() + done <- struct{}{} + }() + trans := &httpTransport{ + traceURL: "http://127.0.0.1:8889/", + client: &http.Client{}, + } + p := newPayload() + spanList := newSpanList(50) + for i := 0; i < 100; i++ { + for j := 0; j < 100; j++ { + p.push(spanList) + } + trans.send(p) + p.reset() + } + ctx, cancelFunc := context.WithTimeout(context.Background(), time.Millisecond) + defer cancelFunc() + srv.Shutdown(ctx) + <-done +} diff --git a/internal/version/version.go b/internal/version/version.go index f9eb5f104c..a442dfc56f 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -8,4 +8,4 @@ package version // Tag specifies the current release tag. It needs to be manually // updated. A test checks that the value of Tag never points to a // git tag that is older than HEAD. -const Tag = "v1.16.0" +const Tag = "v1.16.1"