From 03d1422b6fe44b3dc2d0b0351abf0d9fc234c5c6 Mon Sep 17 00:00:00 2001
From: Michael Tsitrin
Date: Thu, 9 Jan 2025 13:30:26 +0200
Subject: [PATCH 1/7] various TODOs

---
 da/avail/avail.go       | 35 ++++++++++++++++++-----------------
 da/celestia/celestia.go |  4 ----
 2 files changed, 18 insertions(+), 21 deletions(-)

diff --git a/da/avail/avail.go b/da/avail/avail.go
index 81c30b48b..a357a54a2 100644
--- a/da/avail/avail.go
+++ b/da/avail/avail.go
@@ -116,6 +116,8 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
 	// Set defaults
 	c.pubsubServer = pubsubServer
+
+	// TODO: Make configurable
 	c.txInclusionTimeout = defaultTxInculsionTimeout
 	c.batchRetryDelay = defaultBatchRetryDelay
 	c.batchRetryAttempts = defaultBatchRetryAttempts
@@ -125,6 +127,14 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
 		apply(c)
 	}
 
+	types.RollappConsecutiveFailedDASubmission.Set(0)
+
+	return nil
+}
+
+// Start starts DataAvailabilityLayerClient instance.
+func (c *DataAvailabilityLayerClient) Start() error {
+	c.ctx, c.cancel = context.WithCancel(context.Background())
 	// If client wasn't set, create a new one
 	if c.client == nil {
 		substrateApiClient, err := gsrpc.NewSubstrateAPI(c.config.ApiURL)
 		if err != nil {
 			return err
 		}
 		c.client = SubstrateApi{
 			Chain:  substrateApiClient.RPC.Chain,
 			State:  substrateApiClient.RPC.State,
 			Author: substrateApiClient.RPC.Author,
 		}
 	}
 
-	types.RollappConsecutiveFailedDASubmission.Set(0)
-
-	c.ctx, c.cancel = context.WithCancel(context.Background())
-	return nil
-}
-
-// Start starts DataAvailabilityLayerClient instance.
-func (c *DataAvailabilityLayerClient) Start() error {
+	// TODO: should actually check for synced client
 	c.synced <- struct{}{}
 	return nil
 }
@@ -169,6 +172,8 @@ func (c *DataAvailabilityLayerClient) GetClientType() da.Client {
 
 // RetrieveBatches retrieves batch from DataAvailabilityLayerClient instance.
 func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
+	// TODO: add retries (better to refactor and add retries by the caller)
+
 	//nolint:typecheck
 	blockHash, err := c.client.GetBlockHash(daMetaData.Height)
 	if err != nil {
@@ -199,6 +204,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 			ext.Method.CallIndex.MethodIndex == DataCallMethodIndex {
 			data := ext.Method.Args
 
+			// FIXME: potential deadlock on parsing error
 			for 0 < len(data) {
 				var pbBatch pb.Batch
 				err := proto.Unmarshal(data, &pbBatch)
@@ -222,6 +228,8 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 		}
 	}
 
+	// TODO: if no batches, return error
+
 	return da.ResultRetrieveBatch{
 		BaseResult: da.BaseResult{
 			Code: da.StatusSuccess,
@@ -256,13 +264,7 @@ func (c *DataAvailabilityLayerClient) submitBatchLoop(dataBlob []byte) da.Result
 	for {
 		select {
 		case <-c.ctx.Done():
-			return da.ResultSubmitBatch{
-				BaseResult: da.BaseResult{
-					Code:    da.StatusError,
-					Message: "context done",
-					Error:   c.ctx.Err(),
-				},
-			}
+			return da.ResultSubmitBatch{}
 		default:
 			var daBlockHeight uint64
 			err := retry.Do(
@@ -378,11 +380,10 @@ func (c *DataAvailabilityLayerClient) broadcastTx(tx []byte) (uint64, error) {
 	if err != nil {
 		return 0, fmt.Errorf("%w: %s", da.ErrTxBroadcastNetworkError, err)
 	}
+	defer sub.Unsubscribe()
 
 	c.logger.Info("Submitted batch to avail. Waiting for inclusion event")
 
-	defer sub.Unsubscribe()
-
 	inclusionTimer := time.NewTimer(c.txInclusionTimeout)
 	defer inclusionTimer.Stop()
 
diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go
index 80cd32f85..042301691 100644
--- a/da/celestia/celestia.go
+++ b/da/celestia/celestia.go
@@ -153,10 +153,6 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {
 
 // Stop stops DataAvailabilityLayerClient.
 func (c *DataAvailabilityLayerClient) Stop() error {
 	c.logger.Info("Stopping Celestia Data Availability Layer Client.")
-	err := c.pubsubServer.Stop()
-	if err != nil {
-		return err
-	}
 	c.cancel()
 	close(c.synced)
 	return nil

From d500437d480d81c6c376fb555238c0d537cdfc86 Mon Sep 17 00:00:00 2001
From: Michael Tsitrin
Date: Mon, 13 Jan 2025 15:15:06 +0200
Subject: [PATCH 2/7] minor

---
 da/avail/avail.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/da/avail/avail.go b/da/avail/avail.go
index 15a719bd6..187858b8e 100644
--- a/da/avail/avail.go
+++ b/da/avail/avail.go
@@ -128,7 +128,7 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
 		apply(c)
 	}
 
-	types.RollappConsecutiveFailedDASubmission.Set(0)
+	metrics.RollappConsecutiveFailedDASubmission.Set(0)
 
 	return nil
 }

From 8d4258f5d71bfa4e9e3570119c8d78ab5c2a6ff5 Mon Sep 17 00:00:00 2001
From: Michael Tsitrin
Date: Mon, 13 Jan 2025 15:17:03 +0200
Subject: [PATCH 3/7] minor fixes

---
 da/avail/avail.go | 23 ++++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/da/avail/avail.go b/da/avail/avail.go
index 187858b8e..c93e05930 100644
--- a/da/avail/avail.go
+++ b/da/avail/avail.go
@@ -173,8 +173,6 @@ func (c *DataAvailabilityLayerClient) GetClientType() da.Client {
 
 // RetrieveBatches retrieves batch from DataAvailabilityLayerClient instance.
 func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
-	// TODO: add retries (better to refactor and add retries by the caller)
-
 	//nolint:typecheck
 	blockHash, err := c.client.GetBlockHash(daMetaData.Height)
 	if err != nil {
@@ -182,7 +180,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 			BaseResult: da.BaseResult{
 				Code:    da.StatusError,
 				Message: err.Error(),
-				Error:   err,
+				Error:   errors.Join(da.ErrRetrieval, err),
 			},
 		}
 	}
@@ -192,7 +190,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 			BaseResult: da.BaseResult{
 				Code:    da.StatusError,
 				Message: err.Error(),
-				Error:   err,
+				Error:   errors.Join(da.ErrRetrieval, err),
 			},
 		}
 	}
@@ -205,31 +203,38 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 			ext.Method.CallIndex.MethodIndex == DataCallMethodIndex {
 			data := ext.Method.Args
 
-			// FIXME: potential deadlock on parsing error
 			for 0 < len(data) {
 				var pbBatch pb.Batch
 				err := proto.Unmarshal(data, &pbBatch)
 				if err != nil {
 					c.logger.Error("unmarshal batch", "daHeight", daMetaData.Height, "error", err)
-					continue
+					break
 				}
 				// Convert the proto batch to a batch
 				batch := &types.Batch{}
 				err = batch.FromProto(&pbBatch)
 				if err != nil {
 					c.logger.Error("batch from proto", "daHeight", daMetaData.Height, "error", err)
-					continue
+					break
 				}
 				// Add the batch to the list
 				batches = append(batches, batch)
 				// Remove the bytes we just decoded.
 				data = data[proto.Size(&pbBatch):]
 			}
 		}
 	}
 
-	// TODO: if no batches, return error
+	// if no batches, return error
+	if len(batches) == 0 {
+		return da.ResultRetrieveBatch{
+			BaseResult: da.BaseResult{
+				Code:    da.StatusError,
+				Message: "Blob not found",
+				Error:   da.ErrBlobNotFound,
+			},
+		}
+	}
 
 	return da.ResultRetrieveBatch{
 		BaseResult: da.BaseResult{

From e6a763df414feb7fcab182d84926b7e3c922780b Mon Sep 17 00:00:00 2001
From: PrathyushaLakkireddy
Date: Thu, 16 Jan 2025 14:15:59 +0530
Subject: [PATCH 4/7] check sync status of avail

---
 da/avail/avail.go | 141 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 140 insertions(+), 1 deletion(-)

diff --git a/da/avail/avail.go b/da/avail/avail.go
index cf46faebf..310e3d0de 100644
--- a/da/avail/avail.go
+++ b/da/avail/avail.go
@@ -149,7 +149,24 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
 
 // Start starts DataAvailabilityLayerClient instance.
 func (c *DataAvailabilityLayerClient) Start() error {
-	c.synced <- struct{}{}
+	c.logger.Info("Starting Avail Data Availability Layer Client.")
+	c.ctx, c.cancel = context.WithCancel(context.Background())
+	// If client wasn't set, create a new one
+	if c.client == nil {
+		substrateApiClient, err := gsrpc.NewSubstrateAPI(c.config.ApiURL)
+		if err != nil {
+			return err
+		}
+		c.client = SubstrateApi{
+			Chain:  substrateApiClient.RPC.Chain,
+			State:  substrateApiClient.RPC.State,
+			Author: substrateApiClient.RPC.Author,
+		}
+	}
+
+	// check for synced client
+	go c.sync()
+
 	return nil
 }
 
@@ -451,3 +468,125 @@ func (d *DataAvailabilityLayerClient) GetMaxBlobSizeBytes() uint32 {
 func (c *DataAvailabilityLayerClient) GetSignerBalance() (da.Balance, error) {
 	return da.Balance{}, nil
 }
+
+func (c *DataAvailabilityLayerClient) sync() error {
+	sync := func() error {
+		done := make(chan error, 1)
+		go func() {
+			// Continuously check sync status in a separate goroutine
+			done <- func() error {
+				for {
+					select {
+					case <-c.ctx.Done():
+						return fmt.Errorf("context cancelled while checking sync status")
+					default:
+						// Get the latest finalized block height
+						finalizedHash, err := c.client.GetFinalizedHead()
+						if err != nil {
+							return fmt.Errorf("failed to get finalized head: %w", err)
+						}
+
+						finalizedHeader, err := c.client.GetHeader(finalizedHash)
+						if err != nil {
+							return fmt.Errorf("failed to get finalized header: %w", err)
+						}
+						finalizedHeight := uint64(finalizedHeader.Number)
+
+						// Get the current block height
+						currentBlock, err := c.client.GetBlockLatest()
+						if err != nil {
+							return fmt.Errorf("failed to get current block: %w", err)
+						}
+						currentHeight := uint64(currentBlock.Block.Header.Number)
+
+						// Calculate blocks behind
+						var blocksBehind uint64
+						if currentHeight >= finalizedHeight {
+							blocksBehind = currentHeight - finalizedHeight
+						} else {
+							blocksBehind = finalizedHeight - currentHeight
+						}
+
+						defaultSyncThreshold := uint64(3) // TODO : Can change this defaultSyncThreshold
+
+						// Check if within sync threshold
+						if blocksBehind <= defaultSyncThreshold && currentHeight > 0 { // Add check for currentHeight > 0
+							c.logger.Info("Node is synced",
+								"current_height", currentHeight,
+								"finalized_height", finalizedHeight,
+								"blocks_behind", blocksBehind)
+							return nil
+						}
+
+						c.logger.Debug("Node is not yet synced",
+							"current_height", currentHeight,
+							"finalized_height", finalizedHeight,
+							"blocks_behind", blocksBehind)
+
+						return fmt.Errorf("node not synced: current=%d, finalized=%d, behind=%d",
+							currentHeight, finalizedHeight, blocksBehind)
+					}
+				}
+			}()
+		}()
+
+		ticker := time.NewTicker(30 * time.Second)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case err := <-done:
+				return err
+			case <-ticker.C:
+				// Get sync status for logging
+				finalizedHash, err := c.client.GetFinalizedHead()
+				if err != nil {
+					c.logger.Error("Failed to get finalized head", "error", err)
+					continue
+				}
+
+				finalizedHeader, err := c.client.GetHeader(finalizedHash)
+				if err != nil {
+					c.logger.Error("Failed to get finalized header", "error", err)
+					continue
+				}
+
+				latestHash, err := c.client.GetBlockHash(0)
+				if err != nil {
+					c.logger.Error("Failed to get latest block hash", "error", err)
+					continue
+				}
+
+				currentBlock, err := c.client.GetBlock(latestHash)
+				if err != nil {
+					c.logger.Error("Failed to get current block", "error", err)
+					continue
+				}
+
+				c.logger.Info("Avail-node syncing",
+					"current_height", currentBlock.Block.Header.Number,
+					"finalized_height", finalizedHeader.Number)
+			}
+		}
+	}
+
+	// Start sync with retry mechanism
+	err := retry.Do(sync,
+		retry.Attempts(0), // try forever
+		retry.Delay(10*time.Second),
+		retry.LastErrorOnly(true),
+		retry.DelayType(retry.FixedDelay),
+		retry.OnRetry(func(n uint, err error) {
+			c.logger.Error("Failed to sync Avail DA", "attempt", n, "error", err)
+		}),
+	)
+
+	c.logger.Info("Avail-node is synced.")
+	c.synced <- struct{}{}
+
+	if err != nil {
+		c.logger.Error("Waiting for Avail data availability client to sync", "err", err)
+	}
+
+	return err
+}

From d7dd5559e2b6293b8c4331c96caa57d28ae2b92e Mon Sep 17 00:00:00 2001
From: PrathyushaLakkireddy
Date: Thu, 16 Jan 2025 14:51:02 +0530
Subject: [PATCH 5/7] get latest block

---
 da/avail/avail.go | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/da/avail/avail.go b/da/avail/avail.go
index 310e3d0de..0abae4ba7 100644
--- a/da/avail/avail.go
+++ b/da/avail/avail.go
@@ -551,13 +551,7 @@ func (c *DataAvailabilityLayerClient) sync() error {
 					continue
 				}
 
-				latestHash, err := c.client.GetBlockHash(0)
-				if err != nil {
-					c.logger.Error("Failed to get latest block hash", "error", err)
-					continue
-				}
-
-				currentBlock, err := c.client.GetBlock(latestHash)
+				currentBlock, err := c.client.GetBlockLatest()
 				if err != nil {
 					c.logger.Error("Failed to get current block", "error", err)
 					continue

From 8d524960ee1d175d2cd4c39d508cd65f50a8935c Mon Sep 17 00:00:00 2001
From: Michael Tsitrin
Date: Sun, 19 Jan 2025 10:41:35 +0200
Subject: [PATCH 6/7] UT fix

---
 da/avail/avail.go      |  2 +-
 da/avail/avail_test.go | 14 ++++++++++++--
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/da/avail/avail.go b/da/avail/avail.go
index 5a3ab0a93..21b151e85 100644
--- a/da/avail/avail.go
+++ b/da/avail/avail.go
@@ -468,7 +468,7 @@ func (c *DataAvailabilityLayerClient) sync() error {
 				for {
 					select {
 					case <-c.ctx.Done():
-						return fmt.Errorf("context cancelled while checking sync status")
+						return nil
 					default:
 						// Get the latest finalized block height
 						finalizedHash, err := c.client.GetFinalizedHead()
diff --git a/da/avail/avail_test.go b/da/avail/avail_test.go
index 26097a853..8a9bbaf42 100644
--- a/da/avail/avail_test.go
+++ b/da/avail/avail_test.go
@@ -43,14 +43,20 @@ func TestRetrieveBatches(t *testing.T) {
 	pubsubServer := pubsub.NewServer()
 	err = pubsubServer.Start()
 	assert.NoError(err)
+
+	// set mocks for sync flow
+	// Set the mock functions
+	mockSubstrateApiClient.On("GetFinalizedHead", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil)
+	mockSubstrateApiClient.On("GetHeader", mock.Anything).Return(&availtypes.Header{Number: 1}, nil)
mockSubstrateApiClient.On("GetBlockLatest", mock.Anything).Return(&availtypes.SignedBlock{Block: availtypes.Block{Header: availtypes.Header{Number: 1}}}, nil) + // Start the DALC dalc := avail.DataAvailabilityLayerClient{} err = dalc.Init(configBytes, pubsubServer, nil, testutil.NewLogger(t), options...) require.NoError(err) err = dalc.Start() require.NoError(err) - // Set the mock functions - mockSubstrateApiClient.On("GetBlockHash", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil) + // Build batches for the block extrinsics batch1 := testutil.MustGenerateBatchAndKey(0, 1) batch2 := testutil.MustGenerateBatchAndKey(2, 3) @@ -86,7 +92,11 @@ func TestRetrieveBatches(t *testing.T) { }, }, } + + // Set the mock functions + mockSubstrateApiClient.On("GetBlockHash", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil) mockSubstrateApiClient.On("GetBlock", mock.Anything).Return(signedBlock, nil) + // Retrieve the batches and make sure we only get the batches relevant for our app id daMetaData := &da.DASubmitMetaData{ Height: 1, From 963b11d189a3209b2d5e69b5420827f6f1ec70cf Mon Sep 17 00:00:00 2001 From: Michael Tsitrin Date: Sun, 19 Jan 2025 11:26:50 +0200 Subject: [PATCH 7/7] simplified sync code --- da/avail/avail.go | 146 ++++++++++++++++------------------------------ 1 file changed, 49 insertions(+), 97 deletions(-) diff --git a/da/avail/avail.go b/da/avail/avail.go index 21b151e85..e6a277e3b 100644 --- a/da/avail/avail.go +++ b/da/avail/avail.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "time" "github.com/avast/retry-go/v4" @@ -152,7 +153,7 @@ func (c *DataAvailabilityLayerClient) Start() error { } // check for synced client - go c.sync() //nolint:errcheck + go c.sync() return nil } @@ -459,118 +460,69 @@ func (c *DataAvailabilityLayerClient) GetSignerBalance() (da.Balance, error) { return da.Balance{}, nil } -func (c *DataAvailabilityLayerClient) sync() error { - sync := func() error { - done := make(chan error, 1) - go func() { - // Continuously check sync status in a separate goroutine - done <- func() error { - for { - select { - case <-c.ctx.Done(): - return nil - default: - // Get the latest finalized block height - finalizedHash, err := c.client.GetFinalizedHead() - if err != nil { - return fmt.Errorf("failed to get finalized head: %w", err) - } - - finalizedHeader, err := c.client.GetHeader(finalizedHash) - if err != nil { - return fmt.Errorf("failed to get finalized header: %w", err) - } - finalizedHeight := uint64(finalizedHeader.Number) - - // Get the current block height - currentBlock, err := c.client.GetBlockLatest() - if err != nil { - return fmt.Errorf("failed to get current block: %w", err) - } - currentHeight := uint64(currentBlock.Block.Header.Number) - - // Calculate blocks behind - var blocksBehind uint64 - if currentHeight >= finalizedHeight { - blocksBehind = currentHeight - finalizedHeight - } else { - blocksBehind = finalizedHeight - currentHeight - } +func (c *DataAvailabilityLayerClient) sync() { + // wrapper to get finalized height and current height from the client + getHeights := func() (uint64, uint64, error) { + finalizedHash, err := c.client.GetFinalizedHead() + if err != nil { + return 0, 0, fmt.Errorf("failed to get finalized head: %w", err) + } - defaultSyncThreshold := uint64(3) // TODO : Can change this defaultSyncThreshold + finalizedHeader, err := c.client.GetHeader(finalizedHash) + if err != nil { + return 0, 0, fmt.Errorf("failed to get finalized header: %w", err) + } + finalizedHeight := 
+
+		currentBlock, err := c.client.GetBlockLatest()
+		if err != nil {
+			return 0, 0, fmt.Errorf("failed to get current block: %w", err)
+		}
+		currentHeight := uint64(currentBlock.Block.Header.Number)
+
+		return finalizedHeight, currentHeight, nil
+	}
+
+	checkSync := func() error {
+		finalizedHeight, currentHeight, err := getHeights()
+		if err != nil {
+			return err
+		}
+
+		// Calculate blocks behind (convert before subtracting: a raw uint64
+		// subtraction underflows when finalized is ahead of current)
+		blocksBehind := uint64(math.Abs(float64(currentHeight) - float64(finalizedHeight)))
+		defaultSyncThreshold := uint64(3)
+
+		// Check if within sync threshold
+		if blocksBehind <= defaultSyncThreshold && currentHeight > 0 {
+			c.logger.Info("Node is synced",
+				"current_height", currentHeight,
+				"finalized_height", finalizedHeight,
+				"blocks_behind", blocksBehind)
+			return nil
+		}
+
+		c.logger.Debug("Node is not yet synced",
+			"current_height", currentHeight,
+			"finalized_height", finalizedHeight,
+			"blocks_behind", blocksBehind)
+
+		return fmt.Errorf("node not synced: current=%d, finalized=%d, behind=%d",
+			currentHeight, finalizedHeight, blocksBehind)
+	}
 
 	// Start sync with retry mechanism
-	err := retry.Do(sync,
+	err := retry.Do(checkSync,
 		retry.Attempts(0), // try forever
-		retry.Delay(10*time.Second),
+		retry.Context(c.ctx),
+		retry.Delay(5*time.Second), // TODO: make configurable
 		retry.LastErrorOnly(true),
 		retry.DelayType(retry.FixedDelay),
 		retry.OnRetry(func(n uint, err error) {
-			c.logger.Error("Failed to sync Avail DA", "attempt", n, "error", err)
+			c.logger.Error("sync Avail DA", "attempt", n, "error", err)
 		}),
 	)
 
-	c.logger.Info("Avail-node is synced.")
-	c.synced <- struct{}{}
-
-	if err != nil {
-		c.logger.Error("Waiting for Avail data availability client to sync", "err", err)
-	}
-
-	return err
-}
+	c.logger.Info("Avail-node sync completed.", "err", err)
+	c.synced <- struct{}{}
+}
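
Note on the sync gating that PATCH 7 settles on: poll the node's finalized and latest heights, declare it synced once the gap is within a small threshold, then unblock consumers by signaling the synced channel. The sketch below distills that pattern into a self-contained program so it can be run and tested outside the DA client. It reuses the same avast/retry-go/v4 options that appear in the patch (Attempts(0) for infinite retries, Context for cancellation, a fixed 5s delay); the heightSource interface, fakeSource type, and waitForSync function are hypothetical stand-ins for the Substrate RPC calls (GetFinalizedHead/GetHeader/GetBlockLatest), not identifiers from the dymint codebase.

package main

import (
	"context"
	"fmt"
	"math"
	"time"

	"github.com/avast/retry-go/v4"
)

// heightSource is a hypothetical stand-in for the Substrate RPC client.
type heightSource interface {
	Heights() (finalized, current uint64, err error)
}

// waitForSync blocks until the height gap is within threshold, using the same
// retry-go options as PATCH 7: infinite fixed-delay attempts, stopped via ctx.
func waitForSync(ctx context.Context, src heightSource, threshold uint64) error {
	check := func() error {
		finalized, current, err := src.Heights()
		if err != nil {
			return err
		}
		// Convert before subtracting: subtracting uint64s directly underflows
		// whenever the finalized height is ahead of the current height.
		behind := uint64(math.Abs(float64(current) - float64(finalized)))
		if behind <= threshold && current > 0 {
			return nil // synced
		}
		return fmt.Errorf("node not synced: current=%d, finalized=%d, behind=%d", current, finalized, behind)
	}
	return retry.Do(check,
		retry.Attempts(0), // try forever (until ctx is done)
		retry.Context(ctx),
		retry.Delay(5*time.Second),
		retry.DelayType(retry.FixedDelay),
		retry.LastErrorOnly(true),
	)
}

// fakeSource simulates a node that gains three blocks per poll.
type fakeSource struct{ finalized, current uint64 }

func (f *fakeSource) Heights() (uint64, uint64, error) {
	f.current += 3
	return f.finalized, f.current, nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	src := &fakeSource{finalized: 100, current: 80}
	if err := waitForSync(ctx, src, 3); err != nil {
		fmt.Println("sync failed:", err)
		return
	}
	fmt.Println("synced; safe to signal the synced channel")
}

If the node never catches up, retry.Context makes retry.Do return once the deadline passes, mirroring how the patched sync() stops retrying when c.ctx is cancelled, so a caller waiting on the result is never blocked forever.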