-
Notifications
You must be signed in to change notification settings - Fork 195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(eth-rpc): Conversion types and functions between Ethereum txs and blocks and Tendermint ones. #1856
Merged
Merged
feat(eth-rpc): Conversion types and functions between Ethereum txs and blocks and Tendermint ones. #1856
Changes from 2 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
1a05061
feat(eth-pubsub): Implement in-memory EventBus for real-time topic ma…
Unique-Divine 36730c5
feat(eth-rpc): Conversion types and functions between Ethereum txs an…
Unique-Divine 176b6c6
chore: linter
Unique-Divine ac0701a
test(eth-rpc): more tests for types dir
Unique-Divine 372df12
ci: add CODECOV_TOKEN env var to secrets
Unique-Divine fe913dc
test,refactor(eth): remove unnecessary nesting + more tests
Unique-Divine 31bb3bb
Merge branch 'main' into ud/evm
Unique-Divine c3a6215
Merge branch 'ud/evm' into ud/evm-rpc
Unique-Divine a336983
refactor(eth): rpc/types -> rpc
Unique-Divine b32d385
refactor(eth): evm/types -> evm
Unique-Divine bfac1e8
refactor(eth): ethtypes -> eth
Unique-Divine 4f28f8f
test(eth): eip712 more tests
Unique-Divine 433780d
Merge branch 'main' into ud/evm-rpc
Unique-Divine 14f8a2c
test(eth): more tests
Unique-Divine db03210
test(eth-rpc): more tests
Unique-Divine 045cf6d
Merge branch 'main' into ud/evm-rpc
Unique-Divine File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
// Copyright (c) 2023-2024 Nibi, Inc. | ||
package pubsub | ||
|
||
import ( | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/pkg/errors" | ||
|
||
coretypes "github.com/cometbft/cometbft/rpc/core/types" | ||
) | ||
|
||
type UnsubscribeFunc func() | ||
|
||
// EventBus manages topics and subscriptions. A "topic" is a named channel of | ||
// communication. A "subscription" is the action taken by a subscriber to express | ||
// interest in receiving messages broadcasted from a specific topic. | ||
type EventBus interface { | ||
// AddTopic: Adds a new topic with the specified name and message source | ||
AddTopic(name string, src <-chan coretypes.ResultEvent) error | ||
// RemoveTopic: Removes the specified topic and all its related data, | ||
// ensuring clean up of resources. | ||
RemoveTopic(name string) | ||
Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) | ||
Topics() []string | ||
} | ||
|
||
// memEventBus is an implemention of the `EventBus` interface. | ||
type memEventBus struct { | ||
topics map[string]<-chan coretypes.ResultEvent | ||
topicsMux *sync.RWMutex | ||
subscribers map[string]map[uint64]chan<- coretypes.ResultEvent | ||
subscribersMux *sync.RWMutex | ||
currentUniqueID uint64 | ||
} | ||
|
||
// NewEventBus returns a fresh imlpemention of `memEventBus`, which implements | ||
// the `EventBus` interface for managing Ethereum topics and subscriptions. | ||
func NewEventBus() EventBus { | ||
return &memEventBus{ | ||
topics: make(map[string]<-chan coretypes.ResultEvent), | ||
topicsMux: new(sync.RWMutex), | ||
subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent), | ||
subscribersMux: new(sync.RWMutex), | ||
} | ||
} | ||
|
||
// GenUniqueID atomically increments and returns a unique identifier for a new subscriber. | ||
// This ID is used internally to manage subscriber-specific channels. | ||
func (m *memEventBus) GenUniqueID() uint64 { | ||
return atomic.AddUint64(&m.currentUniqueID, 1) | ||
} | ||
|
||
// Topics returns a list of all topics currently managed by the EventBus. The | ||
// list is safe for concurrent access and is a snapshot of current topic names. | ||
func (m *memEventBus) Topics() (topics []string) { | ||
m.topicsMux.RLock() | ||
defer m.topicsMux.RUnlock() | ||
|
||
topics = make([]string, 0, len(m.topics)) | ||
for topicName := range m.topics { | ||
topics = append(topics, topicName) | ||
} | ||
|
||
return topics | ||
} | ||
|
||
// AddTopic adds a new topic with the specified name and message source | ||
func (m *memEventBus) AddTopic(name string, src <-chan coretypes.ResultEvent) error { | ||
m.topicsMux.RLock() | ||
_, ok := m.topics[name] | ||
m.topicsMux.RUnlock() | ||
|
||
if ok { | ||
return errors.New("topic already registered") | ||
} | ||
|
||
m.topicsMux.Lock() | ||
m.topics[name] = src | ||
m.topicsMux.Unlock() | ||
|
||
go m.publishTopic(name, src) | ||
|
||
return nil | ||
} | ||
|
||
// RemoveTopic: Removes the specified topic and all its related data, ensuring | ||
// clean up of resources. | ||
func (m *memEventBus) RemoveTopic(name string) { | ||
m.topicsMux.Lock() | ||
delete(m.topics, name) | ||
m.topicsMux.Unlock() | ||
} | ||
|
||
// Subscribe attempts to create a subscription to the specified topic. It returns | ||
// a channel to receive messages, a function to unsubscribe, and an error if the | ||
// topic does not exist. | ||
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) { | ||
m.topicsMux.RLock() | ||
_, ok := m.topics[name] | ||
m.topicsMux.RUnlock() | ||
|
||
if !ok { | ||
return nil, nil, errors.Errorf("topic not found: %s", name) | ||
} | ||
|
||
ch := make(chan coretypes.ResultEvent) | ||
m.subscribersMux.Lock() | ||
defer m.subscribersMux.Unlock() | ||
|
||
id := m.GenUniqueID() | ||
if _, ok := m.subscribers[name]; !ok { | ||
m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent) | ||
} | ||
m.subscribers[name][id] = ch | ||
|
||
unsubscribe := func() { | ||
m.subscribersMux.Lock() | ||
defer m.subscribersMux.Unlock() | ||
delete(m.subscribers[name], id) | ||
} | ||
|
||
return ch, unsubscribe, nil | ||
} | ||
|
||
func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) { | ||
for { | ||
msg, ok := <-src | ||
if !ok { | ||
m.closeAllSubscribers(name) | ||
m.topicsMux.Lock() | ||
delete(m.topics, name) | ||
m.topicsMux.Unlock() | ||
return | ||
} | ||
m.publishAllSubscribers(name, msg) | ||
} | ||
} | ||
|
||
// closeAllSubscribers closes all subscriber channels associated with the | ||
// specified topic and removes the topic from the subscribers map. This function | ||
// is typically called when a topic is deleted or no longer available to ensure | ||
// all resources are released properly and to prevent goroutine leaks. It ensures | ||
// thread-safe execution by locking around the operation. | ||
func (m *memEventBus) closeAllSubscribers(name string) { | ||
m.subscribersMux.Lock() | ||
defer m.subscribersMux.Unlock() | ||
|
||
subscribers := m.subscribers[name] | ||
delete(m.subscribers, name) | ||
// #nosec G705 | ||
for _, sub := range subscribers { | ||
close(sub) | ||
} | ||
} | ||
|
||
// publishAllSubscribers sends a message to all subscribers of the specified | ||
// topic. It uses a non-blocking send operation to deliver the message to | ||
// subscriber channels. If a subscriber's channel is not ready to receive the | ||
// message (i.e., the channel is full), the message is skipped for that | ||
// subscriber to avoid blocking the publisher. This function ensures thread-safe | ||
// access to subscribers by using a read lock. | ||
func (m *memEventBus) publishAllSubscribers(name string, msg coretypes.ResultEvent) { | ||
m.subscribersMux.RLock() | ||
defer m.subscribersMux.RUnlock() | ||
subscribers := m.subscribers[name] | ||
// #nosec G705 | ||
for _, sub := range subscribers { | ||
select { | ||
case sub <- msg: | ||
default: | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
package pubsub | ||
|
||
import ( | ||
"log" | ||
"sort" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
rpccore "github.com/cometbft/cometbft/rpc/core/types" | ||
"github.com/stretchr/testify/require" | ||
"github.com/stretchr/testify/suite" | ||
) | ||
|
||
// subscribeAndPublish: Helper function used to perform concurrent subscription | ||
// and publishing actions. It concurrently subscribes multiple clients to the | ||
// specified topic and simultanesouly sends an empty message to the topic channel | ||
// for each subscription. | ||
func subscribeAndPublish(t *testing.T, eb EventBus, topic string, topicChan chan rpccore.ResultEvent) { | ||
var ( | ||
wg sync.WaitGroup | ||
subscribersCount = 50 | ||
emptyMsg = rpccore.ResultEvent{} | ||
) | ||
for i := 0; i < subscribersCount; i++ { | ||
wg.Add(1) | ||
// concurrently subscribe to the topic | ||
go func() { | ||
defer wg.Done() | ||
_, _, err := eb.Subscribe(topic) | ||
require.NoError(t, err) | ||
}() | ||
|
||
// send events to the topic | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
topicChan <- emptyMsg | ||
}() | ||
} | ||
wg.Wait() | ||
} | ||
|
||
type SuitePubsub struct { | ||
suite.Suite | ||
} | ||
|
||
func TestSuitePubsub(t *testing.T) { | ||
suite.Run(t, new(SuitePubsub)) | ||
} | ||
|
||
func (s *SuitePubsub) TestAddTopic() { | ||
q := NewEventBus() | ||
// dummy vars | ||
topicA := "guard" | ||
topicB := "cream" | ||
|
||
s.NoError(q.AddTopic(topicA, make(<-chan rpccore.ResultEvent))) | ||
s.NoError(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent))) | ||
s.Error(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent))) | ||
|
||
topics := q.Topics() | ||
sort.Strings(topics) // cream should be first | ||
s.Require().EqualValues([]string{topicB, topicA}, topics) | ||
} | ||
|
||
func (s *SuitePubsub) TestSubscribe() { | ||
q := NewEventBus() | ||
|
||
// dummy vars | ||
topicA := "0xfoo" | ||
topicB := "blockchain" | ||
|
||
srcA := make(chan rpccore.ResultEvent) | ||
err := q.AddTopic(topicA, srcA) | ||
s.NoError(err) | ||
|
||
srcB := make(chan rpccore.ResultEvent) | ||
err = q.AddTopic(topicB, srcB) | ||
s.NoError(err) | ||
|
||
// subscriber channels | ||
subChanA, _, err := q.Subscribe(topicA) | ||
s.NoError(err) | ||
subChanB1, _, err := q.Subscribe(topicB) | ||
s.NoError(err) | ||
subChanB2, _, err := q.Subscribe(topicB) | ||
s.NoError(err) | ||
|
||
wg := new(sync.WaitGroup) | ||
wg.Add(4) | ||
|
||
emptyMsg := rpccore.ResultEvent{} | ||
go func() { | ||
defer wg.Done() | ||
msg := <-subChanA | ||
log.Println(topicA+":", msg) | ||
s.EqualValues(emptyMsg, msg) | ||
}() | ||
|
||
go func() { | ||
defer wg.Done() | ||
msg := <-subChanB1 | ||
log.Println(topicB+":", msg) | ||
s.EqualValues(emptyMsg, msg) | ||
}() | ||
|
||
go func() { | ||
defer wg.Done() | ||
msg := <-subChanB2 | ||
log.Println(topicB+"2:", msg) | ||
s.EqualValues(emptyMsg, msg) | ||
}() | ||
|
||
go func() { | ||
defer wg.Done() | ||
|
||
time.Sleep(time.Second) | ||
|
||
close(srcA) | ||
close(srcB) | ||
}() | ||
|
||
wg.Wait() | ||
time.Sleep(time.Second) | ||
} | ||
|
||
// TestConcurrentSubscribeAndPublish: Stress tests the module to make sure that | ||
// operations are handled properly under concurrent access. | ||
func (s *SuitePubsub) TestConcurrentSubscribeAndPublish() { | ||
var ( | ||
wg sync.WaitGroup | ||
eb = NewEventBus() | ||
topicName = "topic-name" | ||
topicCh = make(chan rpccore.ResultEvent) | ||
runsCount = 5 | ||
) | ||
|
||
err := eb.AddTopic(topicName, topicCh) | ||
s.Require().NoError(err) | ||
|
||
for i := 0; i < runsCount; i++ { | ||
subscribeAndPublish(s.T(), eb, topicName, topicCh) | ||
} | ||
|
||
// close channel to make test end | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
time.Sleep(2 * time.Second) | ||
close(topicCh) | ||
}() | ||
|
||
wg.Wait() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
// Copyright (c) 2023-2024 Nibi, Inc. | ||
package types | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
) | ||
|
||
// AddrLocker is a mutex structure used to avoid querying outdated account data | ||
type AddrLocker struct { | ||
mu sync.Mutex | ||
locks map[common.Address]*sync.Mutex | ||
} | ||
|
||
// lock returns the lock of the given address. | ||
func (l *AddrLocker) lock(address common.Address) *sync.Mutex { | ||
l.mu.Lock() | ||
defer l.mu.Unlock() | ||
if l.locks == nil { | ||
l.locks = make(map[common.Address]*sync.Mutex) | ||
} | ||
if _, ok := l.locks[address]; !ok { | ||
l.locks[address] = new(sync.Mutex) | ||
} | ||
return l.locks[address] | ||
} | ||
|
||
// LockAddr locks an account's mutex. This is used to prevent another tx getting the | ||
// same nonce until the lock is released. The mutex prevents the (an identical nonce) from | ||
// being read again during the time that the first transaction is being signed. | ||
func (l *AddrLocker) LockAddr(address common.Address) { | ||
l.lock(address).Lock() | ||
} | ||
|
||
// UnlockAddr unlocks the mutex of the given account. | ||
func (l *AddrLocker) UnlockAddr(address common.Address) { | ||
l.lock(address).Unlock() | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
AddTopic
method correctly handles the addition of new topics, including thread-safe checks and the initialization of the publishing goroutine. However, consider handling the potential race condition between checking if a topic exists and adding it. This could be mitigated by using a single write lock for the entire operation.Committable suggestion