Skip to content

Commit

Permalink
Major scaffolding
Browse files Browse the repository at this point in the history
  • Loading branch information
ferranbt committed Apr 6, 2022
1 parent d753735 commit e851d7f
Show file tree
Hide file tree
Showing 26 changed files with 2,005 additions and 2,244 deletions.
175 changes: 48 additions & 127 deletions blocktracker/blocktracker.go → block-tracker/blocktracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ const (

// BlockTracker is an interface to track new blocks on the chain
type BlockTracker struct {
config *Config
blocks []*ethgo.Block
blocksLock sync.Mutex
subscriber BlockTrackerInterface
blockChs []chan *BlockEvent
blockChsLock sync.Mutex
provider BlockProvider
once sync.Once
closeCh chan struct{}
config *Config

blocks []*ethgo.Block
lock sync.Mutex
tracker BlockTrackerInterface
provider BlockProvider

eventBroker *EventBroker
closeCh chan struct{}
}

type Config struct {
Expand Down Expand Up @@ -69,85 +69,43 @@ func NewBlockTracker(provider BlockProvider, opts ...ConfigOption) *BlockTracker
if tracker == nil {
tracker = NewJSONBlockTracker(log.New(os.Stderr, "", log.LstdFlags), provider)
}
return &BlockTracker{
blocks: []*ethgo.Block{},
blockChs: []chan *BlockEvent{},
config: config,
subscriber: tracker,
provider: provider,
closeCh: make(chan struct{}),
}
}

func (b *BlockTracker) Subscribe() chan *BlockEvent {
b.blockChsLock.Lock()
defer b.blockChsLock.Unlock()

ch := make(chan *BlockEvent, 1)
b.blockChs = append(b.blockChs, ch)
return ch
}

func (b *BlockTracker) AcquireLock() Lock {
return Lock{lock: &b.blocksLock}
}

func (t *BlockTracker) Init() (err error) {
var block *ethgo.Block
t.once.Do(func() {
block, err = t.provider.GetBlockByNumber(ethgo.Latest, false)
if err != nil {
return
}
if block.Number == 0 {
return
}

blocks := make([]*ethgo.Block, t.config.MaxBlockBacklog)

var i uint64
for i = 0; i < t.config.MaxBlockBacklog; i++ {
blocks[t.config.MaxBlockBacklog-i-1] = block
if block.Number == 0 {
break
}
block, err = t.provider.GetBlockByHash(block.ParentHash, false)
if err != nil {
return
}
}
broker, err := NewEventBroker(context.Background(), EventBrokerCfg{})
if err != nil {
panic(err)
}

if i != t.config.MaxBlockBacklog {
// less than maxBacklog elements
blocks = blocks[t.config.MaxBlockBacklog-i-1:]
}
t.blocks = blocks
})
return err
}
initial, err := provider.GetBlockByNumber(ethgo.Latest, false)
if err != nil {
panic(err)
}

func (b *BlockTracker) MaxBlockBacklog() uint64 {
return b.config.MaxBlockBacklog
}
b := &BlockTracker{
blocks: []*ethgo.Block{},
config: config,
tracker: tracker,
provider: provider,
eventBroker: broker,
closeCh: make(chan struct{}),
}

func (b *BlockTracker) LastBlocked() *ethgo.Block {
target := b.blocks[len(b.blocks)-1]
if target == nil {
return nil
// add an initial block
if err := b.HandleReconcile(initial); err != nil {
panic(err)
}
return target.Copy()
return b
}

func (b *BlockTracker) BlocksBlocked() []*ethgo.Block {
res := []*ethgo.Block{}
for _, i := range b.blocks {
res = append(res, i.Copy())
}
return res
// Header returns the last block of the tracked chain
func (b *BlockTracker) Header() *ethgo.Block {
b.lock.Lock()
last := b.blocks[len(b.blocks)-1].Copy()
b.lock.Unlock()
return last
}

func (b *BlockTracker) Len() int {
return len(b.blocks)
func (b *BlockTracker) Subscribe() *Subscription {
return b.eventBroker.Subscribe()
}

func (b *BlockTracker) Close() error {
Expand All @@ -162,7 +120,7 @@ func (b *BlockTracker) Start() error {
cancelFn()
}()
// start the polling
err := b.subscriber.Track(ctx, func(block *ethgo.Block) error {
err := b.tracker.Track(ctx, func(block *ethgo.Block) error {
return b.HandleReconcile(block)
})
if err != nil {
Expand All @@ -171,7 +129,7 @@ func (b *BlockTracker) Start() error {
return err
}

func (t *BlockTracker) AddBlockLocked(block *ethgo.Block) error {
func (t *BlockTracker) addBlocks(block *ethgo.Block) error {
if uint64(len(t.blocks)) == t.config.MaxBlockBacklog {
// remove past blocks if there are more than maxReconcileBlocks
t.blocks = t.blocks[1:]
Expand Down Expand Up @@ -225,7 +183,7 @@ func (t *BlockTracker) handleReconcileImpl(block *ethgo.Block) ([]*ethgo.Block,
count := uint64(0)
for {
if count > t.config.MaxBlockBacklog {
return nil, -1, fmt.Errorf("cannot reconcile more than max backlog values")
return nil, -1, fmt.Errorf("cannot reconcile more than '%d' max backlog values", t.config.MaxBlockBacklog)
}
count++

Expand All @@ -250,8 +208,8 @@ func (t *BlockTracker) handleReconcileImpl(block *ethgo.Block) ([]*ethgo.Block,
}

func (t *BlockTracker) HandleBlockEvent(block *ethgo.Block) (*BlockEvent, error) {
t.blocksLock.Lock()
defer t.blocksLock.Unlock()
t.lock.Lock()
defer t.lock.Unlock()

blocks, indx, err := t.handleReconcileImpl(block)
if err != nil {
Expand All @@ -274,7 +232,7 @@ func (t *BlockTracker) HandleBlockEvent(block *ethgo.Block) (*BlockEvent, error)
// include the new blocks
for _, block := range blocks {
blockEvnt.Added = append(blockEvnt.Added, block)
if err := t.AddBlockLocked(block); err != nil {
if err := t.addBlocks(block); err != nil {
return nil, err
}
}
Expand All @@ -290,15 +248,7 @@ func (t *BlockTracker) HandleReconcile(block *ethgo.Block) error {
return nil
}

t.blockChsLock.Lock()
for _, ch := range t.blockChs {
select {
case ch <- blockEvnt:
default:
}
}
t.blockChsLock.Unlock()

t.eventBroker.Publish(blockEvnt)
return nil
}

Expand Down Expand Up @@ -409,41 +359,12 @@ func (s *SubscriptionBlockTracker) Track(ctx context.Context, handle func(block
return nil
}

type Lock struct {
Locked bool
lock *sync.Mutex
}

func (l *Lock) Lock() {
l.Locked = true
l.lock.Lock()
}

func (l *Lock) Unlock() {
l.Locked = false
l.lock.Unlock()
}

// EventType is the type of the event
type EventType int

const (
// EventAdd happens when a new event is included in the chain
EventAdd EventType = iota
// EventDel may happen when there is a reorg and a past event is deleted
EventDel
)

// Event is an event emitted when a new log is included
type Event struct {
Type EventType
Added []*ethgo.Log
Removed []*ethgo.Log
}

// BlockEvent is an event emitted when a new block is included
type BlockEvent struct {
Type EventType
Added []*ethgo.Block
Removed []*ethgo.Block
}

func (b *BlockEvent) Header() *ethgo.Block {
return b.Added[len(b.Added)-1]
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package blocktracker

/*
import (
"context"
"log"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/umbracle/ethgo"
"github.com/umbracle/ethgo/jsonrpc"
"github.com/umbracle/ethgo/testutil"
web3 "github.com/umbracle/go-web3"
"github.com/umbracle/go-web3/jsonrpc"
"github.com/umbracle/go-web3/testutil"
)
func testListener(t *testing.T, server *testutil.TestServer, tracker BlockTrackerInterface) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
blocks := make(chan *ethgo.Block)
err := tracker.Track(ctx, func(block *ethgo.Block) error {
blocks := make(chan *web3.Block)
err := tracker.Track(ctx, func(block *web3.Block) error {
blocks <- block
return nil
})
Expand Down Expand Up @@ -83,11 +83,10 @@ func TestBlockTracker_Lifecycle(t *testing.T) {
c, _ := jsonrpc.NewClient(s.HTTPAddr())
tr := NewBlockTracker(c.Eth())
assert.NoError(t, tr.Init())
go tr.Start()
sub := tr.Subscribe()
sub := tr.Subscribe().GetEventCh()
for i := 0; i < 10; i++ {
select {
case <-sub:
Expand All @@ -97,46 +96,6 @@ func TestBlockTracker_Lifecycle(t *testing.T) {
}
}
func TestBlockTracker_PopulateBlocks(t *testing.T) {
// more than maxBackLog blocks
{
l := testutil.MockList{}
l.Create(0, 15, func(b *testutil.MockBlock) {})

m := &testutil.MockClient{}
m.AddScenario(l)

tt0 := NewBlockTracker(m)

err := tt0.Init()
if err != nil {
t.Fatal(err)
}
if !testutil.CompareBlocks(l.ToBlocks()[5:], tt0.blocks) {
t.Fatal("bad")
}
}
// less than maxBackLog
{
l0 := testutil.MockList{}
l0.Create(0, 5, func(b *testutil.MockBlock) {})

m1 := &testutil.MockClient{}
m1.AddScenario(l0)

tt1 := NewBlockTracker(m1)
tt1.provider = m1

err := tt1.Init()
if err != nil {
panic(err)
}
if !testutil.CompareBlocks(l0.ToBlocks(), tt1.blocks) {
t.Fatal("bad")
}
}
}

func TestBlockTracker_Events(t *testing.T) {
type TestEvent struct {
Expand Down Expand Up @@ -336,10 +295,10 @@ func TestBlockTracker_Events(t *testing.T) {
// build past block history
for _, b := range c.History.ToBlocks() {
tt.AddBlockLocked(b)
tt.addBlocks(b)
}
sub := tt.Subscribe()
sub := tt.Subscribe().GetEventCh()
for _, b := range c.Reconcile {
if err := tt.HandleReconcile(b.block.Block()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -370,3 +329,4 @@ func TestBlockTracker_Events(t *testing.T) {
})
}
}
*/
Loading

0 comments on commit e851d7f

Please sign in to comment.