Add RPC for chunked send of DAS batches
This adds support for the batch poster in AnyTrust configuration to send
batches to the DA committee in chunks, bounded by a configurable maximum
HTTP POST body size. It adds new RPC methods to the daserver executable's
RPC server, das_startChunkedStore, das_sendChunk, and
das_commitChunkedStore, which the clients created by the batch poster
automatically use if they detect them to be available on the committee
server, and otherwise fall back to the legacy das_store method. This
allows an easy rollout of either the client or the server first.
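
Detection is driven by the error the server returns for an unknown
method; a minimal sketch of the check (the helper name here is
illustrative, the real logic lives inline in DASRPCClient.Store below):

    // Sketch only: go-ethereum's rpc client returns an error containing
    // "the method das_startChunkedStore does not exist" when the server
    // doesn't support it, in which case the client uses legacy das_store.
    func shouldFallBackToLegacyStore(err error) bool {
        return err != nil &&
            strings.Contains(err.Error(), "the method das_startChunkedStore does not exist")
    }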

The payloads of the new RPC methods are all signed by the batch
poster. As basic DoS prevention, at most 10 uncommitted stores can be
outstanding, each uncommitted store expires after a minute, and a
das_startChunkedStore with the same arguments is not replayable after a
minute. The batch poster should only be storing one batch at a time, so
this should be sufficient.
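
The server-side bookkeeping is not part of the diff excerpt below, but
the limits above imply state along these lines (a sketch with
hypothetical names, not the actual implementation):

    // Hypothetical sketch of the DoS limits described above.
    const (
        maxPendingChunkedStores = 10          // max uncommitted stores outstanding
        chunkedStoreExpiry      = time.Minute // uncommitted stores expire after this
    )

    type pendingStores struct {
        mu      sync.Mutex
        started map[uint64]time.Time // batchId -> time the chunked store started
    }

    func (p *pendingStores) tryStart(batchId uint64, now time.Time) bool {
        p.mu.Lock()
        defer p.mu.Unlock()
        for id, t := range p.started {
            if now.Sub(t) > chunkedStoreExpiry {
                delete(p.started, id) // expire stale uncommitted stores
            }
        }
        if len(p.started) >= maxPendingChunkedStores {
            return false // reject: too many uncommitted stores outstanding
        }
        p.started[batchId] = now
        return true
    }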

The new option
--node.data-availability.rpc-aggregator.max-store-chunk-body-size is
expressed in terms of the HTTP POST body size that the operator wants
the chunk requests to stay under. A further 512B of headroom is reserved
out of whatever the operator specifies here, since some proxies or
endpoints may additionally count headers. This is orthogonal to settings
like --node.batch-poster.max-size, which control the maximum
uncompressed batch size assembled by the batch poster. This allows the
batch poster to create very large batches which are broken up into small
chunks to be sent to the committee servers.
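
Concretely, the client derives the usable chunk payload from this option
as follows (mirroring NewDASRPCClient in the diff below; each payload
byte becomes two hex characters once hexutil.Bytes encodes it, which is
what the division by two accounts for):

    // With the default --...max-store-chunk-body-size of 512*1024:
    maxStoreChunkBodySize := 512 * 1024           // 524288B operator budget
    jsonOverhead := len(sendChunkJSONBoilerplate) // 74B of JSON RPC framing
    chunkSize := (maxStoreChunkBodySize - jsonOverhead - 512 /* headers */) / 2
    // chunkSize == 261851 bytes of raw batch data per das_sendChunk request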

Once the client has received confirmation of its das_startChunkedStore
request, it sends the chunks in parallel using das_sendChunk. Once all
chunks have been sent, it calls das_commitChunkedStore to cause the data
to be stored on the server and to retrieve the signed response, which is
aggregated into the Data Availability Certificate.
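
For example, with the default chunk size of 261851 bytes, a 1MiB
(1048576B) batch yields five chunks, the last carrying the 1172-byte
remainder; the arithmetic (as in Store below):

    nChunks := uint64(len(message)) / c.chunkSize       // 1048576 / 261851 = 4
    lastChunkSize := uint64(len(message)) % c.chunkSize // 1172
    if lastChunkSize > 0 {
        nChunks++ // 5 chunks; the final das_sendChunk carries the remainder
    } else {
        lastChunkSize = c.chunkSize
    }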

Server-side metrics are kept largely the same between chunked and
non-chunked stores to minimize dashboard/alerting changes. In the
context of chunked transfers, the metrics have the following meanings:
arb_das_rpc_store_requests - Count of initiated chunked transfers
arb_das_rpc_store_success  - Successful commits of chunked transfers
arb_das_rpc_store_failure  - Failure at any stage of the chunked transfer
arb_das_rpc_store_bytes    - Bytes committed
arb_das_rpc_store_duration - Total duration of chunked transfer (ns)

Additionally two new metrics have been added to count individual
das_sendChunk requests:
arb_das_rpc_sendchunk_success
arb_das_rpc_sendchunk_failure
Tristan-Wilson committed May 25, 2024
1 parent 57f583b commit 96885e0
Showing 9 changed files with 302 additions and 76 deletions.
4 changes: 3 additions & 1 deletion cmd/datool/datool.go
@@ -91,6 +91,7 @@ type ClientStoreConfig struct {
     SigningKey            string `koanf:"signing-key"`
     SigningWallet         string `koanf:"signing-wallet"`
     SigningWalletPassword string `koanf:"signing-wallet-password"`
+    MaxStoreChunkBodySize int    `koanf:"max-store-chunk-body-size"`
 }
 
 func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
@@ -102,6 +103,7 @@ func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
     f.String("signing-wallet", "", "wallet containing ecdsa key to sign the message with")
     f.String("signing-wallet-password", genericconf.PASSWORD_NOT_SET, "password to unlock the wallet, if not specified the user is prompted for the password")
     f.Duration("das-retention-period", 24*time.Hour, "The period which DASes are requested to retain the stored batches.")
+    f.Int("max-store-chunk-body-size", 512*1024, "The maximum HTTP POST body size for a chunked store request")
 
     k, err := confighelpers.BeginCommonParse(f, args)
     if err != nil {
@@ -150,7 +152,7 @@ func startClientStore(args []string) error {
         }
     }
 
-    client, err := das.NewDASRPCClient(config.URL, signer)
+    client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize)
     if err != nil {
         return err
     }
15 changes: 9 additions & 6 deletions das/aggregator.go
@@ -27,20 +27,23 @@ import (
 )
 
 type AggregatorConfig struct {
-    Enable        bool   `koanf:"enable"`
-    AssumedHonest int    `koanf:"assumed-honest"`
-    Backends      string `koanf:"backends"`
+    Enable                bool   `koanf:"enable"`
+    AssumedHonest         int    `koanf:"assumed-honest"`
+    Backends              string `koanf:"backends"`
+    MaxStoreChunkBodySize int    `koanf:"max-store-chunk-body-size"`
 }
 
 var DefaultAggregatorConfig = AggregatorConfig{
-    AssumedHonest: 0,
-    Backends:      "",
+    AssumedHonest:         0,
+    Backends:              "",
+    MaxStoreChunkBodySize: 512 * 1024,
 }
 
 func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) {
-    f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage/retrieval of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types")
+    f.Bool(prefix+".enable", DefaultAggregatorConfig.Enable, "enable storage of sequencer batch data from a list of RPC endpoints; this should only be used by the batch poster and not in combination with other DAS storage types")
     f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider an Store request to be successful.")
     f.String(prefix+".backends", DefaultAggregatorConfig.Backends, "JSON RPC backend configuration")
+    f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers")
 }
 
 type Aggregator struct {
1 change: 1 addition & 0 deletions das/das.go
@@ -65,6 +65,7 @@ var DefaultDataAvailabilityConfig = DataAvailabilityConfig{
     RequestTimeout:                5 * time.Second,
     Enable:                        false,
     RestAggregator:                DefaultRestfulClientAggregatorConfig,
+    RPCAggregator:                 DefaultAggregatorConfig,
     ParentChainConnectionAttempts: 15,
     PanicOnError:                  false,
     IpfsStorage:                   DefaultIpfsStorageServiceConfig,
108 changes: 95 additions & 13 deletions das/dasRpcClient.go
@@ -6,11 +6,13 @@ package das
 import (
     "context"
     "fmt"
+    "strings"
     "time"
 
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/common/hexutil"
     "github.com/ethereum/go-ethereum/log"
+    "golang.org/x/sync/errgroup"
 
     "github.com/ethereum/go-ethereum/rpc"
     "github.com/offchainlabs/nitro/arbstate/daprovider"
@@ -20,39 +22,119 @@ import (
 )
 
 type DASRPCClient struct { // implements DataAvailabilityService
-    clnt   *rpc.Client
-    url    string
-    signer signature.DataSignerFunc
+    clnt      *rpc.Client
+    url       string
+    signer    signature.DataSignerFunc
+    chunkSize uint64
 }
 
 func nilSigner(_ []byte) ([]byte, error) {
     return []byte{}, nil
 }
 
-func NewDASRPCClient(target string, signer signature.DataSignerFunc) (*DASRPCClient, error) {
+const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"
+
+func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int) (*DASRPCClient, error) {
     clnt, err := rpc.Dial(target)
     if err != nil {
         return nil, err
     }
     if signer == nil {
         signer = nilSigner
     }
+
+    // Byte arrays are encoded in base64
+    chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
+    if chunkSize <= 0 {
+        return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
+    }
+
     return &DASRPCClient{
-        clnt:   clnt,
-        url:    target,
-        signer: signer,
+        clnt:      clnt,
+        url:       target,
+        signer:    signer,
+        chunkSize: uint64(chunkSize),
     }, nil
 }
 
-func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*daprovider.DataAvailabilityCertificate, error) {
-    /*
-    var ret StartChunkedStoreResult
-    if err := c.clnt.CallContext(ctx, &ret, "das_startChunkedStore", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil {
+func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, _ []byte) (*daprovider.DataAvailabilityCertificate, error) {
+    timestamp := uint64(time.Now().Unix())
+    nChunks := uint64(len(message)) / c.chunkSize
+    lastChunkSize := uint64(len(message)) % c.chunkSize
+    if lastChunkSize > 0 {
+        nChunks++
+    } else {
+        lastChunkSize = c.chunkSize
+    }
+    totalSize := uint64(len(message))
+
+    startReqSig, err := applyDasSigner(c.signer, []byte{}, timestamp, nChunks, c.chunkSize, totalSize, timeout)
+    if err != nil {
+        return nil, err
+    }
+
+    var startChunkedStoreResult StartChunkedStoreResult
+    if err := c.clnt.CallContext(ctx, &startChunkedStoreResult, "das_startChunkedStore", hexutil.Uint64(timestamp), hexutil.Uint64(nChunks), hexutil.Uint64(c.chunkSize), hexutil.Uint64(totalSize), hexutil.Uint64(timeout), hexutil.Bytes(startReqSig)); err != nil {
+        if strings.Contains(err.Error(), "the method das_startChunkedStore does not exist") {
+            return c.legacyStore(ctx, message, timeout)
+        }
         return nil, err
     }
+    batchId := uint64(startChunkedStoreResult.BatchId)
+
+    g := new(errgroup.Group)
+    for i := uint64(0); i < nChunks; i++ {
+        var chunk []byte
+        if i == nChunks-1 {
+            chunk = message[i*c.chunkSize : i*c.chunkSize+lastChunkSize]
+        } else {
+            chunk = message[i*c.chunkSize : (i+1)*c.chunkSize]
+        }
-    */
 
-    return c.legacyStore(ctx, message, timeout)
+        inner := func(_i uint64, _chunk []byte) func() error {
+            return func() error { return c.sendChunk(ctx, batchId, _i, _chunk) }
+        }
+        g.Go(inner(i, chunk))
+    }
+    if err := g.Wait(); err != nil {
+        return nil, err
+    }
+
+    finalReqSig, err := applyDasSigner(c.signer, []byte{}, uint64(startChunkedStoreResult.BatchId))
+    if err != nil {
+        return nil, err
+    }
+
+    var storeResult StoreResult
+    if err := c.clnt.CallContext(ctx, &storeResult, "das_commitChunkedStore", startChunkedStoreResult.BatchId, hexutil.Bytes(finalReqSig)); err != nil {
+        return nil, err
+    }
+
+    respSig, err := blsSignatures.SignatureFromBytes(storeResult.Sig)
+    if err != nil {
+        return nil, err
+    }
+
+    return &daprovider.DataAvailabilityCertificate{
+        DataHash:    common.BytesToHash(storeResult.DataHash),
+        Timeout:     uint64(storeResult.Timeout),
+        SignersMask: uint64(storeResult.SignersMask),
+        Sig:         respSig,
+        KeysetHash:  common.BytesToHash(storeResult.KeysetHash),
+        Version:     byte(storeResult.Version),
+    }, nil
 }
 
+func (c *DASRPCClient) sendChunk(ctx context.Context, batchId, i uint64, chunk []byte) error {
+    chunkReqSig, err := applyDasSigner(c.signer, chunk, batchId, i)
+    if err != nil {
+        return err
+    }
+
+    if err := c.clnt.CallContext(ctx, nil, "das_sendChunk", hexutil.Uint64(batchId), hexutil.Uint64(i), hexutil.Bytes(chunk), hexutil.Bytes(chunkReqSig)); err != nil {
+        return err
+    }
+    return nil
+}
+
 func (c *DASRPCClient) legacyStore(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {