Skip to content

Commit

Permalink
add blockNumber checkpoint handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jackchuma committed Nov 21, 2024
1 parent 22c9140 commit ae1c796
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 29 deletions.
7 changes: 6 additions & 1 deletion services/go-filler/log-fetcher/internal/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ func Main(ctx *cli.Context) error {
log.Crit("Failed to convert chainId to big.Int", "chainId", chainId)
}

l, err := listener.NewListener(chainIdBigInt, cfg.Networks, queue)
checkpoint, err := queue.ReadCheckpoint(chainId)
if err != nil {
log.Crit("Failed to read checkpoint", "error", err)
}

l, err := listener.NewListener(chainIdBigInt, cfg.Networks, queue, checkpoint)
if err != nil {
log.Crit("Failed to create listener", "error", err)
}
Expand Down
9 changes: 7 additions & 2 deletions services/go-filler/log-fetcher/internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type Handler interface {
HandleLog(*bindings.RIP7755OutboxCrossChainCallRequested) error
HandleLog(chainId string, log *bindings.RIP7755OutboxCrossChainCallRequested) error
}

type handler struct {
Expand All @@ -20,7 +20,7 @@ func NewHandler(srcChain *chains.ChainConfig, networks chains.Networks, queue st
return &handler{validator: validator.NewValidator(srcChain, networks), queue: queue}, nil
}

func (h *handler) HandleLog(log *bindings.RIP7755OutboxCrossChainCallRequested) error {
func (h *handler) HandleLog(chainId string, log *bindings.RIP7755OutboxCrossChainCallRequested) error {
err := h.validator.ValidateLog(log)
if err != nil {
return err
Expand All @@ -31,5 +31,10 @@ func (h *handler) HandleLog(log *bindings.RIP7755OutboxCrossChainCallRequested)
return err
}

err = h.queue.WriteCheckpoint(chainId, log.Raw.BlockNumber)
if err != nil {
return err
}

return nil
}
35 changes: 31 additions & 4 deletions services/go-filler/log-fetcher/internal/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ func (q *QueueMock) Enqueue(log *bindings.RIP7755OutboxCrossChainCallRequested)
return args.Error(0)
}

func (q *QueueMock) ReadCheckpoint(checkpointId string) (uint64, error) {
args := q.Called(checkpointId)
return args.Get(0).(uint64), args.Error(1)
}

func (q *QueueMock) WriteCheckpoint(checkpointId string, blockNumber uint64) error {
args := q.Called(checkpointId, blockNumber)
return args.Error(0)
}

func (q *QueueMock) Close() error {
args := q.Called()
return args.Error(0)
Expand All @@ -40,10 +50,10 @@ func TestHandler(t *testing.T) {

validatorMock.On("ValidateLog", log).Return(nil)
queueMock.On("Enqueue", log).Return(nil)

queueMock.On("WriteCheckpoint", "test", log.Raw.BlockNumber).Return(nil)
handler := &handler{validator: validatorMock, queue: queueMock}

err := handler.HandleLog(log)
err := handler.HandleLog("test", log)

assert.NoError(t, err)

Expand All @@ -61,7 +71,7 @@ func TestHandlerReturnsErrorFromValidator(t *testing.T) {

handler := &handler{validator: validatorMock, queue: queueMock}

err := handler.HandleLog(log)
err := handler.HandleLog("test", log)

assert.Error(t, err)
}
Expand All @@ -77,7 +87,24 @@ func TestHandlerReturnsErrorFromQueue(t *testing.T) {

handler := &handler{validator: validatorMock, queue: queueMock}

err := handler.HandleLog(log)
err := handler.HandleLog("test", log)

assert.Error(t, err)
}

func TestHandlerReturnsErrorFromWriteCheckpoint(t *testing.T) {
validatorMock := new(ValidatorMock)
queueMock := new(QueueMock)

log := &bindings.RIP7755OutboxCrossChainCallRequested{}

validatorMock.On("ValidateLog", log).Return(nil)
queueMock.On("Enqueue", log).Return(nil)
queueMock.On("WriteCheckpoint", "test", log.Raw.BlockNumber).Return(errors.New("test error"))

handler := &handler{validator: validatorMock, queue: queueMock}

err := handler.HandleLog("test", log)

assert.Error(t, err)
}
44 changes: 24 additions & 20 deletions services/go-filler/log-fetcher/internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@ type Listener interface {
}

type listener struct {
outbox *bindings.RIP7755Outbox
handler handler.Handler
logs chan *bindings.RIP7755OutboxCrossChainCallRequested
stop chan struct{}
wg sync.WaitGroup
pollRate time.Duration
pollReqCh chan struct{}
polling bool
outbox *bindings.RIP7755Outbox
handler handler.Handler
logs chan *bindings.RIP7755OutboxCrossChainCallRequested
stop chan struct{}
wg sync.WaitGroup
pollRate time.Duration
pollReqCh chan struct{}
polling bool
startingBlock uint64
srcChainId string
}

var httpRegex = regexp.MustCompile("^http(s)?://")

func NewListener(srcChainId *big.Int, networks chains.Networks, queue store.Queue) (Listener, error) {
func NewListener(srcChainId *big.Int, networks chains.Networks, queue store.Queue, startingBlock uint64) (Listener, error) {
srcChain, err := networks.GetChainConfig(srcChainId)
if err != nil {
return nil, err
Expand All @@ -63,13 +65,15 @@ func NewListener(srcChainId *big.Int, networks chains.Networks, queue store.Queu
}

return &listener{
outbox: outbox,
handler: h,
logs: make(chan *bindings.RIP7755OutboxCrossChainCallRequested),
stop: make(chan struct{}),
pollReqCh: make(chan struct{}, 1),
pollRate: 3 * time.Second,
polling: httpRegex.MatchString(srcChain.RpcUrl),
outbox: outbox,
handler: h,
logs: make(chan *bindings.RIP7755OutboxCrossChainCallRequested),
stop: make(chan struct{}),
pollReqCh: make(chan struct{}, 1),
pollRate: 3 * time.Second,
polling: httpRegex.MatchString(srcChain.RpcUrl),
startingBlock: startingBlock,
srcChainId: srcChainId.String(),
}, nil
}

Expand All @@ -82,7 +86,7 @@ func (l *listener) Start() error {
}

func webSocketListener(l *listener) error {
sub, err := l.outbox.WatchCrossChainCallRequested(&bind.WatchOpts{}, l.logs, [][32]byte{})
sub, err := l.outbox.WatchCrossChainCallRequested(&bind.WatchOpts{Start: &l.startingBlock}, l.logs, [][32]byte{})
if err != nil {
return fmt.Errorf("failed to subscribe to logs: %v", err)
}
Expand Down Expand Up @@ -111,7 +115,7 @@ func pollListener(l *listener) error {
case <-l.pollReqCh:
logger.Info("Running")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
logIterator, err := l.outbox.FilterCrossChainCallRequested(&bind.FilterOpts{Context: ctx}, [][32]byte{})
logIterator, err := l.outbox.FilterCrossChainCallRequested(&bind.FilterOpts{Context: ctx, Start: l.startingBlock}, [][32]byte{})
if err != nil {
logger.Error("failed to filter logs", "error", err)
cancel()
Expand All @@ -128,7 +132,7 @@ func pollListener(l *listener) error {
}

log := logIterator.Event
err = l.handler.HandleLog(log)
err = l.handler.HandleLog(l.srcChainId, log)
if err != nil {
logger.Error("failed to handle log", "error", err)
continue
Expand All @@ -155,7 +159,7 @@ func (l *listener) loop(sub ethereum.Subscription) {
logger.Info("Log Block Number", "blockNumber", log.Raw.BlockNumber)
logger.Info("Log Index", "index", log.Raw.Index)

err := l.handler.HandleLog(log)
err := l.handler.HandleLog(l.srcChainId, log)
if err != nil {
logger.Error("Error handling log", "error", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var networksCfg chains.NetworksConfig = chains.NetworksConfig{
var queue store.Queue

func TestNewListener(t *testing.T) {
l, err := NewListener(big.NewInt(421614), networksCfg.Networks, queue)
l, err := NewListener(big.NewInt(421614), networksCfg.Networks, queue, 0)
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
Expand Down
43 changes: 42 additions & 1 deletion services/go-filler/log-fetcher/internal/store/mongo_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@ import (
"github.com/base-org/RIP-7755-poc/services/go-filler/bindings"
logger "github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type Queue interface {
Enqueue(*bindings.RIP7755OutboxCrossChainCallRequested) error
ReadCheckpoint(checkpointId string) (uint64, error)
WriteCheckpoint(checkpointId string, blockNumber uint64) error
Close() error
}

type MongoCollection interface {
InsertOne(ctx context.Context, document interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error)
UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) *mongo.SingleResult
}

type MongoDriverClient interface {
Expand All @@ -27,20 +32,25 @@ type MongoDriverClient interface {
type queue struct {
client MongoDriverClient
collection MongoCollection
checkpoint MongoCollection
}

type record struct {
RequestHash [32]byte
Request bindings.CrossChainRequest
}

type checkpoint struct {
BlockNumber uint64
}

func NewQueue(ctx *cli.Context) (Queue, error) {
client, err := connect(ctx)
if err != nil {
return nil, err
}

return &queue{client: client, collection: client.Database("calls").Collection("requests")}, nil
return &queue{client: client, collection: client.Database("calls").Collection("requests"), checkpoint: client.Database("calls").Collection("checkpoint")}, nil
}

func (q *queue) Enqueue(log *bindings.RIP7755OutboxCrossChainCallRequested) error {
Expand All @@ -60,6 +70,37 @@ func (q *queue) Enqueue(log *bindings.RIP7755OutboxCrossChainCallRequested) erro
return nil
}

func (q *queue) ReadCheckpoint(checkpointId string) (uint64, error) {
res := q.checkpoint.FindOne(context.TODO(), bson.M{"id": checkpointId})
if res.Err() != nil {
// If the checkpoint doesn't exist, return 0 as starting block
if res.Err() == mongo.ErrNoDocuments {
return 0, nil
}
return 0, res.Err()
}

var c checkpoint
if err := res.Decode(&c); err != nil {
return 0, err
}

return c.BlockNumber, nil
}

func (q *queue) WriteCheckpoint(checkpointId string, blockNumber uint64) error {
c := checkpoint{
BlockNumber: blockNumber,
}
opts := options.Update().SetUpsert(true)
_, err := q.checkpoint.UpdateOne(context.TODO(), bson.M{"id": checkpointId}, bson.M{"$set": c}, opts)
if err != nil {
return err
}

return nil
}

func (q *queue) Close() error {
return q.client.Disconnect(context.TODO())
}
Expand Down
10 changes: 10 additions & 0 deletions services/go-filler/log-fetcher/internal/store/mongo_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ func (c *MongoConnectionMock) InsertOne(ctx context.Context, document interface{
return args.Get(0).(*mongo.InsertOneResult), args.Error(1)
}

func (c *MongoConnectionMock) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
args := c.Called(ctx, filter, update, opts)
return args.Get(0).(*mongo.UpdateResult), args.Error(1)
}

func (c *MongoConnectionMock) FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) *mongo.SingleResult {
args := c.Called(ctx, filter, opts)
return args.Get(0).(*mongo.SingleResult)
}

func (m *MongoClientMock) Database(name string, opts ...*options.DatabaseOptions) *mongo.Database {
args := m.Called(name, opts)
return args.Get(0).(*mongo.Database)
Expand Down

0 comments on commit ae1c796

Please sign in to comment.