From ae1c7966c904034a985324d0bb76ca65bf01645e Mon Sep 17 00:00:00 2001 From: Jack Chuma Date: Thu, 21 Nov 2024 17:25:23 -0500 Subject: [PATCH] add blockNumber checkpoint handling --- .../log-fetcher/internal/fetcher/fetcher.go | 7 ++- .../log-fetcher/internal/handler/handler.go | 9 +++- .../internal/handler/handler_test.go | 35 +++++++++++++-- .../log-fetcher/internal/listener/listener.go | 44 ++++++++++--------- .../internal/listener/listener_test.go | 2 +- .../internal/store/mongo_client.go | 43 +++++++++++++++++- .../internal/store/mongo_client_test.go | 10 +++++ 7 files changed, 121 insertions(+), 29 deletions(-) diff --git a/services/go-filler/log-fetcher/internal/fetcher/fetcher.go b/services/go-filler/log-fetcher/internal/fetcher/fetcher.go index 221ad7c..f15bde8 100644 --- a/services/go-filler/log-fetcher/internal/fetcher/fetcher.go +++ b/services/go-filler/log-fetcher/internal/fetcher/fetcher.go @@ -48,7 +48,12 @@ func Main(ctx *cli.Context) error { log.Crit("Failed to convert chainId to big.Int", "chainId", chainId) } - l, err := listener.NewListener(chainIdBigInt, cfg.Networks, queue) + checkpoint, err := queue.ReadCheckpoint(chainId) + if err != nil { + log.Crit("Failed to read checkpoint", "error", err) + } + + l, err := listener.NewListener(chainIdBigInt, cfg.Networks, queue, checkpoint) if err != nil { log.Crit("Failed to create listener", "error", err) } diff --git a/services/go-filler/log-fetcher/internal/handler/handler.go b/services/go-filler/log-fetcher/internal/handler/handler.go index 5dfcce0..1e93c5d 100644 --- a/services/go-filler/log-fetcher/internal/handler/handler.go +++ b/services/go-filler/log-fetcher/internal/handler/handler.go @@ -8,7 +8,7 @@ import ( ) type Handler interface { - HandleLog(*bindings.RIP7755OutboxCrossChainCallRequested) error + HandleLog(chainId string, log *bindings.RIP7755OutboxCrossChainCallRequested) error } type handler struct { @@ -20,7 +20,7 @@ func NewHandler(srcChain *chains.ChainConfig, networks chains.Networks, queue st return &handler{validator: validator.NewValidator(srcChain, networks), queue: queue}, nil } -func (h *handler) HandleLog(log *bindings.RIP7755OutboxCrossChainCallRequested) error { +func (h *handler) HandleLog(chainId string, log *bindings.RIP7755OutboxCrossChainCallRequested) error { err := h.validator.ValidateLog(log) if err != nil { return err @@ -31,5 +31,10 @@ func (h *handler) HandleLog(log *bindings.RIP7755OutboxCrossChainCallRequested) return err } + err = h.queue.WriteCheckpoint(chainId, log.Raw.BlockNumber) + if err != nil { + return err + } + return nil } diff --git a/services/go-filler/log-fetcher/internal/handler/handler_test.go b/services/go-filler/log-fetcher/internal/handler/handler_test.go index 7abb7a8..1a8258b 100644 --- a/services/go-filler/log-fetcher/internal/handler/handler_test.go +++ b/services/go-filler/log-fetcher/internal/handler/handler_test.go @@ -27,6 +27,16 @@ func (q *QueueMock) Enqueue(log *bindings.RIP7755OutboxCrossChainCallRequested) return args.Error(0) } +func (q *QueueMock) ReadCheckpoint(checkpointId string) (uint64, error) { + args := q.Called(checkpointId) + return args.Get(0).(uint64), args.Error(1) +} + +func (q *QueueMock) WriteCheckpoint(checkpointId string, blockNumber uint64) error { + args := q.Called(checkpointId, blockNumber) + return args.Error(0) +} + func (q *QueueMock) Close() error { args := q.Called() return args.Error(0) @@ -40,10 +50,10 @@ func TestHandler(t *testing.T) { validatorMock.On("ValidateLog", log).Return(nil) queueMock.On("Enqueue", log).Return(nil) - + queueMock.On("WriteCheckpoint", "test", log.Raw.BlockNumber).Return(nil) handler := &handler{validator: validatorMock, queue: queueMock} - err := handler.HandleLog(log) + err := handler.HandleLog("test", log) assert.NoError(t, err) @@ -61,7 +71,7 @@ func TestHandlerReturnsErrorFromValidator(t *testing.T) { handler := &handler{validator: validatorMock, queue: queueMock} - err := handler.HandleLog(log) + err := handler.HandleLog("test", log) assert.Error(t, err) } @@ -77,7 +87,24 @@ func TestHandlerReturnsErrorFromQueue(t *testing.T) { handler := &handler{validator: validatorMock, queue: queueMock} - err := handler.HandleLog(log) + err := handler.HandleLog("test", log) + + assert.Error(t, err) +} + +func TestHandlerReturnsErrorFromWriteCheckpoint(t *testing.T) { + validatorMock := new(ValidatorMock) + queueMock := new(QueueMock) + + log := &bindings.RIP7755OutboxCrossChainCallRequested{} + + validatorMock.On("ValidateLog", log).Return(nil) + queueMock.On("Enqueue", log).Return(nil) + queueMock.On("WriteCheckpoint", "test", log.Raw.BlockNumber).Return(errors.New("test error")) + + handler := &handler{validator: validatorMock, queue: queueMock} + + err := handler.HandleLog("test", log) assert.Error(t, err) } diff --git a/services/go-filler/log-fetcher/internal/listener/listener.go b/services/go-filler/log-fetcher/internal/listener/listener.go index 4b98e44..b7adaee 100644 --- a/services/go-filler/log-fetcher/internal/listener/listener.go +++ b/services/go-filler/log-fetcher/internal/listener/listener.go @@ -25,19 +25,21 @@ type Listener interface { } type listener struct { - outbox *bindings.RIP7755Outbox - handler handler.Handler - logs chan *bindings.RIP7755OutboxCrossChainCallRequested - stop chan struct{} - wg sync.WaitGroup - pollRate time.Duration - pollReqCh chan struct{} - polling bool + outbox *bindings.RIP7755Outbox + handler handler.Handler + logs chan *bindings.RIP7755OutboxCrossChainCallRequested + stop chan struct{} + wg sync.WaitGroup + pollRate time.Duration + pollReqCh chan struct{} + polling bool + startingBlock uint64 + srcChainId string } var httpRegex = regexp.MustCompile("^http(s)?://") -func NewListener(srcChainId *big.Int, networks chains.Networks, queue store.Queue) (Listener, error) { +func NewListener(srcChainId *big.Int, networks chains.Networks, queue store.Queue, startingBlock uint64) (Listener, error) { srcChain, err := networks.GetChainConfig(srcChainId) if err != nil { return nil, err @@ -63,13 +65,15 @@ func NewListener(srcChainId *big.Int, networks chains.Networks, queue store.Queu } return &listener{ - outbox: outbox, - handler: h, - logs: make(chan *bindings.RIP7755OutboxCrossChainCallRequested), - stop: make(chan struct{}), - pollReqCh: make(chan struct{}, 1), - pollRate: 3 * time.Second, - polling: httpRegex.MatchString(srcChain.RpcUrl), + outbox: outbox, + handler: h, + logs: make(chan *bindings.RIP7755OutboxCrossChainCallRequested), + stop: make(chan struct{}), + pollReqCh: make(chan struct{}, 1), + pollRate: 3 * time.Second, + polling: httpRegex.MatchString(srcChain.RpcUrl), + startingBlock: startingBlock, + srcChainId: srcChainId.String(), }, nil } @@ -82,7 +86,7 @@ func (l *listener) Start() error { } func webSocketListener(l *listener) error { - sub, err := l.outbox.WatchCrossChainCallRequested(&bind.WatchOpts{}, l.logs, [][32]byte{}) + sub, err := l.outbox.WatchCrossChainCallRequested(&bind.WatchOpts{Start: &l.startingBlock}, l.logs, [][32]byte{}) if err != nil { return fmt.Errorf("failed to subscribe to logs: %v", err) } @@ -111,7 +115,7 @@ func pollListener(l *listener) error { case <-l.pollReqCh: logger.Info("Running") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - logIterator, err := l.outbox.FilterCrossChainCallRequested(&bind.FilterOpts{Context: ctx}, [][32]byte{}) + logIterator, err := l.outbox.FilterCrossChainCallRequested(&bind.FilterOpts{Context: ctx, Start: l.startingBlock}, [][32]byte{}) if err != nil { logger.Error("failed to filter logs", "error", err) cancel() @@ -128,7 +132,7 @@ func pollListener(l *listener) error { } log := logIterator.Event - err = l.handler.HandleLog(log) + err = l.handler.HandleLog(l.srcChainId, log) if err != nil { logger.Error("failed to handle log", "error", err) continue @@ -155,7 +159,7 @@ func (l *listener) loop(sub ethereum.Subscription) { logger.Info("Log Block Number", "blockNumber", log.Raw.BlockNumber) logger.Info("Log Index", "index", log.Raw.Index) - err := l.handler.HandleLog(log) + err := l.handler.HandleLog(l.srcChainId, log) if err != nil { logger.Error("Error handling log", "error", err) } diff --git a/services/go-filler/log-fetcher/internal/listener/listener_test.go b/services/go-filler/log-fetcher/internal/listener/listener_test.go index f34e341..d3d31bc 100644 --- a/services/go-filler/log-fetcher/internal/listener/listener_test.go +++ b/services/go-filler/log-fetcher/internal/listener/listener_test.go @@ -24,7 +24,7 @@ var networksCfg chains.NetworksConfig = chains.NetworksConfig{ var queue store.Queue func TestNewListener(t *testing.T) { - l, err := NewListener(big.NewInt(421614), networksCfg.Networks, queue) + l, err := NewListener(big.NewInt(421614), networksCfg.Networks, queue, 0) if err != nil { t.Fatalf("Failed to create listener: %v", err) } diff --git a/services/go-filler/log-fetcher/internal/store/mongo_client.go b/services/go-filler/log-fetcher/internal/store/mongo_client.go index 3541561..21d4bb5 100644 --- a/services/go-filler/log-fetcher/internal/store/mongo_client.go +++ b/services/go-filler/log-fetcher/internal/store/mongo_client.go @@ -6,17 +6,22 @@ import ( "github.com/base-org/RIP-7755-poc/services/go-filler/bindings" logger "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli/v2" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type Queue interface { Enqueue(*bindings.RIP7755OutboxCrossChainCallRequested) error + ReadCheckpoint(checkpointId string) (uint64, error) + WriteCheckpoint(checkpointId string, blockNumber uint64) error Close() error } type MongoCollection interface { InsertOne(ctx context.Context, document interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error) + UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) + FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) *mongo.SingleResult } type MongoDriverClient interface { @@ -27,6 +32,7 @@ type MongoDriverClient interface { type queue struct { client MongoDriverClient collection MongoCollection + checkpoint MongoCollection } type record struct { @@ -34,13 +40,17 @@ type record struct { Request bindings.CrossChainRequest } +type checkpoint struct { + BlockNumber uint64 +} + func NewQueue(ctx *cli.Context) (Queue, error) { client, err := connect(ctx) if err != nil { return nil, err } - return &queue{client: client, collection: client.Database("calls").Collection("requests")}, nil + return &queue{client: client, collection: client.Database("calls").Collection("requests"), checkpoint: client.Database("calls").Collection("checkpoint")}, nil } func (q *queue) Enqueue(log *bindings.RIP7755OutboxCrossChainCallRequested) error { @@ -60,6 +70,37 @@ func (q *queue) Enqueue(log *bindings.RIP7755OutboxCrossChainCallRequested) erro return nil } +func (q *queue) ReadCheckpoint(checkpointId string) (uint64, error) { + res := q.checkpoint.FindOne(context.TODO(), bson.M{"id": checkpointId}) + if res.Err() != nil { + // If the checkpoint doesn't exist, return 0 as starting block + if res.Err() == mongo.ErrNoDocuments { + return 0, nil + } + return 0, res.Err() + } + + var c checkpoint + if err := res.Decode(&c); err != nil { + return 0, err + } + + return c.BlockNumber, nil +} + +func (q *queue) WriteCheckpoint(checkpointId string, blockNumber uint64) error { + c := checkpoint{ + BlockNumber: blockNumber, + } + opts := options.Update().SetUpsert(true) + _, err := q.checkpoint.UpdateOne(context.TODO(), bson.M{"id": checkpointId}, bson.M{"$set": c}, opts) + if err != nil { + return err + } + + return nil +} + func (q *queue) Close() error { return q.client.Disconnect(context.TODO()) } diff --git a/services/go-filler/log-fetcher/internal/store/mongo_client_test.go b/services/go-filler/log-fetcher/internal/store/mongo_client_test.go index 81e131d..ac2c701 100644 --- a/services/go-filler/log-fetcher/internal/store/mongo_client_test.go +++ b/services/go-filler/log-fetcher/internal/store/mongo_client_test.go @@ -25,6 +25,16 @@ func (c *MongoConnectionMock) InsertOne(ctx context.Context, document interface{ return args.Get(0).(*mongo.InsertOneResult), args.Error(1) } +func (c *MongoConnectionMock) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) { + args := c.Called(ctx, filter, update, opts) + return args.Get(0).(*mongo.UpdateResult), args.Error(1) +} + +func (c *MongoConnectionMock) FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) *mongo.SingleResult { + args := c.Called(ctx, filter, opts) + return args.Get(0).(*mongo.SingleResult) +} + func (m *MongoClientMock) Database(name string, opts ...*options.DatabaseOptions) *mongo.Database { args := m.Called(name, opts) return args.Get(0).(*mongo.Database)