From 36741051fd45d0e58b936b96c03080f2967e01db Mon Sep 17 00:00:00 2001 From: Danyal Prout Date: Wed, 14 Feb 2024 16:57:22 -0600 Subject: [PATCH] Add godoc / notice about trusting beacon client --- README.md | 5 +++++ api/service/api.go | 2 ++ archiver/service/api.go | 1 + archiver/service/archiver.go | 17 +++++++++++++++++ common/beacon/client.go | 2 ++ common/storage/storage.go | 24 +++++++++++++++++++++++- 6 files changed, 50 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8c65a36..74b2a0e 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,11 @@ There are currently two supported storage options: You can control which storage backend is used by setting the `BLOB_API_DATA_STORE` and `BLOB_ARCHIVER_DATA_STORE` to either `disk` or `s3`. +### Data Validity +Currently, the archiver and api do not validate the beacon node's data. Therefore, it's important to either trust the +Beacon node, or validate the data in the client. There is an open [issue](https://github.com/base-org/blob-archiver/issues/4) +to add data validation to the archiver and api. + ### Development The `Makefile` contains a number of commands for development: diff --git a/api/service/api.go b/api/service/api.go index eddfc97..6b1ecd9 100644 --- a/api/service/api.go +++ b/api/service/api.go @@ -150,6 +150,8 @@ func (a *API) toBeaconBlockHash(id string) (common.Hash, *httpError) { } } +// blobSidecarHandler implements the /eth/v1/beacon/blob_sidecars/{id} endpoint, using the underlying DataStoreReader +// to fetch blobs instead of the beacon node. This allows clients to fetch expired blobs. 
func (a *API) blobSidecarHandler(w http.ResponseWriter, r *http.Request) { param := chi.URLParam(r, "id") beaconBlockHash, err := a.toBeaconBlockHash(param) diff --git a/archiver/service/api.go b/archiver/service/api.go index aa059b8..59e64d9 100644 --- a/archiver/service/api.go +++ b/archiver/service/api.go @@ -21,6 +21,7 @@ type API struct { metrics m.Metricer } +// NewAPI creates a new Archiver API instance. This API exposes an admin interface to control the archiver. func NewAPI(metrics m.Metricer, logger log.Logger) *API { result := &API{ router: chi.NewRouter(), diff --git a/archiver/service/archiver.go b/archiver/service/archiver.go index 0f17b5d..6349584 100644 --- a/archiver/service/archiver.go +++ b/archiver/service/archiver.go @@ -52,6 +52,10 @@ type ArchiverService struct { api *API } +// Start starts the archiver service. It begins polling the beacon node for the latest blocks and persisting blobs for +// them. Concurrently it'll also begin a backfill process (see backfillBlobs) to store all blobs from the current head +// to the previously stored blocks. This ensures that during restarts or outages of an archiver, any gaps will be +// filled in. func (a *ArchiverService) Start(ctx context.Context) error { if a.cfg.MetricsConfig.Enabled { a.log.Info("starting metrics server", "addr", a.cfg.MetricsConfig.ListenAddr, "port", a.cfg.MetricsConfig.ListenPort) @@ -82,6 +86,11 @@ func (a *ArchiverService) Start(ctx context.Context) error { return a.trackLatestBlocks(ctx) } +// persistBlobsForBlockToS3 fetches the blobs for a given block and persists them to S3. It returns the block header, +// a boolean indicating whether the blobs already existed in S3, and any error that occurred. +// If the blobs are already stored, it will not overwrite the data. Currently, the archiver does not +// perform any validation of the blobs; it assumes a trusted beacon node. See: +// https://github.com/base-org/blob-archiver/issues/4. 
func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier string) (*v1.BeaconBlockHeader, bool, error) { currentHeader, err := a.beaconClient.BeaconBlockHeader(ctx, &api.BeaconBlockHeaderOpts{ Block: blockIdentifier, @@ -121,6 +130,7 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde BlobSidecars: storage.BlobSidecars{Data: blobSidecars.Data}, } + // The blob that is being written has not been validated. It is assumed that the beacon node is trusted. err = a.dataStoreClient.Write(ctx, blobData) if err != nil { @@ -133,6 +143,7 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde return currentHeader.Data, false, nil } +// Stop stops the archiver service. func (a *ArchiverService) Stop(ctx context.Context) error { if a.stopped.Load() { return ErrAlreadyStopped @@ -155,6 +166,9 @@ func (a *ArchiverService) Stopped() bool { return a.stopped.Load() } +// backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted +// to the archiver's storage or the origin block in the configuration. This is used to ensure that any gaps can be filled. +// If an error is encountered persisting a block, it will retry after waiting for a period of time. func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) { current, alreadyExists, err := latest, false, error(nil) @@ -182,6 +196,7 @@ func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBl a.log.Info("backfill complete", "endHash", current.Root.String(), "startHash", latest.Root.String()) } +// trackLatestBlocks will poll the beacon node for the latest blocks and persist blobs for them. 
func (a *ArchiverService) trackLatestBlocks(ctx context.Context) error { t := time.NewTicker(a.cfg.PollInterval) defer t.Stop() @@ -198,6 +213,8 @@ func (a *ArchiverService) trackLatestBlocks(ctx context.Context) error { } } +// processBlocksUntilKnownBlock will fetch and persist blobs for blocks until it finds a block that has been stored before. +// In the case of a reorg, it will fetch the new head and then walk back the chain until it finds a known block. func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) { a.log.Debug("refreshing live data") diff --git a/common/beacon/client.go b/common/beacon/client.go index 0715ca7..1bd4937 100644 --- a/common/beacon/client.go +++ b/common/beacon/client.go @@ -8,11 +8,13 @@ import ( "github.com/base-org/blob-archiver/common/flags" ) +// Client is an interface that wraps the go-eth2-client interfaces that the blob archiver and API require. type Client interface { client.BeaconBlockHeadersProvider client.BlobSidecarsProvider } +// NewBeaconClient returns a new HTTP beacon client. func NewBeaconClient(ctx context.Context, cfg flags.BeaconConfig) (Client, error) { cctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/common/storage/storage.go b/common/storage/storage.go index 6db3f0a..e889a18 100644 --- a/common/storage/storage.go +++ b/common/storage/storage.go @@ -15,8 +15,11 @@ const ( ) var ( + // ErrNotFound is returned when a blob is not found in the storage. ErrNotFound = errors.New("blob not found") - ErrStorage = errors.New("error accessing storage") + // ErrStorage is returned when there is an error accessing the storage. + ErrStorage = errors.New("error accessing storage") + // ErrEncoding is returned when there is an error in blob encoding or decoding. ErrEncoding = errors.New("error encoding/decoding blob") ) @@ -28,6 +31,8 @@ type BlobSidecars struct { Data []*deneb.BlobSidecar `json:"data"` } +// MarshalSSZ marshals the blob sidecars into SSZ. 
As the blob sidecars are a single list of fixed size elements, we can +// simply concatenate the marshaled sidecars together. func (b *BlobSidecars) MarshalSSZ() ([]byte, error) { result := make([]byte, b.SizeSSZ()) @@ -55,15 +60,32 @@ type BlobData struct { BlobSidecars BlobSidecars `json:"blob_sidecars"` } +// DataStoreReader is the interface for reading from a data store. type DataStoreReader interface { + // Exists returns true if the given blob hash exists in the data store, false otherwise. + // It should return one of the following: + // - nil: the existence check was successful. In this case the boolean should also be set correctly. + // - ErrStorage: there was an error accessing the data store. Exists(ctx context.Context, hash common.Hash) (bool, error) + // Read reads the blob data for the given beacon block hash from the data store. + // It should return one of the following: + // - nil: reading the blob was successful. The blob data is also returned. + // - ErrNotFound: the blob data was not found in the data store. + // - ErrStorage: there was an error accessing the data store. + // - ErrEncoding: there was an error decoding the blob data. Read(ctx context.Context, hash common.Hash) (BlobData, error) } +// DataStoreWriter is the interface for writing to a data store. type DataStoreWriter interface { + // Write writes the given blob data to the data store. It should return one of the following errors: + // - nil: writing the blob was successful. + // - ErrStorage: there was an error accessing the data store. + // - ErrEncoding: there was an error encoding the blob data. Write(ctx context.Context, data BlobData) error } +// DataStore is the interface for a data store that can be both written to and read from. type DataStore interface { DataStoreReader DataStoreWriter