Skip to content

Commit

Permalink
ethrpc: improvements to stream methods, and introduce CloseStreamConns()
Browse files Browse the repository at this point in the history
  • Loading branch information
pkieltyka committed Oct 7, 2024
1 parent 2923bdc commit 856601f
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 31 deletions.
16 changes: 14 additions & 2 deletions cmd/chain-newheads/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,23 @@ func init() {
}

func main() {
client, err := rpc.Dial(ETH_NODE_WSS_URL)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// go func() {
// time.Sleep(15 * time.Second)
// fmt.Println("cancelling..")
// cancel()
// }()

client, err := rpc.DialContext(ctx, ETH_NODE_WSS_URL)
if err != nil {
log.Fatal(err)
}

ch := make(chan map[string]interface{})

sub, err := client.EthSubscribe(context.Background(), ch, "newHeads")
sub, err := client.EthSubscribe(ctx, ch, "newHeads")
if err != nil {
log.Fatal(err)
}
Expand All @@ -59,6 +68,9 @@ func main() {
for {
select {

case <-ctx.Done():
sub.Unsubscribe()

case err := <-sub.Err():
fmt.Println("sub err!", err)
os.Exit(1)
Expand Down
7 changes: 5 additions & 2 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ func (m *Monitor) listenNewHead() <-chan uint64 {

case err := <-sub.Err():
// if we have an error, we'll reconnect
m.log.Warnf("ethmonitor (chain %s): websocket subscription error: %v", m.chainID.String(), err)
m.alert.Alert(context.Background(), "ethmonitor (chain %s): websocket subscription error: %v", m.chainID.String(), err)
m.log.Warnf("ethmonitor (chain %s): websocket subscription closed, error: %v", m.chainID.String(), err)
m.alert.Alert(context.Background(), "ethmonitor (chain %s): websocket subscription closed, error: %v", m.chainID.String(), err)
sub.Unsubscribe()

// TODO: call provider.ReportFaultWS(err)
Expand All @@ -377,6 +377,9 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
goto reconnect

case <-blockTimer.C:
// TODO: .. should we keep this..? it can be that some blockchains
// dont produce blocks as often..

// if we haven't received a new block in a while, we'll reconnect.
m.log.Warnf("ethmonitor: haven't received block in expected time, reconnecting..")
sub.Unsubscribe()
Expand Down
156 changes: 129 additions & 27 deletions ethrpc/ethrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@ import (
)

type Provider struct {
log logger.Logger
nodeURL string
nodeWSURL string
httpClient httpClient
br breaker.Breaker
jwtToken string // optional
log logger.Logger
nodeURL string
nodeWSURL string
httpClient httpClient
br breaker.Breaker
jwtToken string // optional
streamClosers []StreamCloser
streamUnsubscribers []StreamUnsubscriber

chainID *big.Int
chainIDMu sync.Mutex

// cache cachestore.Store[[]byte] // NOTE: unused for now
lastRequestID uint64

gethRPC *rpc.Client
mu sync.Mutex
}

func NewProvider(nodeURL string, options ...Option) (*Provider, error) {
Expand Down Expand Up @@ -68,6 +70,14 @@ var _ RawInterface = &Provider{}
// want to break this interface, we could also write an adapter type to keep them compat.
var _ bind.ContractBackend = &Provider{}

type StreamCloser interface {
Close()
}

type StreamUnsubscriber interface {
Unsubscribe()
}

func (s *Provider) SetHTTPClient(httpClient *http.Client) {
s.httpClient = httpClient
}
Expand Down Expand Up @@ -447,36 +457,133 @@ func (p *Provider) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint6
return result, err
}

// ...
func (p *Provider) IsStreamingEnabled() bool {
return p.nodeWSURL != ""
}

// SubscribeFilterLogs is stubbed below so we can adhere to the bind.ContractBackend interface.
// NOTE: the p.nodeWSURL is setup with a wss:// prefix, which tells the gethRPC to use a
// websocket connection.
//
// The connection will be closed and unsubscribed when the context is cancelled.
func (p *Provider) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
if !p.IsStreamingEnabled() {
return nil, fmt.Errorf("ethrpc: provider instance has not enabled streaming")
}
if p.gethRPC == nil {
var err error
p.gethRPC, err = rpc.Dial(p.nodeWSURL)
if err != nil {
return nil, fmt.Errorf("ethrpc: SubscribeFilterLogs failed: %w", err)
}

gethRPC, err := rpc.DialContext(ctx, p.nodeWSURL)
if err != nil {
return nil, fmt.Errorf("ethrpc: SubscribeFilterLogs failed to connect to websocket: %w", err)
}

return p.gethRPC.EthSubscribe(ctx, ch, "logs", query)
sub, err := gethRPC.EthSubscribe(ctx, ch, "logs", query)
if err != nil {
gethRPC.Close()
return nil, fmt.Errorf("ethrpc: SubscribeFilterLogs failed: %w", err)
}

p.mu.Lock()
p.streamClosers = append(p.streamClosers, gethRPC)
p.streamUnsubscribers = append(p.streamUnsubscribers, sub)
p.mu.Unlock()

go func() {
// close the subscription when the context is cancelled
// or when the subscription is explicitly closed
select {
case <-ctx.Done():
sub.Unsubscribe()
case <-sub.Err():
}

p.mu.Lock()
sub.Unsubscribe()
for i, unsub := range p.streamUnsubscribers {
if unsub == sub {
p.streamUnsubscribers = append(p.streamUnsubscribers[:i], p.streamUnsubscribers[i+1:]...)
break
}
}
gethRPC.Close()
for i, closer := range p.streamClosers {
if closer == gethRPC {
p.streamClosers = append(p.streamClosers[:i], p.streamClosers[i+1:]...)
break
}
}
p.mu.Unlock()
}()

return sub, nil
}

// ..
// SubscribeNewHeads listens for new blocks via websocket client. NOTE: the p.nodeWSURL is setup
// with a wss:// prefix, which tells the gethRPC to use a websocket connection.
//
// The connection will be closed and unsubscribed when the context is cancelled.
func (p *Provider) SubscribeNewHeads(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
if !p.IsStreamingEnabled() {
return nil, fmt.Errorf("ethrpc: provider instance has not enabled streaming")
}
if p.gethRPC == nil {
var err error
p.gethRPC, err = rpc.Dial(p.nodeWSURL)
if err != nil {
return nil, fmt.Errorf("ethrpc: SubscribeNewHeads failed: %w", err)
}

gethRPC, err := rpc.DialContext(ctx, p.nodeWSURL)
if err != nil {
return nil, fmt.Errorf("ethrpc: SubscribeNewHeads failed to connect to websocket: %w", err)
}

return p.gethRPC.EthSubscribe(ctx, ch, "newHeads")
sub, err := gethRPC.EthSubscribe(ctx, ch, "newHeads")
if err != nil {
gethRPC.Close()
return nil, fmt.Errorf("ethrpc: SubscribeNewHeads failed: %w", err)
}

p.mu.Lock()
p.streamClosers = append(p.streamClosers, gethRPC)
p.streamUnsubscribers = append(p.streamUnsubscribers, sub)
p.mu.Unlock()

go func() {
// close the subscription when the context is cancelled
// or when the subscription is explicitly closed
select {
case <-ctx.Done():
sub.Unsubscribe()
case <-sub.Err():
}

p.mu.Lock()
sub.Unsubscribe()
for i, unsub := range p.streamUnsubscribers {
if unsub == sub {
p.streamUnsubscribers = append(p.streamUnsubscribers[:i], p.streamUnsubscribers[i+1:]...)
break
}
}
gethRPC.Close()
for i, closer := range p.streamClosers {
if closer == gethRPC {
p.streamClosers = append(p.streamClosers[:i], p.streamClosers[i+1:]...)
break
}
}
p.mu.Unlock()
}()

return sub, nil
}

func (p *Provider) CloseStreamConns() {
p.mu.Lock()
defer p.mu.Unlock()
for _, unsub := range p.streamUnsubscribers {
unsub.Unsubscribe()
}
for _, closer := range p.streamClosers {
closer.Close()
}
p.streamClosers = p.streamClosers[:0]
p.streamUnsubscribers = p.streamUnsubscribers[:0]
}

// ie, ContractQuery(context.Background(), "0xabcdef..", "balanceOf(uint256)", "uint256", []string{"1"})
Expand Down Expand Up @@ -508,8 +615,3 @@ func (p *Provider) contractQuery(ctx context.Context, contractAddress string, in
_, err = p.Do(ctx, contractQueryBuilder.Into(&result))
return result, err
}

// ...
func (p *Provider) IsStreamingEnabled() bool {
return p.nodeWSURL != ""
}
3 changes: 3 additions & 0 deletions ethrpc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ type Interface interface {

// ..
SubscribeNewHeads(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)

// ..
CloseStreamConns()
}

// RawInterface also returns the bytes of the response body payload
Expand Down

0 comments on commit 856601f

Please sign in to comment.