diff --git a/config/chainstorage/aptos/mainnet/base.yml b/config/chainstorage/aptos/mainnet/base.yml index 0db58d39..bb0d90e2 100644 --- a/config/chainstorage/aptos/mainnet/base.yml +++ b/config/chainstorage/aptos/mainnet/base.yml @@ -166,10 +166,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/arbitrum/mainnet/base.yml b/config/chainstorage/arbitrum/mainnet/base.yml index b26cb88b..10a3021d 100644 --- a/config/chainstorage/arbitrum/mainnet/base.yml +++ b/config/chainstorage/arbitrum/mainnet/base.yml @@ -170,10 +170,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/avacchain/mainnet/base.yml b/config/chainstorage/avacchain/mainnet/base.yml index f588aae7..fe73ed0d 100644 --- a/config/chainstorage/avacchain/mainnet/base.yml +++ b/config/chainstorage/avacchain/mainnet/base.yml @@ -167,10 +167,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/base/goerli/base.yml b/config/chainstorage/base/goerli/base.yml index fd66e718..6f90f266 100644 --- a/config/chainstorage/base/goerli/base.yml +++ b/config/chainstorage/base/goerli/base.yml @@ -168,10 +168,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/base/mainnet/base.yml b/config/chainstorage/base/mainnet/base.yml index f6b0b72e..12cb9837 100644 --- a/config/chainstorage/base/mainnet/base.yml +++ b/config/chainstorage/base/mainnet/base.yml @@ -173,10 +173,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/bitcoin/mainnet/base.yml b/config/chainstorage/bitcoin/mainnet/base.yml index f77ebaeb..f4196147 100644 --- a/config/chainstorage/bitcoin/mainnet/base.yml +++ b/config/chainstorage/bitcoin/mainnet/base.yml @@ -169,10 +169,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/bsc/mainnet/base.yml b/config/chainstorage/bsc/mainnet/base.yml index 2d1f58b2..37489b73 100644 --- a/config/chainstorage/bsc/mainnet/base.yml +++ b/config/chainstorage/bsc/mainnet/base.yml @@ -169,10 +169,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/dogecoin/mainnet/base.yml b/config/chainstorage/dogecoin/mainnet/base.yml index 9f02aa4e..f22b94c5 100644 --- a/config/chainstorage/dogecoin/mainnet/base.yml +++ b/config/chainstorage/dogecoin/mainnet/base.yml @@ -173,10 +173,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/ethereum/goerli/base.yml b/config/chainstorage/ethereum/goerli/base.yml index 350b1e8f..fb2309e9 100644 --- a/config/chainstorage/ethereum/goerli/base.yml +++ b/config/chainstorage/ethereum/goerli/base.yml @@ -172,10 +172,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/ethereum/holesky/base.yml b/config/chainstorage/ethereum/holesky/base.yml index a2094503..f066fe20 100644 --- a/config/chainstorage/ethereum/holesky/base.yml +++ b/config/chainstorage/ethereum/holesky/base.yml @@ -166,10 +166,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/ethereum/mainnet/base.yml b/config/chainstorage/ethereum/mainnet/base.yml index a9a8a8ba..cd3e855d 100644 --- a/config/chainstorage/ethereum/mainnet/base.yml +++ b/config/chainstorage/ethereum/mainnet/base.yml @@ -176,10 +176,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/fantom/mainnet/base.yml b/config/chainstorage/fantom/mainnet/base.yml index 7cc1352a..a79e028d 100644 --- a/config/chainstorage/fantom/mainnet/base.yml +++ b/config/chainstorage/fantom/mainnet/base.yml @@ -166,10 +166,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/optimism/mainnet/base.yml b/config/chainstorage/optimism/mainnet/base.yml index 66a9ec71..16259280 100644 --- a/config/chainstorage/optimism/mainnet/base.yml +++ b/config/chainstorage/optimism/mainnet/base.yml @@ -166,10 +166,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/polygon/mainnet/base.yml b/config/chainstorage/polygon/mainnet/base.yml index 4162c613..7a8337ba 100644 --- a/config/chainstorage/polygon/mainnet/base.yml +++ b/config/chainstorage/polygon/mainnet/base.yml @@ -180,10 +180,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/polygon/testnet/base.yml b/config/chainstorage/polygon/testnet/base.yml index 51be47a3..9196d641 100644 --- a/config/chainstorage/polygon/testnet/base.yml +++ b/config/chainstorage/polygon/testnet/base.yml @@ -171,10 +171,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config/chainstorage/solana/mainnet/base.yml b/config/chainstorage/solana/mainnet/base.yml index 1b908d21..0ceeef8a 100644 --- a/config/chainstorage/solana/mainnet/base.yml +++ b/config/chainstorage/solana/mainnet/base.yml @@ -170,10 +170,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.poller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + checkpoint_size: 10000 + mini_batch_size: 100 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/config_templates/config/base.template.yml b/config_templates/config/base.template.yml index 6f36dcff..71e68b76 100644 --- a/config_templates/config/base.template.yml +++ b/config_templates/config/base.template.yml @@ -167,10 +167,12 @@ workflows: workflow_execution_timeout: 24h workflow_identity: workflow.event_backfiller replicator: - activity_retry_maximum_attempts: 3 + activity_retry_maximum_attempts: 5 activity_schedule_to_start_timeout: 5m activity_start_to_close_timeout: 10m batch_size: 1000 + mini_batch_size: 100 + checkpoint_size: 10000 parallelism: 10 task_list: default workflow_decision_timeout: 2m diff --git a/internal/config/config.go b/internal/config/config.go index e78f1191..169e9a9d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -210,6 +210,8 @@ type ( ReplicatorWorkflowConfig struct { WorkflowConfig `mapstructure:",squash"` BatchSize uint64 `mapstructure:"batch_size" validate:"required"` + MiniBatchSize uint64 `mapstructure:"mini_batch_size" validate:"required"` + CheckpointSize uint64 `mapstructure:"checkpoint_size" validate:"required,gtfield=BatchSize"` Parallelism int `mapstructure:"parallelism" validate:"required"` } diff --git a/internal/workflow/activity/activity.go b/internal/workflow/activity/activity.go index af21d8b4..62545d39 100644 --- a/internal/workflow/activity/activity.go +++ b/internal/workflow/activity/activity.go @@ -27,6 +27,7 @@ const ( ActivityEventReconciler = "activity.event_reconciler" ActivityEventLoader = "activity.event_loader" ActivityReplicator = "activity.replicator" + ActivityUpdateWatermark = "activity.update_watermark" loggerMsg = "activity.request" diff --git a/internal/workflow/activity/module.go b/internal/workflow/activity/module.go index 76bf7e14..c7391883 100644 --- a/internal/workflow/activity/module.go +++ b/internal/workflow/activity/module.go @@ -18,4 +18,5 @@ var Module = fx.Options( fx.Provide(NewEventReconciler), fx.Provide(NewEventLoader), fx.Provide(NewReplicator), + fx.Provide(NewUpdateWatermark), ) diff --git a/internal/workflow/activity/replicator.go b/internal/workflow/activity/replicator.go index ae5f60fc..37613c80 100644 --- a/internal/workflow/activity/replicator.go +++ b/internal/workflow/activity/replicator.go @@ -6,54 +6,58 @@ import ( "go.temporal.io/sdk/workflow" "go.uber.org/fx" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/coinbase/chainstorage/internal/cadence" + "github.com/coinbase/chainstorage/internal/gateway" "github.com/coinbase/chainstorage/internal/storage/blobstorage" + "github.com/coinbase/chainstorage/internal/storage/blobstorage/downloader" "github.com/coinbase/chainstorage/internal/storage/metastorage" "github.com/coinbase/chainstorage/internal/utils/fxparams" - "github.com/coinbase/chainstorage/internal/utils/syncgroup" api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage" - "github.com/coinbase/chainstorage/sdk" ) type ( Replicator struct { baseActivity - - client sdk.Client - metaStorage metastorage.MetaStorage - blobStorage blobstorage.BlobStorage + client gateway.Client + blockDownloader downloader.BlockDownloader + metaStorage metastorage.MetaStorage + blobStorage blobstorage.BlobStorage } ReplicatorParams struct { fx.In fxparams.Params - Runtime cadence.Runtime - Client sdk.Client - MetaStorage metastorage.MetaStorage - BlobStorage blobstorage.BlobStorage + Runtime cadence.Runtime + Client gateway.Client + BlockDownloader downloader.BlockDownloader + MetaStorage metastorage.MetaStorage + BlobStorage blobstorage.BlobStorage } ReplicatorRequest struct { - Tag uint32 - StartBlockHeight uint64 - EndBlockHeight uint64 - UpdateWatermark bool - Parallelism int - Compression api.Compression + Tag uint32 + StartHeight uint64 + EndHeight uint64 + Parallelism int + Compression api.Compression } ReplicatorResponse struct { + StartHeight uint64 + EndHeight uint64 } ) func NewReplicator(params ReplicatorParams) *Replicator { a := &Replicator{ - baseActivity: newBaseActivity(ActivityReplicator, params.Runtime), - client: params.Client, - metaStorage: params.MetaStorage, - blobStorage: params.BlobStorage, + baseActivity: newBaseActivity(ActivityReplicator, params.Runtime), + client: params.Client, + blockDownloader: params.BlockDownloader, + metaStorage: params.MetaStorage, + blobStorage: params.BlobStorage, } a.register(a.execute) return a @@ -71,31 +75,50 @@ func (a *Replicator) execute(ctx context.Context, request *ReplicatorRequest) (* } logger := a.getLogger(ctx).With(zap.Reflect("request", request)) logger.Info("Fetching block range", - zap.Uint64("startHeight", request.StartBlockHeight), - zap.Uint64("endHeight", request.EndBlockHeight)) - blocks, err := a.client.GetBlocksByRangeWithTag(ctx, request.Tag, request.StartBlockHeight, request.EndBlockHeight) + zap.Uint64("startHeight", request.StartHeight), + zap.Uint64("endHeight", request.EndHeight)) + blocks, err := a.client.GetBlockFilesByRange(ctx, &api.GetBlockFilesByRangeRequest{ + Tag: request.Tag, + StartHeight: request.StartHeight, + EndHeight: request.EndHeight, + }) if err != nil { return nil, err } - blockMetas := make([]*api.BlockMetadata, len(blocks)) - logger.Info("Uploading block data") - group, sgctx := syncgroup.New(ctx, syncgroup.WithThrottling(request.Parallelism)) - for i, block := range blocks { - blockMetas[i] = block.Metadata - block := block + blockMetas := make([]*api.BlockMetadata, len(blocks.Files)) + logger.Info("Replicating block data") + group, errgroupCtx := errgroup.WithContext(ctx) + group.SetLimit(request.Parallelism) + for i := range blocks.Files { + i := i group.Go(func() error { - _, err := a.blobStorage.Upload(sgctx, block, request.Compression) + blockFile := blocks.Files[i] + logger.Debug( + "downloading block", + zap.Uint32("tag", blockFile.Tag), + zap.Uint64("height", blockFile.Height), + zap.String("hash", blockFile.Hash), + ) + block, err := a.blockDownloader.Download(errgroupCtx, blockFile) + if err != nil { + return xerrors.Errorf("failed download block file from %s: %w", blockFile.GetFileUrl(), err) + } + _, err = a.blobStorage.Upload(errgroupCtx, block, request.Compression) + blockMetas[i] = block.Metadata return err }) } if err := group.Wait(); err != nil { - return nil, xerrors.Errorf("failed to upload blocks: %w", err) + return nil, xerrors.Errorf("failed to replicate block files: %w", err) } logger.Info("Persisting block metadata") - err = a.metaStorage.PersistBlockMetas(ctx, request.UpdateWatermark, blockMetas, nil) + err = a.metaStorage.PersistBlockMetas(ctx, false, blockMetas, nil) if err != nil { return nil, err } - return &ReplicatorResponse{}, nil + return &ReplicatorResponse{ + StartHeight: request.StartHeight, + EndHeight: request.EndHeight, + }, nil } diff --git a/internal/workflow/activity/update_watermark.go b/internal/workflow/activity/update_watermark.go new file mode 100644 index 00000000..7c3e9579 --- /dev/null +++ b/internal/workflow/activity/update_watermark.go @@ -0,0 +1,98 @@ +package activity + +import ( + "context" + + "go.temporal.io/sdk/workflow" + "go.uber.org/fx" + "go.uber.org/zap" + "golang.org/x/xerrors" + + "github.com/coinbase/chainstorage/internal/blockchain/parser" + "github.com/coinbase/chainstorage/internal/cadence" + "github.com/coinbase/chainstorage/internal/config" + "github.com/coinbase/chainstorage/internal/storage/metastorage" + "github.com/coinbase/chainstorage/internal/utils/fxparams" + api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage" +) + +type ( + UpdateWatermark struct { + baseActivity + config *config.Config + metaStorage metastorage.MetaStorage + } + + UpdateWatermarkParams struct { + fx.In + fxparams.Params + Runtime cadence.Runtime + MetaStorage metastorage.MetaStorage + } + + UpdateWatermarkRequest struct { + Tag uint32 + BlockHeight uint64 + // validate the chain starting from this block (inclusive) + ValidateStart uint64 + } + + UpdateWatermarkResponse struct { + BlockHeight uint64 + } +) + +func NewUpdateWatermark(params UpdateWatermarkParams) *UpdateWatermark { + a := &UpdateWatermark{ + baseActivity: newBaseActivity(ActivityUpdateWatermark, params.Runtime), + config: params.Config, + metaStorage: params.MetaStorage, + } + a.register(a.execute) + return a +} + +func (a *UpdateWatermark) Execute(ctx workflow.Context, request *UpdateWatermarkRequest) (*UpdateWatermarkResponse, error) { + var response UpdateWatermarkResponse + err := a.executeActivity(ctx, request, &response) + return &response, err +} + +func (a *UpdateWatermark) execute(ctx context.Context, request *UpdateWatermarkRequest) (*UpdateWatermarkResponse, error) { + if err := a.validateRequest(request); err != nil { + return nil, err + } + logger := a.getLogger(ctx).With(zap.Reflect("request", request)) + tag := a.config.GetEffectiveBlockTag(request.Tag) + logger.Info("Updating watermark", + zap.Uint32("tag", tag), + zap.Uint64("validate_since", request.ValidateStart), + zap.Uint64("height", request.BlockHeight)) + + validateStart := request.BlockHeight - 1 + if request.ValidateStart > 0 { + if request.ValidateStart >= request.BlockHeight { + return nil, xerrors.Errorf("ValidateSince %d should be smaller than BlockHeight %d", + request.ValidateStart, request.BlockHeight) + } + validateStart = request.ValidateStart + } + if validateStart <= 0 { + validateStart = 1 + } + blocks, err := a.metaStorage.GetBlocksByHeightRange(ctx, tag, validateStart, request.BlockHeight+1) + if err != nil { + return nil, xerrors.Errorf("failed to get blocks by tag %d: %w", tag, err) + } + if len(blocks) > 1 { + if err := parser.ValidateChain(blocks[1:], blocks[0]); err != nil { + return nil, xerrors.Errorf("failed to validate chain: %w", err) + } + } + err = a.metaStorage.PersistBlockMetas(ctx, true, []*api.BlockMetadata{blocks[len(blocks)-1]}, nil) + if err != nil { + return nil, xerrors.Errorf("failed to set watermark: %w", err) + } + + return &UpdateWatermarkResponse{BlockHeight: request.BlockHeight}, nil +} diff --git a/internal/workflow/replicator.go b/internal/workflow/replicator.go index 47f5d3f5..d4f9ea9f 100644 --- a/internal/workflow/replicator.go +++ b/internal/workflow/replicator.go @@ -21,14 +21,16 @@ import ( type ( Replicator struct { baseWorkflow - replicator *activity.Replicator + replicator *activity.Replicator + updateWatermark *activity.UpdateWatermark } ReplicatorParams struct { fx.In fxparams.Params - Runtime cadence.Runtime - Replicator *activity.Replicator + Runtime cadence.Runtime + Replicator *activity.Replicator + UpdateWatermark *activity.UpdateWatermark } ReplicatorRequest struct { @@ -38,6 +40,8 @@ type ( UpdateWatermark bool DataCompression string // Optional. If not specified, it is read from the workflow config. BatchSize uint64 // Optional. If not specified, it is read from the workflow config. + MiniBatchSize uint64 // Optional. If not specified, it is read from the workflow config. + CheckpointSize uint64 // Optional. If not specified, it is read from the workflow config. Parallelism int // Optional. If not specified, it is read from the workflow config. } ) @@ -55,8 +59,9 @@ var ( func NewReplicator(params ReplicatorParams) *Replicator { w := &Replicator{ - baseWorkflow: newBaseWorkflow(¶ms.Config.Workflows.Replicator, params.Runtime), - replicator: params.Replicator, + baseWorkflow: newBaseWorkflow(¶ms.Config.Workflows.Replicator, params.Runtime), + replicator: params.Replicator, + updateWatermark: params.UpdateWatermark, } w.registerWorkflow(w.execute) return w @@ -82,6 +87,16 @@ func (w *Replicator) execute(ctx workflow.Context, request *ReplicatorRequest) e batchSize = request.BatchSize } + miniBatchSize := cfg.MiniBatchSize + if request.MiniBatchSize > 0 { + miniBatchSize = request.MiniBatchSize + } + + checkpointSize := cfg.CheckpointSize + if request.CheckpointSize > 0 { + checkpointSize = request.CheckpointSize + } + parallelism := cfg.Parallelism if request.Parallelism > 0 { parallelism = request.Parallelism @@ -106,26 +121,103 @@ func (w *Replicator) execute(ctx workflow.Context, request *ReplicatorRequest) e logger.Info("workflow started", zap.Uint64("batchSize", batchSize)) ctx = w.withActivityOptions(ctx) - for startHeight, endHeight := request.StartHeight, request.StartHeight+batchSize; startHeight < request.EndHeight; startHeight, endHeight = startHeight+batchSize, endHeight+batchSize { - endHeight := endHeight + for startHeight := request.StartHeight; startHeight < request.EndHeight; startHeight = startHeight + batchSize { + if startHeight >= request.StartHeight+checkpointSize { + newRequest := *request + newRequest.StartHeight = startHeight + logger.Info( + "checkpoint reached", + zap.Reflect("newRequest", newRequest), + ) + return workflow.NewContinueAsNewError(ctx, w.name, &newRequest) + } + endHeight := startHeight + batchSize if endHeight > request.EndHeight { endHeight = request.EndHeight } - _, err := w.replicator.Execute(ctx, &activity.ReplicatorRequest{ - StartBlockHeight: startHeight, - EndBlockHeight: endHeight, - Parallelism: parallelism, - Tag: tag, - UpdateWatermark: request.UpdateWatermark, - Compression: dataCompression, - }) - if err != nil { - return xerrors.Errorf("failed to replicate blocks from %v-%v: %w", startHeight, endHeight, err) + + wg := workflow.NewWaitGroup(ctx) + wg.Add(parallelism) + miniBatchCount := int((endHeight-startHeight-1)/miniBatchSize + 1) + inputChannel := workflow.NewNamedBufferedChannel(ctx, "replicator.input", miniBatchCount) + for batchStart := startHeight; batchStart < endHeight; batchStart = batchStart + miniBatchSize { + inputChannel.Send(ctx, batchStart) + } + inputChannel.Close() + + reprocessChannel := workflow.NewNamedBufferedChannel(ctx, "replicator.reprocess", miniBatchCount) + defer reprocessChannel.Close() + + // Phase 1: running mini batches in parallel. + for i := 0; i < parallelism; i++ { + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + for { + var batchStart uint64 + if ok := inputChannel.Receive(ctx, &batchStart); !ok { + break + } + batchEnd := batchStart + miniBatchSize + if batchEnd > endHeight { + batchEnd = endHeight + } + _, err := w.replicator.Execute(ctx, &activity.ReplicatorRequest{ + Tag: tag, + StartHeight: batchStart, + EndHeight: batchEnd, + Parallelism: parallelism, + Compression: dataCompression, + }) + if err != nil { + reprocessChannel.Send(ctx, batchStart) + logger.Warn( + "queued for reprocessing", + zap.Uint64("batchStart", batchStart), + zap.Error(err), + ) + } + } + }) + } + wg.Wait(ctx) + + // Phase 2: reprocess any failed batches sequentially. + // This should happen rarely (only if we over stress the cluster or the cluster itself was crashing). + for { + var batchStart uint64 + if ok := reprocessChannel.ReceiveAsync(&batchStart); !ok { + break + } + batchEnd := batchStart + miniBatchSize + if batchEnd > endHeight { + batchEnd = endHeight + } + _, err := w.replicator.Execute(ctx, &activity.ReplicatorRequest{ + Tag: tag, + StartHeight: batchStart, + EndHeight: batchEnd, + Parallelism: parallelism, + Compression: dataCompression, + }) + if err != nil { + return xerrors.Errorf("failed to replicate block from %d to %d: %w", batchStart, batchEnd, err) + } + } + + // Phase 3: update watermark + if request.UpdateWatermark { + _, err := w.updateWatermark.Execute(ctx, &activity.UpdateWatermarkRequest{ + Tag: request.Tag, + ValidateStart: startHeight - 1, + BlockHeight: endHeight - 1, + }) + if err != nil { + return xerrors.Errorf("failed to update watermark: %w", err) + } } } logger.Info("workflow finished") - return nil }) }