
Commit

Add godoc / notice about trusting beacon client
danyalprout committed Feb 14, 2024
1 parent 33a25f8 commit 7927f72
Showing 6 changed files with 51 additions and 1 deletion.
5 changes: 5 additions & 0 deletions README.md
@@ -15,6 +15,11 @@ There are currently two supported storage options:
You can control which storage backend is used by setting the `BLOB_API_DATA_STORE` and `BLOB_ARCHIVER_DATA_STORE` to
either `disk` or `s3`.

### Data Validity
Currently, the archiver and API do not validate the beacon node's data. Therefore, it's important to either trust the
beacon node or validate the data in the client. There is an open [issue](https://github.com/base-org/blob-archiver/issues/4)
to add data validation to the archiver and API.
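
For illustration, one way a client could validate the returned data is to check each sidecar's KZG proof before using it. Below is a minimal sketch using go-ethereum's `crypto/kzg4844` package; note that the exact `VerifyBlobProof` signature differs between go-ethereum versions (newer releases take a `*Blob`), so treat this as an assumption rather than drop-in code:

```go
// Sketch: verify a blob sidecar returned by the archiver before trusting it.
// Assumes go-ethereum's crypto/kzg4844 with a by-value VerifyBlobProof;
// some versions take *Blob instead.
package kzgcheck

import (
	"fmt"

	"github.com/ethereum/go-ethereum/crypto/kzg4844"
)

// verifyBlob is a hypothetical helper, not part of blob-archiver.
func verifyBlob(blob kzg4844.Blob, commitment kzg4844.Commitment, proof kzg4844.Proof) error {
	if err := kzg4844.VerifyBlobProof(blob, commitment, proof); err != nil {
		return fmt.Errorf("blob failed KZG proof verification: %w", err)
	}
	return nil
}
```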

### Development
The `Makefile` contains a number of commands for development:

2 changes: 2 additions & 0 deletions api/service/api.go
@@ -150,6 +150,8 @@ func (a *API) toBeaconBlockHash(id string) (common.Hash, *httpError) {
}
}

// blobSidecarHandler implements the /eth/v1/beacon/blob_sidecars/{id} endpoint, using the underlying DataStoreReader
// to fetch blobs instead of the beacon node. This allows clients to fetch expired blobs.
func (a *API) blobSidecarHandler(w http.ResponseWriter, r *http.Request) {
param := chi.URLParam(r, "id")
beaconBlockHash, err := a.toBeaconBlockHash(param)
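For illustration, a minimal client for this endpoint could look like the following sketch. The base URL is a placeholder, and the response struct names only the fields it uses, mirroring the standard beacon-API JSON:

```go
// Sketch: fetch (possibly expired) blob sidecars from the blob archiver API.
// The base URL is a placeholder; field names mirror the beacon API JSON.
package blobclient

import (
	"encoding/json"
	"fmt"
	"net/http"
)

type blobSidecarsResponse struct {
	Data []struct {
		Index         string `json:"index"`
		Blob          string `json:"blob"`
		KZGCommitment string `json:"kzg_commitment"`
		KZGProof      string `json:"kzg_proof"`
	} `json:"data"`
}

func fetchBlobSidecars(baseURL, id string) (*blobSidecarsResponse, error) {
	resp, err := http.Get(fmt.Sprintf("%s/eth/v1/beacon/blob_sidecars/%s", baseURL, id))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	var out blobSidecarsResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out, nil
}
```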
1 change: 1 addition & 0 deletions archiver/service/api.go
@@ -21,6 +21,7 @@ type API struct {
metrics m.Metricer
}

// NewAPI creates a new Archiver API instance. This API exposes an admin interface to control the archiver.
func NewAPI(metrics m.Metricer, logger log.Logger) *API {
result := &API{
router: chi.NewRouter(),
18 changes: 18 additions & 0 deletions archiver/service/archiver.go
@@ -52,6 +52,10 @@ type ArchiverService struct {
api *API
}

// Start starts the archiver service. It begins polling the beacon node for the latest blocks and persisting blobs for
// them. Concurrently, it also begins a backfill process (see backfillBlobs) to store all blobs from the current head
// back to the previously stored blocks. This ensures that any gaps caused by a restart or outage of an archiver are
// filled in.
func (a *ArchiverService) Start(ctx context.Context) error {
if a.cfg.MetricsConfig.Enabled {
a.log.Info("starting metrics server", "addr", a.cfg.MetricsConfig.ListenAddr, "port", a.cfg.MetricsConfig.ListenPort)
@@ -82,6 +86,11 @@ func (a *ArchiverService) Start(ctx context.Context) error {
return a.trackLatestBlocks(ctx)
}
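
Reduced to a sketch, the flow documented above is simply "backfill concurrently, then tail the head" (the helper names below are hypothetical stand-ins, not the archiver's real methods):

```go
// Sketch: launch the backfill concurrently, then track the latest blocks.
// backfill and trackLatest are hypothetical stand-ins for the archiver's
// backfillBlobs and trackLatestBlocks.
package archiversketch

import "context"

func startSketch(ctx context.Context, headID string,
	backfill func(context.Context, string),
	trackLatest func(context.Context) error) error {
	go backfill(ctx, headID) // fill gaps from a previous run while tailing the head
	return trackLatest(ctx)
}
```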

// persistBlobsForBlockToS3 fetches the blobs for a given block and persists them to S3. It returns the block header,
// a boolean indicating whether the blobs already existed in S3, and any error that occurred.
// If the blobs are already stored, it will not overwrite the data. Currently, the archiver does not
// perform any validation of the blobs; it assumes a trusted beacon node. See:
// https://github.com/base-org/blob-archiver/issues/4.
func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier string) (*v1.BeaconBlockHeader, bool, error) {
currentHeader, err := a.beaconClient.BeaconBlockHeader(ctx, &api.BeaconBlockHeaderOpts{
Block: blockIdentifier,
@@ -121,6 +130,7 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde
BlobSidecars: storage.BlobSidecars{Data: blobSidecars.Data},
}

// The blob that is being written has not been validated. It is assumed that the beacon node is trusted.
err = a.dataStoreClient.Write(ctx, blobData)

if err != nil {
@@ -133,6 +143,7 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde
return currentHeader.Data, false, nil
}
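
The skip-if-exists behavior can be made concrete against the `common/storage` interfaces. A sketch (`writeOnce` is a hypothetical helper, not the archiver's code):

```go
// Sketch: write blob data only if it has not been archived already,
// mirroring persistBlobsForBlockToS3's no-overwrite behavior.
package archiversketch

import (
	"context"

	"github.com/base-org/blob-archiver/common/storage"
	"github.com/ethereum/go-ethereum/common"
)

func writeOnce(ctx context.Context, store storage.DataStore, hash common.Hash, data storage.BlobData) (alreadyExisted bool, err error) {
	exists, err := store.Exists(ctx, hash)
	if err != nil {
		return false, err // ErrStorage, per the DataStoreReader contract
	}
	if exists {
		return true, nil // never overwrite previously archived blobs
	}
	return false, store.Write(ctx, data)
}
```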

// Stop stops the archiver service.
func (a *ArchiverService) Stop(ctx context.Context) error {
if a.stopped.Load() {
return ErrAlreadyStopped
@@ -155,6 +166,9 @@ func (a *ArchiverService) Stopped() bool {
return a.stopped.Load()
}

// backfillBlobs persists all blobs from the provided beacon block header back to either the last block that was
// persisted to the archiver's storage or the origin block in the configuration. This is used to ensure that any
// gaps can be filled. If an error is encountered while persisting a block, it retries after waiting for a period of time.
func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
current, alreadyExists, err := latest, false, error(nil)

@@ -182,6 +196,7 @@ func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBl
a.log.Info("backfill complete", "endHash", current.Root.String(), "startHash", latest.Root.String())
}
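
The retry-after-waiting behavior might look roughly like the following sketch (the interval and the `persist` helper are assumptions for illustration):

```go
// Sketch: retry persisting a block with a fixed delay, as backfillBlobs
// does when a write fails. persist stands in for persistBlobsForBlockToS3;
// the 10s interval is an assumption.
package archiversketch

import (
	"context"
	"time"
)

func persistWithRetry(ctx context.Context, id string, persist func(context.Context, string) error) {
	for {
		if err := persist(ctx, id); err == nil {
			return
		}
		select {
		case <-time.After(10 * time.Second):
			// retry after waiting
		case <-ctx.Done():
			return
		}
	}
}
```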

// trackLatestBlocks will poll the beacon node for the latest blocks and persist blobs for them.
func (a *ArchiverService) trackLatestBlocks(ctx context.Context) error {
t := time.NewTicker(a.cfg.PollInterval)
defer t.Stop()
@@ -198,6 +213,9 @@
}
}

// processBlocksUntilKnownBlock fetches and persists blobs for blocks until it finds a block that has been stored
// before. In the case of a reorg, it fetches the new head and then walks back the chain, storing all blobs until it
// finds a known block, i.e. one that already exists in the archiver's storage.
func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) {
a.log.Debug("refreshing live data")

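The walk-back pattern generalizes to a small loop: persist the current block's blobs, then follow parent roots until a block that already exists in storage is reached. A sketch (`persist` is a hypothetical stand-in for persistBlobsForBlockToS3):

```go
// Sketch: walk back the chain from the head, persisting blobs until a
// known (already stored) block is found.
package archiversketch

import (
	"context"

	v1 "github.com/attestantio/go-eth2-client/api/v1"
)

func walkBackUntilKnown(ctx context.Context, head *v1.BeaconBlockHeader,
	persist func(context.Context, string) (*v1.BeaconBlockHeader, bool, error)) error {
	current := head
	for {
		parent := current.Header.Message.ParentRoot.String()
		header, alreadyExists, err := persist(ctx, parent)
		if err != nil {
			return err
		}
		if alreadyExists {
			return nil // reached a known block; everything behind it is stored
		}
		current = header
	}
}
```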
2 changes: 2 additions & 0 deletions common/beacon/client.go
@@ -8,11 +8,13 @@ import (
"github.com/base-org/blob-archiver/common/flags"
)

// Client is an interface that wraps the go-eth2-client interfaces that the blob archiver and API require.
type Client interface {
client.BeaconBlockHeadersProvider
client.BlobSidecarsProvider
}

// NewBeaconClient returns a new HTTP beacon client.
func NewBeaconClient(ctx context.Context, cfg flags.BeaconConfig) (Client, error) {
cctx, cancel := context.WithCancel(ctx)
defer cancel()
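For illustration, code holding a `Client` can use the wrapped providers directly. A sketch using the option types from go-eth2-client (`headBlobCount` is a hypothetical example function):

```go
// Sketch: fetch the head block header and count its blob sidecars via the
// Client interface.
package beaconsketch

import (
	"context"

	"github.com/attestantio/go-eth2-client/api"
	"github.com/base-org/blob-archiver/common/beacon"
)

func headBlobCount(ctx context.Context, c beacon.Client) (int, error) {
	header, err := c.BeaconBlockHeader(ctx, &api.BeaconBlockHeaderOpts{Block: "head"})
	if err != nil {
		return 0, err
	}
	blobs, err := c.BlobSidecars(ctx, &api.BlobSidecarsOpts{Block: header.Data.Root.String()})
	if err != nil {
		return 0, err
	}
	return len(blobs.Data), nil
}
```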
24 changes: 23 additions & 1 deletion common/storage/storage.go
@@ -15,8 +15,11 @@ const (
)

var (
// ErrNotFound is returned when a blob is not found in the storage.
ErrNotFound = errors.New("blob not found")
// ErrStorage is returned when there is an error accessing the storage.
ErrStorage = errors.New("error accessing storage")
// ErrEncoding is returned when there is an error in blob encoding or decoding.
ErrEncoding = errors.New("error encoding/decoding blob")
)

@@ -28,6 +31,8 @@ type BlobSidecars struct {
Data []*deneb.BlobSidecar `json:"data"`
}

// MarshalSSZ marshals the blob sidecars into SSZ. As the blob sidecars are a single list of fixed-size elements, we
// can simply concatenate the marshaled sidecars together.
func (b *BlobSidecars) MarshalSSZ() ([]byte, error) {
result := make([]byte, b.SizeSSZ())

@@ -55,15 +60,32 @@ type BlobData struct {
BlobSidecars BlobSidecars `json:"blob_sidecars"`
}

// DataStoreReader is the interface for reading from a data store.
type DataStoreReader interface {
// Exists returns true if blob data exists in the data store for the given beacon block hash, false otherwise.
// It should return one of the following:
// - nil: the existence check was successful. In this case the boolean should also be set correctly.
// - ErrStorage: there was an error accessing the data store.
Exists(ctx context.Context, hash common.Hash) (bool, error)
// Read reads the blob data for the given beacon block hash from the data store.
// It should return one of the following:
// - nil: reading the blob was successful. The blob data is also returned.
// - ErrNotFound: the blob data was not found in the data store.
// - ErrStorage: there was an error accessing the data store.
// - ErrEncoding: there was an error decoding the blob data.
Read(ctx context.Context, hash common.Hash) (BlobData, error)
}

// DataStoreWriter is the interface for writing to a data store.
type DataStoreWriter interface {
// Write writes the given blob data to the data store. It should return one of the following errors:
// - nil: writing the blob was successful.
// - ErrStorage: there was an error accessing the data store.
// - ErrEncoding: there was an error encoding the blob data.
Write(ctx context.Context, data BlobData) error
}

// DataStore is the interface for a data store that can be both written to and read from.
type DataStore interface {
DataStoreReader
DataStoreWriter
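To make the read/write contract concrete, here is a sketch of an in-memory DataStore that returns the documented sentinel errors. It is illustrative only, not one of the supported backends, and it assumes BlobData carries the beacon block hash in a `Header` field (not shown in this diff):

```go
// Sketch: an in-memory DataStore honoring the documented error contract.
package storagesketch

import (
	"context"
	"sync"

	"github.com/base-org/blob-archiver/common/storage"
	"github.com/ethereum/go-ethereum/common"
)

type memStore struct {
	mu    sync.RWMutex
	blobs map[common.Hash]storage.BlobData
}

func newMemStore() *memStore {
	return &memStore{blobs: make(map[common.Hash]storage.BlobData)}
}

func (m *memStore) Exists(ctx context.Context, hash common.Hash) (bool, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	_, ok := m.blobs[hash]
	return ok, nil // a map lookup cannot fail, so ErrStorage never applies here
}

func (m *memStore) Read(ctx context.Context, hash common.Hash) (storage.BlobData, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	data, ok := m.blobs[hash]
	if !ok {
		return storage.BlobData{}, storage.ErrNotFound
	}
	return data, nil
}

func (m *memStore) Write(ctx context.Context, data storage.BlobData) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Assumption: BlobData exposes the beacon block hash via data.Header.BeaconBlockHash.
	m.blobs[data.Header.BeaconBlockHash] = data
	return nil
}
```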
