From 4f1f17f61dda35f549c585a3a4f38e46d532ea82 Mon Sep 17 00:00:00 2001
From: Ferret-san <diego@celestia.org>
Date: Tue, 4 Jun 2024 21:06:29 +0000
Subject: [PATCH] add metrics to `das/celestia`

---
 das/celestia/celestia.go | 60 ++++++++++++++++++++++++++++++++++------
 1 file changed, 51 insertions(+), 9 deletions(-)

diff --git a/das/celestia/celestia.go b/das/celestia/celestia.go
index c6f95011ad..f6b3a90222 100644
--- a/das/celestia/celestia.go
+++ b/das/celestia/celestia.go
@@ -20,6 +20,7 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/offchainlabs/nitro/das/celestia/types"
 	"github.com/offchainlabs/nitro/solgen/go/celestiagen"
 
@@ -44,6 +45,19 @@ type ValidatorConfig struct {
 	BlobstreamAddr string `koanf:"blobstream"`
 }
 
+var (
+	celestiaDALastSuccesfulActionGauge = metrics.NewRegisteredGauge("celestia/action/last_success", nil)
+	celestiaLastNonDefaultGasprice     = metrics.NewRegisteredGaugeFloat64("celestia/last_gas_price", nil)
+	celestiaSuccessCounter             = metrics.NewRegisteredCounter("celestia/action/celestia_success", nil)
+	celestiaFailureCounter             = metrics.NewRegisteredCounter("celestia/action/celestia_failure", nil)
+	celestiaGasRetries                 = metrics.NewRegisteredCounter("celestia/action/gas_retries", nil)
+	celestiaBlobInclusionRetries       = metrics.NewRegisteredCounter("celestia/action/inclusion_retries", nil)
+
+	celestiaValidationLastSuccesfulActionGauge = metrics.NewRegisteredGauge("celestia/validation/last_success", nil)
+	celestiaValidationSuccessCounter           = metrics.NewRegisteredCounter("celestia/validation/blobstream_success", nil)
+	celestiaValidationFailureCounter           = metrics.NewRegisteredCounter("celestia/validation/blobstream_failure", nil)
+)
+
 var (
 	// ErrTxTimedout is the error message returned by the DA when mempool is congested
 	ErrTxTimedout = errors.New("timed out waiting for tx to be included in a block")
@@ -159,6 +173,7 @@ func NewCelestiaDA(cfg *DAConfig, ethClient *ethclient.Client) (*CelestiaDA, err
 func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error) {
 	if c.Cfg.NoopWriter {
 		log.Warn("NoopWriter enabled, falling back", "c.Cfg.NoopWriter", c.Cfg.NoopWriter)
+		celestiaFailureCounter.Inc(1)
 		return nil, errors.New("NoopWriter enabled")
 	}
 	// set a 5 minute timeout context on submissions
@@ -168,6 +183,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
 	defer cancel()
 	dataBlob, err := blob.NewBlobV0(*c.Namespace, message)
 	if err != nil {
+		celestiaFailureCounter.Inc(1)
 		log.Warn("Error creating blob", "err", err)
 		return nil, err
 	}
@@ -187,23 +203,30 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
 				} else {
 					gasPrice = gasPrice * c.Cfg.GasMultiplier
 				}
+
+				celestiaGasRetries.Inc(1)
 				continue
 			default:
+				celestiaFailureCounter.Inc(1)
 				log.Warn("Blob Submission error", "err", err)
 				return nil, err
 			}
 		}
 
 		if height == 0 {
+			celestiaFailureCounter.Inc(1)
 			log.Warn("Unexpected height from blob response", "height", height)
 			return nil, errors.New("unexpected response code")
 		}
 
 		submitted = true
+
+		celestiaLastNonDefaultGasprice.Update(gasPrice)
 	}
 
 	proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
 	if err != nil {
+		celestiaFailureCounter.Inc(1)
 		log.Warn("Error retrieving proof", "err", err)
 		return nil, err
 	}
@@ -214,14 +237,17 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
 		time.Sleep(time.Millisecond * 100)
 		proofs, err = c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
 		if err != nil {
+			celestiaFailureCounter.Inc(1)
 			log.Warn("Error retrieving proof", "err", err)
 			return nil, err
 		}
 		proofRetries++
+		celestiaBlobInclusionRetries.Inc(1)
 	}
 
 	included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment)
 	if err != nil || !included {
+		celestiaFailureCounter.Inc(1)
 		log.Warn("Error checking for inclusion", "err", err, "proof", proofs)
 		return nil, err
 	}
@@ -230,16 +256,19 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
 	// we fetch the blob so that we can get the correct start index in the square
 	dataBlob, err = c.Client.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment)
 	if err != nil {
+		celestiaFailureCounter.Inc(1)
 		return nil, err
 	}
 
 	if dataBlob.Index() <= 0 {
+		celestiaFailureCounter.Inc(1)
 		log.Warn("Unexpected index from blob response", "index", dataBlob.Index())
 		return nil, errors.New("unexpected response code")
 	}
 
 	header, err := c.Client.Header.GetByHeight(ctx, height)
 	if err != nil {
+		celestiaFailureCounter.Inc(1)
 		log.Warn("Header retrieval error", "err", err)
 		return nil, err
 	}
