Skip to content

Commit 75d478b

Browse files
committed
test: handle empty newlines in BenchmarkPublisher
1 parent 477584b commit 75d478b

File tree

1 file changed

+26
-4
lines changed

1 file changed

+26
-4
lines changed

internal/publish/pub_test.go

+26-4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/stretchr/testify/require"
3333

3434
"go.elastic.co/apm/v2/apmtest"
35+
"go.elastic.co/fastjson"
3536

3637
"github.com/elastic/beats/v7/libbeat/beat"
3738
"github.com/elastic/beats/v7/libbeat/idxmgmt"
@@ -97,6 +98,8 @@ func TestPublisherStopShutdownInactive(t *testing.T) {
9798
}
9899

99100
func BenchmarkPublisher(b *testing.B) {
101+
require.NoError(b, logp.DevelopmentSetup(logp.ToObserverOutput()))
102+
100103
mux := http.NewServeMux()
101104
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
102105
w.Header().Set("X-Elastic-Product", "Elasticsearch")
@@ -109,14 +112,30 @@ func BenchmarkPublisher(b *testing.B) {
109112
assert.NoError(b, err)
110113
defer gzr.Close()
111114

115+
var jsonw fastjson.Writer
116+
jsonw.RawString(`{"items":[`)
117+
first := true
118+
112119
scanner := bufio.NewScanner(gzr)
113120
var n int64
114-
for scanner.Scan() { // index
115-
if scanner.Scan() { // actual event
116-
n++
121+
122+
// stop if there's no more data or we bump into an empty line
123+
// Prevent an issue with clients appending newlines to
124+
// valid requests
125+
for scanner.Scan() && len(scanner.Bytes()) != 0 { // index
126+
require.True(b, scanner.Scan())
127+
128+
if first {
129+
first = false
130+
} else {
131+
jsonw.RawByte(',')
117132
}
133+
jsonw.RawString(`{"create":{"status":201}}`)
134+
n++
118135
}
119136
assert.NoError(b, scanner.Err())
137+
jsonw.RawString(`]}`)
138+
w.Write(jsonw.Bytes())
120139
indexed.Add(n)
121140
})
122141
srv := httptest.NewServer(mux)
@@ -137,9 +156,12 @@ func BenchmarkPublisher(b *testing.B) {
137156
namespace := config.Namespace{}
138157
err = conf.Unpack(&namespace)
139158
require.NoError(b, err)
159+
140160
pipeline, err := pipeline.New(
141161
beat.Info{},
142-
pipeline.Monitors{},
162+
pipeline.Monitors{
163+
Logger: logp.NewLogger("monitor"),
164+
},
143165
namespace,
144166
outputGroup,
145167
pipeline.Settings{

0 commit comments

Comments
 (0)