diff --git a/cmd/chain-newheads/main.go b/cmd/chain-newheads/main.go index fd696205..85c9248d 100644 --- a/cmd/chain-newheads/main.go +++ b/cmd/chain-newheads/main.go @@ -42,14 +42,23 @@ func init() { } func main() { - client, err := rpc.Dial(ETH_NODE_WSS_URL) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // go func() { + // time.Sleep(15 * time.Second) + // fmt.Println("cancelling..") + // cancel() + // }() + + client, err := rpc.DialContext(ctx, ETH_NODE_WSS_URL) if err != nil { log.Fatal(err) } ch := make(chan map[string]interface{}) - sub, err := client.EthSubscribe(context.Background(), ch, "newHeads") + sub, err := client.EthSubscribe(ctx, ch, "newHeads") if err != nil { log.Fatal(err) } @@ -59,6 +68,9 @@ func main() { for { select { + case <-ctx.Done(): + sub.Unsubscribe() + case err := <-sub.Err(): fmt.Println("sub err!", err) os.Exit(1) diff --git a/ethmonitor/ethmonitor.go b/ethmonitor/ethmonitor.go index 2c6e981e..91654d05 100644 --- a/ethmonitor/ethmonitor.go +++ b/ethmonitor/ethmonitor.go @@ -366,8 +366,8 @@ func (m *Monitor) listenNewHead() <-chan uint64 { case err := <-sub.Err(): // if we have an error, we'll reconnect - m.log.Warnf("ethmonitor (chain %s): websocket subscription error: %v", m.chainID.String(), err) - m.alert.Alert(context.Background(), "ethmonitor (chain %s): websocket subscription error: %v", m.chainID.String(), err) + m.log.Warnf("ethmonitor (chain %s): websocket subscription closed, error: %v", m.chainID.String(), err) + m.alert.Alert(context.Background(), "ethmonitor (chain %s): websocket subscription closed, error: %v", m.chainID.String(), err) sub.Unsubscribe() // TODO: call provider.ReportFaultWS(err) @@ -377,6 +377,9 @@ func (m *Monitor) listenNewHead() <-chan uint64 { goto reconnect case <-blockTimer.C: + // TODO: .. should we keep this..? it can be that some blockchains + // dont produce blocks as often.. + // if we haven't received a new block in a while, we'll reconnect. m.log.Warnf("ethmonitor: haven't received block in expected time, reconnecting..") sub.Unsubscribe() diff --git a/ethrpc/ethrpc.go b/ethrpc/ethrpc.go index 47596e9a..eaa6d7d9 100644 --- a/ethrpc/ethrpc.go +++ b/ethrpc/ethrpc.go @@ -24,12 +24,14 @@ import ( ) type Provider struct { - log logger.Logger - nodeURL string - nodeWSURL string - httpClient httpClient - br breaker.Breaker - jwtToken string // optional + log logger.Logger + nodeURL string + nodeWSURL string + httpClient httpClient + br breaker.Breaker + jwtToken string // optional + streamClosers []StreamCloser + streamUnsubscribers []StreamUnsubscriber chainID *big.Int chainIDMu sync.Mutex @@ -37,7 +39,7 @@ type Provider struct { // cache cachestore.Store[[]byte] // NOTE: unused for now lastRequestID uint64 - gethRPC *rpc.Client + mu sync.Mutex } func NewProvider(nodeURL string, options ...Option) (*Provider, error) { @@ -68,6 +70,14 @@ var _ RawInterface = &Provider{} // want to break this interface, we could also write an adapter type to keep them compat. var _ bind.ContractBackend = &Provider{} +type StreamCloser interface { + Close() +} + +type StreamUnsubscriber interface { + Unsubscribe() +} + func (s *Provider) SetHTTPClient(httpClient *http.Client) { s.httpClient = httpClient } @@ -447,36 +457,133 @@ func (p *Provider) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint6 return result, err } +// ... +func (p *Provider) IsStreamingEnabled() bool { + return p.nodeWSURL != "" +} + // SubscribeFilterLogs is stubbed below so we can adhere to the bind.ContractBackend interface. +// NOTE: the p.nodeWSURL is setup with a wss:// prefix, which tells the gethRPC to use a +// websocket connection. +// +// The connection will be closed and unsubscribed when the context is cancelled. func (p *Provider) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { if !p.IsStreamingEnabled() { return nil, fmt.Errorf("ethrpc: provider instance has not enabled streaming") } - if p.gethRPC == nil { - var err error - p.gethRPC, err = rpc.Dial(p.nodeWSURL) - if err != nil { - return nil, fmt.Errorf("ethrpc: SubscribeFilterLogs failed: %w", err) - } + + gethRPC, err := rpc.DialContext(ctx, p.nodeWSURL) + if err != nil { + return nil, fmt.Errorf("ethrpc: SubscribeFilterLogs failed to connect to websocket: %w", err) } - return p.gethRPC.EthSubscribe(ctx, ch, "logs", query) + sub, err := gethRPC.EthSubscribe(ctx, ch, "logs", query) + if err != nil { + gethRPC.Close() + return nil, fmt.Errorf("ethrpc: SubscribeFilterLogs failed: %w", err) + } + + p.mu.Lock() + p.streamClosers = append(p.streamClosers, gethRPC) + p.streamUnsubscribers = append(p.streamUnsubscribers, sub) + p.mu.Unlock() + + go func() { + // close the subscription when the context is cancelled + // or when the subscription is explicitly closed + select { + case <-ctx.Done(): + sub.Unsubscribe() + case <-sub.Err(): + } + + p.mu.Lock() + sub.Unsubscribe() + for i, unsub := range p.streamUnsubscribers { + if unsub == sub { + p.streamUnsubscribers = append(p.streamUnsubscribers[:i], p.streamUnsubscribers[i+1:]...) + break + } + } + gethRPC.Close() + for i, closer := range p.streamClosers { + if closer == gethRPC { + p.streamClosers = append(p.streamClosers[:i], p.streamClosers[i+1:]...) + break + } + } + p.mu.Unlock() + }() + + return sub, nil } -// .. +// SubscribeNewHeads listens for new blocks via websocket client. NOTE: the p.nodeWSURL is setup +// with a wss:// prefix, which tells the gethRPC to use a websocket connection. +// +// The connection will be closed and unsubscribed when the context is cancelled. func (p *Provider) SubscribeNewHeads(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { if !p.IsStreamingEnabled() { return nil, fmt.Errorf("ethrpc: provider instance has not enabled streaming") } - if p.gethRPC == nil { - var err error - p.gethRPC, err = rpc.Dial(p.nodeWSURL) - if err != nil { - return nil, fmt.Errorf("ethrpc: SubscribeNewHeads failed: %w", err) - } + + gethRPC, err := rpc.DialContext(ctx, p.nodeWSURL) + if err != nil { + return nil, fmt.Errorf("ethrpc: SubscribeNewHeads failed to connect to websocket: %w", err) } - return p.gethRPC.EthSubscribe(ctx, ch, "newHeads") + sub, err := gethRPC.EthSubscribe(ctx, ch, "newHeads") + if err != nil { + gethRPC.Close() + return nil, fmt.Errorf("ethrpc: SubscribeNewHeads failed: %w", err) + } + + p.mu.Lock() + p.streamClosers = append(p.streamClosers, gethRPC) + p.streamUnsubscribers = append(p.streamUnsubscribers, sub) + p.mu.Unlock() + + go func() { + // close the subscription when the context is cancelled + // or when the subscription is explicitly closed + select { + case <-ctx.Done(): + sub.Unsubscribe() + case <-sub.Err(): + } + + p.mu.Lock() + sub.Unsubscribe() + for i, unsub := range p.streamUnsubscribers { + if unsub == sub { + p.streamUnsubscribers = append(p.streamUnsubscribers[:i], p.streamUnsubscribers[i+1:]...) + break + } + } + gethRPC.Close() + for i, closer := range p.streamClosers { + if closer == gethRPC { + p.streamClosers = append(p.streamClosers[:i], p.streamClosers[i+1:]...) + break + } + } + p.mu.Unlock() + }() + + return sub, nil +} + +func (p *Provider) CloseStreamConns() { + p.mu.Lock() + defer p.mu.Unlock() + for _, unsub := range p.streamUnsubscribers { + unsub.Unsubscribe() + } + for _, closer := range p.streamClosers { + closer.Close() + } + p.streamClosers = p.streamClosers[:0] + p.streamUnsubscribers = p.streamUnsubscribers[:0] } // ie, ContractQuery(context.Background(), "0xabcdef..", "balanceOf(uint256)", "uint256", []string{"1"}) @@ -508,8 +615,3 @@ func (p *Provider) contractQuery(ctx context.Context, contractAddress string, in _, err = p.Do(ctx, contractQueryBuilder.Into(&result)) return result, err } - -// ... -func (p *Provider) IsStreamingEnabled() bool { - return p.nodeWSURL != "" -} diff --git a/ethrpc/interface.go b/ethrpc/interface.go index 3b27159c..c266e069 100644 --- a/ethrpc/interface.go +++ b/ethrpc/interface.go @@ -181,6 +181,9 @@ type Interface interface { // .. SubscribeNewHeads(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) + + // .. + CloseStreamConns() } // RawInterface also returns the bytes of the response body payload