@@ -263,6 +292,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
 	// startRow
 	startRow := blobIndex / squareSize
 	if odsSize*startRow > blobIndex {
+		celestiaFailureCounter.Inc(1)
 		// return an empty batch
 		return nil, fmt.Errorf("storing Celestia information, odsSize*startRow=%v was larger than blobIndex=%v", odsSize*startRow, dataBlob.Index())
 	}
@@ -278,6 +308,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
 
 	blobPointerData, err := blobPointer.MarshalBinary()
 	if err != nil {
+		celestiaFailureCounter.Inc(1)
 		log.Warn("BlobPointer MashalBinary error", "err", err)
 		return nil, err
 	}
@@ -285,18 +316,23 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
 	buf := new(bytes.Buffer)
 	err = binary.Write(buf, binary.BigEndian, CelestiaMessageHeaderFlag)
 	if err != nil {
+		celestiaFailureCounter.Inc(1)
 		log.Warn("batch type byte serialization failed", "err", err)
 		return nil, err
 	}
 
 	err = binary.Write(buf, binary.BigEndian, blobPointerData)
 	if err != nil {
+		celestiaFailureCounter.Inc(1)
 		log.Warn("blob pointer data serialization failed", "err", err)
 		return nil, err
 	}
 
 	serializedBlobPointerData := buf.Bytes()
-	log.Trace("celestia.CelestiaDA.Store", "serialized_blob_pointer", serializedBlobPointerData)
+
+	celestiaSuccessCounter.Inc(1)
+	celestiaDALastSuccesfulActionGauge.Update(time.Now().Unix())
+
 	return serializedBlobPointerData, nil
 }
 
@@ -403,6 +439,7 @@ func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (
 
 func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
 	if c.Prover == nil {
+		celestiaValidationFailureCounter.Inc(1)
 		return nil, fmt.Errorf("no celestia prover config found")
 	}
 
@@ -413,6 +450,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
 	blobBytes := buf.Bytes()
 	err := blobPointer.UnmarshalBinary(blobBytes)
 	if err != nil {
+		celestiaValidationFailureCounter.Inc(1)
 		log.Error("Couldn't unmarshal Celestia blob pointer", "err", err)
 		return nil, nil
 	}
@@ -420,12 +458,14 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
 	// Get data root from a celestia node
 	header, err := c.Client.Header.GetByHeight(ctx, blobPointer.BlockHeight)
 	if err != nil {
+		celestiaValidationFailureCounter.Inc(1)
 		log.Warn("Header retrieval error", "err", err)
 		return nil, err
 	}
 
 	latestBlockNumber, err := c.Prover.EthClient.BlockNumber(context.Background())
 	if err != nil {
+		celestiaValidationFailureCounter.Inc(1)
 		return nil, err
 	}
 
@@ -436,6 +476,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
 		Context:     ctx,
 	})
 	if err != nil {
+		celestiaValidationFailureCounter.Inc(1)
 		return nil, err
 	}
 
@@ -453,12 +494,14 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
 
 	event, err = c.filter(ctx, latestBlockNumber, blobPointer.BlockHeight, backwards)
 	if err != nil {
+		celestiaValidationFailureCounter.Inc(1)
 		return nil, err
 	}
 
 	// get the block data root inclusion proof to the data root tuple root
 	dataRootProof, err := c.Prover.Trpc.DataRootInclusionProof(ctx, blobPointer.BlockHeight, event.StartBlock, event.EndBlock)
 	if err != nil {
+		celestiaValidationFailureCounter.Inc(1)
 		return nil, err
 	}
 
@@ -486,6 +529,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
 		proof,
 	)
 	if err != nil {
+		celestiaValidationFailureCounter.Inc(1)
 		return nil, err
 	}
 
@@ -494,6 +538,7 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
 	if valid {
 		sharesProof, err := c.Prover.Trpc.ProveShares(ctx, blobPointer.BlockHeight, blobPointer.Start, blobPointer.Start+blobPointer.SharesLength)
 		if err != nil {
+			celestiaValidationFailureCounter.Inc(1)
 			log.Error("Unable to get ShareProof", "err", err)
 			return nil, err
 		}
@@ -504,31 +549,28 @@ func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {
 
 		celestiaVerifierAbi, err := celestiagen.CelestiaBatchVerifierMetaData.GetAbi()
 		if err != nil {
+			celestiaValidationFailureCounter.Inc(1)
 			log.Error("Could not get ABI for Celestia Batch Verifier", "err", err)
 			return nil, err
 		}
 
 		verifyProofABI := celestiaVerifierAbi.Methods["verifyProof"]
 
-		// need to encode function signature to this
 		proofData, err := verifyProofABI.Inputs.Pack(
 			common.HexToAddress(c.Cfg.ValidatorConfig.BlobstreamAddr), namespaceNode, rowProof, attestationProof,
 		)
 		if err != nil {
+			celestiaValidationFailureCounter.Inc(1)
 			log.Error("Could not pack structs into ABI", "err", err)
 			return nil, err
 		}
 
-		fmt.Printf("Proof Data: %v\n", proofData)
-
-		// // apend size of batch + proofData
-		// sizeBytes := make([]byte, 4)
-		// binary.BigEndian.PutUint32(sizeBytes, uint32((len(proofData)))+msgLength)
-		// proofData = append(proofData, sizeBytes...)
-
+		celestiaValidationSuccessCounter.Inc(1)
+		celestiaValidationLastSuccesfulActionGauge.Update(time.Now().Unix())
 		return proofData, nil
 	}
 
+	celestiaValidationFailureCounter.Inc(1)
 	return nil, err
 }