Skip to content

Commit

Permalink
ddtrace/tracer: fix race condition in transport (#475)
Browse files Browse the repository at this point in the history
This change introduces a notification channel to allow waiting for the
HTTP transport to close the reader.

See: https://github.com/golang/go/blob/go1.12/src/net/http/client.go#L135-L137
  • Loading branch information
gbbr authored Jul 24, 2019
1 parent 71b316f commit 612cde3
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 7 deletions.
23 changes: 23 additions & 0 deletions ddtrace/tracer/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type payload struct {

// buf holds the sequence of msgpack-encoded items.
buf bytes.Buffer

// closed specifies the notification channel for each Close call.
closed chan struct{}
}

var _ io.Reader = (*payload)(nil)
Expand All @@ -51,6 +54,7 @@ func newPayload() *payload {
p := &payload{
header: make([]byte, 8),
off: 8,
closed: make(chan struct{}, 1),
}
return p
}
Expand Down Expand Up @@ -81,6 +85,11 @@ func (p *payload) reset() {
p.off = 8
p.count = 0
p.buf.Reset()
select {
case <-p.closed:
// ensure there is room
default:
}
}

// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
Expand Down Expand Up @@ -109,6 +118,20 @@ func (p *payload) updateHeader() {
}
}

// Close implements io.Closer
func (p *payload) Close() error {
select {
case p.closed <- struct{}{}:
default:
// ignore subsequent Close calls
}
return nil
}

// waitClose blocks until the first Close call occurs since the payload
// was constructed or the last reset happened.
func (p *payload) waitClose() { <-p.closed }

// Read implements io.Reader. It reads from the msgpack-encoded stream.
func (p *payload) Read(b []byte) (n int, err error) {
if p.off < len(p.header) {
Expand Down
9 changes: 4 additions & 5 deletions ddtrace/tracer/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ import (

var fixedTime = now()

func newSpanList(count int) spanList {
n := count%5 + 1 // max trace size 5
func newSpanList(n int) spanList {
itoa := map[int]string{0: "0", 1: "1", 2: "2", 3: "3", 4: "4", 5: "5"}
list := make([]*span, n)
for i := 0; i < n; i++ {
list[i] = newBasicSpan("span.list." + itoa[i])
list[i] = newBasicSpan("span.list." + itoa[i%5+1])
list[i].Start = fixedTime
}
return list
Expand All @@ -41,7 +40,7 @@ func TestPayloadIntegrity(t *testing.T) {
p.reset()
lists := make(spanLists, n)
for i := 0; i < n; i++ {
list := newSpanList(i)
list := newSpanList(i%5 + 1)
lists[i] = list
p.push(list)
}
Expand All @@ -67,7 +66,7 @@ func TestPayloadDecode(t *testing.T) {
t.Run(strconv.Itoa(n), func(t *testing.T) {
p.reset()
for i := 0; i < n; i++ {
p.push(newSpanList(i))
p.push(newSpanList(i%5 + 1))
}
var got spanLists
err := msgp.Decode(p, &got)
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func readContainerID(r io.Reader) (id string, ok bool) {
}

func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
// prepare the client and send the payload
req, err := http.NewRequest("POST", t.traceURL, p)
if err != nil {
return nil, fmt.Errorf("cannot create http request: %v", err)
Expand All @@ -147,6 +146,7 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
if err != nil {
return nil, err
}
p.waitClose()
if code := response.StatusCode; code >= 400 {
// error, check the body for context information and
// return a nice error.
Expand Down
36 changes: 36 additions & 0 deletions ddtrace/tracer/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package tracer

import (
"context"
"fmt"
"io/ioutil"
"net"
Expand All @@ -16,6 +17,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -270,3 +272,37 @@ func TestCustomTransport(t *testing.T) {
// make sure our custom round tripper was used
assert.Len(customRoundTripper.reqs, 1)
}

// TestTransportHTTPRace defines a regression tests where the request body was being
// read even after http.Client.Do returns. See golang/go#33244
func TestTransportHTTPRace(t *testing.T) {
srv := http.Server{
Addr: "127.0.0.1:8889",
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.Body.Read(make([]byte, 4096))
w.WriteHeader(http.StatusRequestEntityTooLarge)
}),
}
done := make(chan struct{})
go func() {
srv.ListenAndServe()
done <- struct{}{}
}()
trans := &httpTransport{
traceURL: "http://127.0.0.1:8889/",
client: &http.Client{},
}
p := newPayload()
spanList := newSpanList(50)
for i := 0; i < 100; i++ {
for j := 0; j < 100; j++ {
p.push(spanList)
}
trans.send(p)
p.reset()
}
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Millisecond)
defer cancelFunc()
srv.Shutdown(ctx)
<-done
}
2 changes: 1 addition & 1 deletion internal/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ package version
// Tag specifies the current release tag. It needs to be manually
// updated. A test checks that the value of Tag never points to a
// git tag that is older than HEAD.
const Tag = "v1.16.0"
const Tag = "v1.16.1"

0 comments on commit 612cde3

Please sign in to comment.