Skip to content

Commit

Permalink
Chunk up requests (#5)
Browse files Browse the repository at this point in the history
* Chunk up requests - Amazon denies if you send more than 100
  • Loading branch information
wildefires authored Feb 15, 2021
1 parent 2b3374c commit 690b4a0
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions timestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"golang.org/x/net/http2"
)

const TimestreamMaxRecordsPerRequest = 100

type TimeStreamAdapter struct {
databaseName string
logger *zap.SugaredLogger
Expand Down Expand Up @@ -121,15 +123,17 @@ func (t TimeStreamAdapter) Write(req *prompb.WriteRequest) (err error) {
records := t.toRecords(req)
receivedSamples.Add(float64(len(records)))

_, err = t.WriteRecords(&timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(t.databaseName),
TableName: aws.String(t.tableName),
Records: records,
})
for _, chunk := range t.splitRecords(records) {
_, err = t.WriteRecords(&timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(t.databaseName),
TableName: aws.String(t.tableName),
Records: chunk,
})

if err != nil {
t.logger.Warnw("Error sending samples to remote storage", "err", err, "storage", t.Name(), "num_samples", len(records))
failedSamples.WithLabelValues(t.Name()).Add(float64(len(records)))
if err != nil {
t.logger.Warnw("Error sending samples to remote storage", "err", err, "storage", t.Name(), "num_samples", len(chunk))
failedSamples.WithLabelValues(t.Name()).Add(float64(len(chunk)))
}
}
sentSamples.WithLabelValues(t.Name()).Add(float64(len(records)))

Expand Down Expand Up @@ -162,6 +166,21 @@ func (t TimeStreamAdapter) toRecords(writeRequest *prompb.WriteRequest) (records
return
}

func (t TimeStreamAdapter) splitRecords(records []*timestreamwrite.Record) [][]*timestreamwrite.Record {
var chunked [][]*timestreamwrite.Record

for i := 0; i < len(records); i += TimestreamMaxRecordsPerRequest {
end := i + TimestreamMaxRecordsPerRequest
if end > len(records) {
end = len(records)
}

chunked = append(chunked, records[i:end])
}
t.logger.Debugf("Successfully split %d records into %d chunks", len(records), len(chunked))
return chunked
}

// Read implementation

func (t TimeStreamAdapter) Read(request *prompb.ReadRequest) (response *prompb.ReadResponse, err error) {
Expand Down

0 comments on commit 690b4a0

Please sign in to comment.