Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DuneAPI: new batch wire format (breaking change!) #64

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions client/duneapi/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package duneapi

import (
"encoding/json"
"io"

"github.com/duneanalytics/blockchain-ingester/models"
)

// BlockBatchHeader is the JSON header that WriteBlockBatch emits as a single
// line ahead of the concatenated block payloads.
type BlockBatchHeader struct {
// BlockSizes holds the byte length of each payload, in the same order the
// payloads are written after the header line.
BlockSizes []int `json:"block_sizes"`
}

// WriteBlockBatch writes payloads to out as one batch.
//
// Unless disableHeader is true, the batch begins with a single-line JSON
// BlockBatchHeader terminated by "\n" that records the byte length of every
// payload in order. The payload bytes follow, concatenated back to back with
// no separator added between them. disableHeader exists for testing and
// backwards compatibility: when set, only the concatenated payloads are
// written.
//
// The first error returned by the JSON encoder or by out.Write is returned
// unchanged; bytes written before the failure are not rolled back.
func WriteBlockBatch(out io.Writer, payloads []models.RPCBlock, disableHeader bool) error {
	if !disableHeader {
		// Always a non-nil slice so that an empty batch encodes as [] rather
		// than null.
		sizes := make([]int, 0, len(payloads))
		for i := range payloads {
			sizes = append(sizes, len(payloads[i].Payload))
		}

		line, err := json.Marshal(BlockBatchHeader{BlockSizes: sizes})
		if err != nil {
			return err
		}
		if _, err = out.Write(line); err != nil {
			return err
		}
		// The newline terminates the header line.
		if _, err = out.Write([]byte("\n")); err != nil {
			return err
		}
	}

	for i := range payloads {
		if _, err := out.Write(payloads[i].Payload); err != nil {
			return err
		}
	}
	return nil
}
95 changes: 95 additions & 0 deletions client/duneapi/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package duneapi_test

import (
"bufio"
"bytes"
"encoding/json"
"io"
"testing"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/models"
"github.com/stretchr/testify/require"
)

// TestWriteBlockBatch checks the exact bytes WriteBlockBatch produces with the
// batch header enabled, then parses those bytes back with ReadBlockBatch and
// verifies the original payloads are recovered.
func TestWriteBlockBatch(t *testing.T) {
	type testCase struct {
		name     string
		payloads []models.RPCBlock
		expected string
	}

	// The expected values are raw strings: their line breaks are part of the
	// asserted output, so continuation lines must stay unindented.
	cases := []testCase{
		{
			name: "single payload",
			payloads: []models.RPCBlock{
				{Payload: []byte(`{"block":1}`)},
			},
			expected: `{"block_sizes":[11]}
{"block":1}`,
		},
		{
			name: "multiple payloads, with new lines",
			payloads: []models.RPCBlock{
				{Payload: []byte(`{"block":1}` + "\n")},
				{Payload: []byte(`{"block":2}` + "\n")},
			},
			expected: `{"block_sizes":[12,12]}
{"block":1}
{"block":2}
`,
		},
		{
			name: "multiple payloads, no newlines",
			payloads: []models.RPCBlock{
				{Payload: []byte(`{"block":1}`)},
				{Payload: []byte(`{"block":2}`)},
			},
			expected: `{"block_sizes":[11,11]}
{"block":1}{"block":2}`,
		},
		{
			name:     "empty payloads",
			payloads: []models.RPCBlock{},
			expected: `{"block_sizes":[]}
`,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var out bytes.Buffer
			writeErr := duneapi.WriteBlockBatch(&out, tc.payloads, false)
			require.NoError(t, writeErr)

			// String() does not consume the buffer, so the same bytes are
			// still available for the round trip below.
			require.Equal(t, tc.expected, out.String())

			rebuilt, readErr := ReadBlockBatch(&out)
			require.NoError(t, readErr)
			require.EqualValues(t, tc.payloads, rebuilt)
		})
	}
}

// ReadBlockBatch is a test helper that parses a batch in the format written by
// duneapi.WriteBlockBatch with the header enabled.
//
// It reads the first line of buf as a JSON duneapi.BlockBatchHeader and then
// reads exactly BlockSizes[i] bytes for each entry, returning one RPCBlock per
// size with only Payload populated. Any bytes left after the last payload are
// ignored. An error is returned if the header line is missing or unparsable,
// or if buf ends before a payload is complete.
func ReadBlockBatch(buf *bytes.Buffer) ([]models.RPCBlock, error) {
	br := bufio.NewReader(buf)

	// The header occupies everything up to and including the first newline.
	rawHeader, err := br.ReadBytes('\n')
	if err != nil {
		return nil, err
	}

	var hdr duneapi.BlockBatchHeader
	if err := json.Unmarshal(rawHeader, &hdr); err != nil {
		return nil, err
	}

	blocks := make([]models.RPCBlock, 0, len(hdr.BlockSizes))
	for _, n := range hdr.BlockSizes {
		data := make([]byte, n)
		// ReadFull fails on a short read, so a truncated batch is reported
		// instead of yielding a partial payload.
		if _, err := io.ReadFull(br, data); err != nil {
			return nil, err
		}
		blocks = append(blocks, models.RPCBlock{Payload: data})
	}

	return blocks, nil
}
25 changes: 11 additions & 14 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,33 +101,28 @@ func (c *client) SendBlocks(ctx context.Context, payloads []models.RPCBlock) err

func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) (*BlockchainIngestRequest, error) {
request := &BlockchainIngestRequest{}
var err error

buffer.Reset()
// not thread safe, multiple calls to the compressor here
if c.cfg.DisableCompression {
buffer.Reset()
for _, block := range payloads {
_, err := buffer.Write(block.Payload)
if err != nil {
return nil, err
}
err = WriteBlockBatch(buffer, payloads, c.cfg.DisableBatchHeader)
if err != nil {
return nil, err
}
request.Payload = buffer.Bytes()
} else {
buffer.Reset()
c.compressor.Reset(buffer)
for _, block := range payloads {
_, err := c.compressor.Write(block.Payload)
if err != nil {
return nil, err
}
err = WriteBlockBatch(c.compressor, payloads, c.cfg.DisableBatchHeader)
if err != nil {
return nil, err
}
err := c.compressor.Close()
if err != nil {
return nil, err
}
request.ContentEncoding = "application/zstd"
request.Payload = buffer.Bytes()
}
request.Payload = buffer.Bytes()

numbers := make([]string, len(payloads))
for i, payload := range payloads {
Expand All @@ -137,6 +132,7 @@ func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer)
request.BlockNumbers = blockNumbers
request.IdempotencyKey = c.idempotencyKey(*request)
request.EVMStack = c.cfg.Stack.String()
request.BatchSize = len(payloads)
return request, nil
}

Expand Down Expand Up @@ -276,6 +272,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
if err != nil {
return err
}
req.Header.Set("User-Agent", userAgent)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
Expand Down
2 changes: 2 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Config struct {
// - lowers latency
// - reduces bandwidth
DisableCompression bool

DisableBatchHeader bool // for testing/backwards compatibility
}

// The response from the DuneAPI ingest endpoint.
Expand Down
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func main() {
BlockchainName: cfg.BlockchainName,
Stack: cfg.RPCStack,
DisableCompression: cfg.DisableCompression,
DisableBatchHeader: cfg.Dune.DisableBatchHeader,
})
if err != nil {
stdlog.Fatal(err)
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
)

type DuneClient struct {
APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"`
URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"`
APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"`
URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"` // nolint:lll
DisableBatchHeader bool `long:"duneapi-disable-batch-header" env:"DUNEAPI_DISABLE_BATCH_HEADERS" description:"Disable batch headers on DuneAPI request payload"` // nolint:lll
}

func (d DuneClient) HasError() error {
Expand Down
Loading