From dd4c7a9b4ca1dfa72945766fcc66f38494aced20 Mon Sep 17 00:00:00 2001
From: Jordan Krage <jmank88@gmail.com>
Date: Thu, 3 Nov 2022 10:04:01 -0500
Subject: [PATCH] core/chains/evm/client: fix node race by deferring rpc close
 (#7839)

(cherry picked from commit 20532772586d1a7684f28cdfb73cf32adf9bafce)
---
 core/chains/evm/client/node.go                | 180 +++++++++---------
 core/chains/evm/client/node_lifecycle_test.go |   3 +
 core/internal/testutils/testutils.go          |   2 +-
 3 files changed, 96 insertions(+), 89 deletions(-)

diff --git a/core/chains/evm/client/node.go b/core/chains/evm/client/node.go
index 1a9decf4328..ddc86948e48 100644
--- a/core/chains/evm/client/node.go
+++ b/core/chains/evm/client/node.go
@@ -127,8 +127,6 @@ type rawclient struct {
 // It must have a ws url and may have a http url
 type node struct {
 	utils.StartStopOnce
-	ws      rawclient
-	http    *rawclient
 	lfcLog  logger.Logger
 	rpcLog  logger.Logger
 	name    string
@@ -136,6 +134,9 @@ type node struct {
 	chainID *big.Int
 	cfg     NodeConfig
 
+	ws   rawclient
+	http *rawclient
+
 	state   NodeState
 	stateMu sync.RWMutex
 
@@ -279,9 +280,6 @@ func (n *node) dial(callerCtx context.Context) error {
 		n.http.geth = ethclient.NewClient(httprpc)
 	}
 
-	n.lfcLog.Debugw("RPC dial: success")
-	promEVMPoolRPCNodeDialsSuccess.WithLabelValues(n.chainID.String(), n.name).Inc()
-
 	return nil
 }
 
@@ -343,7 +341,12 @@ func (n *node) verify(callerCtx context.Context) (err error) {
 
 func (n *node) Close() error {
 	return n.StopOnce(n.name, func() error {
-		defer n.wg.Wait()
+		defer func() {
+			n.wg.Wait()
+			if n.ws.rpc != nil {
+				n.ws.rpc.Close()
+			}
+		}()
 
 		n.stateMu.Lock()
 		defer n.stateMu.Unlock()
@@ -351,9 +354,6 @@ func (n *node) Close() error {
 		n.cancelNodeCtx()
 		n.cancelInflightRequests()
 		n.state = NodeStateClosed
-		if n.ws.rpc != nil {
-			n.ws.rpc.Close()
-		}
 		return nil
 	})
 }
@@ -413,7 +413,7 @@ func (n *node) getRPCDomain() string {
 
 // CallContext implementation
 func (n *node) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return err
 	}
@@ -425,10 +425,10 @@ func (n *node) CallContext(ctx context.Context, result interface{}, method strin
 
 	lggr.Debug("RPC call: evmclient.Client#CallContext")
 	start := time.Now()
-	if n.http != nil {
-		err = n.wrapHTTP(n.http.rpc.CallContext(ctx, result, method, args...))
+	if http != nil {
+		err = n.wrapHTTP(http.rpc.CallContext(ctx, result, method, args...))
 	} else {
-		err = n.wrapWS(n.ws.rpc.CallContext(ctx, result, method, args...))
+		err = n.wrapWS(ws.rpc.CallContext(ctx, result, method, args...))
 	}
 	duration := time.Since(start)
 
@@ -438,7 +438,7 @@ func (n *node) CallContext(ctx context.Context, result interface{}, method strin
 }
 
 func (n *node) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return err
 	}
@@ -447,10 +447,10 @@ func (n *node) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
 
 	lggr.Debug("RPC call: evmclient.Client#BatchCallContext")
 	start := time.Now()
-	if n.http != nil {
-		err = n.wrapHTTP(n.http.rpc.BatchCallContext(ctx, b))
+	if http != nil {
+		err = n.wrapHTTP(http.rpc.BatchCallContext(ctx, b))
 	} else {
-		err = n.wrapWS(n.ws.rpc.BatchCallContext(ctx, b))
+		err = n.wrapWS(ws.rpc.BatchCallContext(ctx, b))
 	}
 	duration := time.Since(start)
 
@@ -460,7 +460,7 @@ func (n *node) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
 }
 
 func (n *node) EthSubscribe(ctx context.Context, channel chan<- *evmtypes.Head, args ...interface{}) (ethereum.Subscription, error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, _, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -469,7 +469,7 @@ func (n *node) EthSubscribe(ctx context.Context, channel chan<- *evmtypes.Head,
 
 	lggr.Debug("RPC call: evmclient.Client#EthSubscribe")
 	start := time.Now()
-	sub, err := n.ws.rpc.EthSubscribe(ctx, channel, args...)
+	sub, err := ws.rpc.EthSubscribe(ctx, channel, args...)
 	if err == nil {
 		n.registerSub(sub)
 	}
@@ -483,7 +483,7 @@ func (n *node) EthSubscribe(ctx context.Context, channel chan<- *evmtypes.Head,
 // GethClient wrappers
 
 func (n *node) TransactionReceipt(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -493,11 +493,11 @@ func (n *node) TransactionReceipt(ctx context.Context, txHash common.Hash) (rece
 	lggr.Debug("RPC call: evmclient.Client#TransactionReceipt")
 
 	start := time.Now()
-	if n.http != nil {
-		receipt, err = n.http.geth.TransactionReceipt(ctx, txHash)
+	if http != nil {
+		receipt, err = http.geth.TransactionReceipt(ctx, txHash)
 		err = n.wrapHTTP(err)
 	} else {
-		receipt, err = n.ws.geth.TransactionReceipt(ctx, txHash)
+		receipt, err = ws.geth.TransactionReceipt(ctx, txHash)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -510,7 +510,7 @@ func (n *node) TransactionReceipt(ctx context.Context, txHash common.Hash) (rece
 }
 
 func (n *node) HeaderByNumber(ctx context.Context, number *big.Int) (header *types.Header, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -519,11 +519,11 @@ func (n *node) HeaderByNumber(ctx context.Context, number *big.Int) (header *typ
 
 	lggr.Debug("RPC call: evmclient.Client#HeaderByNumber")
 	start := time.Now()
-	if n.http != nil {
-		header, err = n.http.geth.HeaderByNumber(ctx, number)
+	if http != nil {
+		header, err = http.geth.HeaderByNumber(ctx, number)
 		err = n.wrapHTTP(err)
 	} else {
-		header, err = n.ws.geth.HeaderByNumber(ctx, number)
+		header, err = ws.geth.HeaderByNumber(ctx, number)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -534,7 +534,7 @@ func (n *node) HeaderByNumber(ctx context.Context, number *big.Int) (header *typ
 }
 
 func (n *node) HeaderByHash(ctx context.Context, hash common.Hash) (header *types.Header, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -543,11 +543,11 @@ func (n *node) HeaderByHash(ctx context.Context, hash common.Hash) (header *type
 
 	lggr.Debug("RPC call: evmclient.Client#HeaderByHash")
 	start := time.Now()
-	if n.http != nil {
-		header, err = n.http.geth.HeaderByHash(ctx, hash)
+	if http != nil {
+		header, err = http.geth.HeaderByHash(ctx, hash)
 		err = n.wrapHTTP(err)
 	} else {
-		header, err = n.ws.geth.HeaderByHash(ctx, hash)
+		header, err = ws.geth.HeaderByHash(ctx, hash)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -560,7 +560,7 @@ func (n *node) HeaderByHash(ctx context.Context, hash common.Hash) (header *type
 }
 
 func (n *node) SendTransaction(ctx context.Context, tx *types.Transaction) error {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return err
 	}
@@ -569,10 +569,10 @@ func (n *node) SendTransaction(ctx context.Context, tx *types.Transaction) error
 
 	lggr.Debug("RPC call: evmclient.Client#SendTransaction")
 	start := time.Now()
-	if n.http != nil {
-		err = n.wrapHTTP(n.http.geth.SendTransaction(ctx, tx))
+	if http != nil {
+		err = n.wrapHTTP(http.geth.SendTransaction(ctx, tx))
 	} else {
-		err = n.wrapWS(n.ws.geth.SendTransaction(ctx, tx))
+		err = n.wrapWS(ws.geth.SendTransaction(ctx, tx))
 	}
 	duration := time.Since(start)
 
@@ -583,7 +583,7 @@ func (n *node) SendTransaction(ctx context.Context, tx *types.Transaction) error
 
 // PendingNonceAt returns one higher than the highest nonce from both mempool and mined transactions
 func (n *node) PendingNonceAt(ctx context.Context, account common.Address) (nonce uint64, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return 0, err
 	}
@@ -592,11 +592,11 @@ func (n *node) PendingNonceAt(ctx context.Context, account common.Address) (nonc
 
 	lggr.Debug("RPC call: evmclient.Client#PendingNonceAt")
 	start := time.Now()
-	if n.http != nil {
-		nonce, err = n.http.geth.PendingNonceAt(ctx, account)
+	if http != nil {
+		nonce, err = http.geth.PendingNonceAt(ctx, account)
 		err = n.wrapHTTP(err)
 	} else {
-		nonce, err = n.ws.geth.PendingNonceAt(ctx, account)
+		nonce, err = ws.geth.PendingNonceAt(ctx, account)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -612,7 +612,7 @@ func (n *node) PendingNonceAt(ctx context.Context, account common.Address) (nonc
 // mined nonce at the given block number, but it actually returns the total
 // transaction count which is the highest mined nonce + 1
 func (n *node) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (nonce uint64, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return 0, err
 	}
@@ -621,11 +621,11 @@ func (n *node) NonceAt(ctx context.Context, account common.Address, blockNumber
 
 	lggr.Debug("RPC call: evmclient.Client#NonceAt")
 	start := time.Now()
-	if n.http != nil {
-		nonce, err = n.http.geth.NonceAt(ctx, account, blockNumber)
+	if http != nil {
+		nonce, err = http.geth.NonceAt(ctx, account, blockNumber)
 		err = n.wrapHTTP(err)
 	} else {
-		nonce, err = n.ws.geth.NonceAt(ctx, account, blockNumber)
+		nonce, err = ws.geth.NonceAt(ctx, account, blockNumber)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -638,7 +638,7 @@ func (n *node) NonceAt(ctx context.Context, account common.Address, blockNumber
 }
 
 func (n *node) PendingCodeAt(ctx context.Context, account common.Address) (code []byte, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -647,11 +647,11 @@ func (n *node) PendingCodeAt(ctx context.Context, account common.Address) (code
 
 	lggr.Debug("RPC call: evmclient.Client#PendingCodeAt")
 	start := time.Now()
-	if n.http != nil {
-		code, err = n.http.geth.PendingCodeAt(ctx, account)
+	if http != nil {
+		code, err = http.geth.PendingCodeAt(ctx, account)
 		err = n.wrapHTTP(err)
 	} else {
-		code, err = n.ws.geth.PendingCodeAt(ctx, account)
+		code, err = ws.geth.PendingCodeAt(ctx, account)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -664,7 +664,7 @@ func (n *node) PendingCodeAt(ctx context.Context, account common.Address) (code
 }
 
 func (n *node) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) (code []byte, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -673,11 +673,11 @@ func (n *node) CodeAt(ctx context.Context, account common.Address, blockNumber *
 
 	lggr.Debug("RPC call: evmclient.Client#CodeAt")
 	start := time.Now()
-	if n.http != nil {
-		code, err = n.http.geth.CodeAt(ctx, account, blockNumber)
+	if http != nil {
+		code, err = http.geth.CodeAt(ctx, account, blockNumber)
 		err = n.wrapHTTP(err)
 	} else {
-		code, err = n.ws.geth.CodeAt(ctx, account, blockNumber)
+		code, err = ws.geth.CodeAt(ctx, account, blockNumber)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -690,7 +690,7 @@ func (n *node) CodeAt(ctx context.Context, account common.Address, blockNumber *
 }
 
 func (n *node) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return 0, err
 	}
@@ -699,11 +699,11 @@ func (n *node) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint
 
 	lggr.Debug("RPC call: evmclient.Client#EstimateGas")
 	start := time.Now()
-	if n.http != nil {
-		gas, err = n.http.geth.EstimateGas(ctx, call)
+	if http != nil {
+		gas, err = http.geth.EstimateGas(ctx, call)
 		err = n.wrapHTTP(err)
 	} else {
-		gas, err = n.ws.geth.EstimateGas(ctx, call)
+		gas, err = ws.geth.EstimateGas(ctx, call)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -716,7 +716,7 @@ func (n *node) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint
 }
 
 func (n *node) SuggestGasPrice(ctx context.Context) (price *big.Int, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -725,11 +725,11 @@ func (n *node) SuggestGasPrice(ctx context.Context) (price *big.Int, err error)
 
 	lggr.Debug("RPC call: evmclient.Client#SuggestGasPrice")
 	start := time.Now()
-	if n.http != nil {
-		price, err = n.http.geth.SuggestGasPrice(ctx)
+	if http != nil {
+		price, err = http.geth.SuggestGasPrice(ctx)
 		err = n.wrapHTTP(err)
 	} else {
-		price, err = n.ws.geth.SuggestGasPrice(ctx)
+		price, err = ws.geth.SuggestGasPrice(ctx)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -742,7 +742,7 @@ func (n *node) SuggestGasPrice(ctx context.Context) (price *big.Int, err error)
 }
 
 func (n *node) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) (val []byte, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -751,11 +751,11 @@ func (n *node) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumb
 
 	lggr.Debug("RPC call: evmclient.Client#CallContract")
 	start := time.Now()
-	if n.http != nil {
-		val, err = n.http.geth.CallContract(ctx, msg, blockNumber)
+	if http != nil {
+		val, err = http.geth.CallContract(ctx, msg, blockNumber)
 		err = n.wrapHTTP(err)
 	} else {
-		val, err = n.ws.geth.CallContract(ctx, msg, blockNumber)
+		val, err = ws.geth.CallContract(ctx, msg, blockNumber)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -769,7 +769,7 @@ func (n *node) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumb
 }
 
 func (n *node) BlockByNumber(ctx context.Context, number *big.Int) (b *types.Block, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -778,11 +778,11 @@ func (n *node) BlockByNumber(ctx context.Context, number *big.Int) (b *types.Blo
 
 	lggr.Debug("RPC call: evmclient.Client#BlockByNumber")
 	start := time.Now()
-	if n.http != nil {
-		b, err = n.http.geth.BlockByNumber(ctx, number)
+	if http != nil {
+		b, err = http.geth.BlockByNumber(ctx, number)
 		err = n.wrapHTTP(err)
 	} else {
-		b, err = n.ws.geth.BlockByNumber(ctx, number)
+		b, err = ws.geth.BlockByNumber(ctx, number)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -795,7 +795,7 @@ func (n *node) BlockByNumber(ctx context.Context, number *big.Int) (b *types.Blo
 }
 
 func (n *node) BlockByHash(ctx context.Context, hash common.Hash) (b *types.Block, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -804,11 +804,11 @@ func (n *node) BlockByHash(ctx context.Context, hash common.Hash) (b *types.Bloc
 
 	lggr.Debug("RPC call: evmclient.Client#BlockByHash")
 	start := time.Now()
-	if n.http != nil {
-		b, err = n.http.geth.BlockByHash(ctx, hash)
+	if http != nil {
+		b, err = http.geth.BlockByHash(ctx, hash)
 		err = n.wrapHTTP(err)
 	} else {
-		b, err = n.ws.geth.BlockByHash(ctx, hash)
+		b, err = ws.geth.BlockByHash(ctx, hash)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -821,7 +821,7 @@ func (n *node) BlockByHash(ctx context.Context, hash common.Hash) (b *types.Bloc
 }
 
 func (n *node) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (balance *big.Int, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -830,11 +830,11 @@ func (n *node) BalanceAt(ctx context.Context, account common.Address, blockNumbe
 
 	lggr.Debug("RPC call: evmclient.Client#BalanceAt")
 	start := time.Now()
-	if n.http != nil {
-		balance, err = n.http.geth.BalanceAt(ctx, account, blockNumber)
+	if http != nil {
+		balance, err = http.geth.BalanceAt(ctx, account, blockNumber)
 		err = n.wrapHTTP(err)
 	} else {
-		balance, err = n.ws.geth.BalanceAt(ctx, account, blockNumber)
+		balance, err = ws.geth.BalanceAt(ctx, account, blockNumber)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -847,7 +847,7 @@ func (n *node) BalanceAt(ctx context.Context, account common.Address, blockNumbe
 }
 
 func (n *node) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []types.Log, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -856,11 +856,11 @@ func (n *node) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []type
 
 	lggr.Debug("RPC call: evmclient.Client#FilterLogs")
 	start := time.Now()
-	if n.http != nil {
-		l, err = n.http.geth.FilterLogs(ctx, q)
+	if http != nil {
+		l, err = http.geth.FilterLogs(ctx, q)
 		err = n.wrapHTTP(err)
 	} else {
-		l, err = n.ws.geth.FilterLogs(ctx, q)
+		l, err = ws.geth.FilterLogs(ctx, q)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -873,7 +873,7 @@ func (n *node) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []type
 }
 
 func (n *node) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (sub ethereum.Subscription, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, _, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -882,7 +882,7 @@ func (n *node) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery,
 
 	lggr.Debug("RPC call: evmclient.Client#SubscribeFilterLogs")
 	start := time.Now()
-	sub, err = n.ws.geth.SubscribeFilterLogs(ctx, q, ch)
+	sub, err = ws.geth.SubscribeFilterLogs(ctx, q, ch)
 	if err == nil {
 		n.registerSub(sub)
 	}
@@ -895,7 +895,7 @@ func (n *node) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery,
 }
 
 func (n *node) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err error) {
-	ctx, cancel, err := n.makeLiveQueryCtx(ctx)
+	ctx, cancel, ws, http, err := n.makeLiveQueryCtx(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -904,11 +904,11 @@ func (n *node) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err error
 
 	lggr.Debug("RPC call: evmclient.Client#SuggestGasTipCap")
 	start := time.Now()
-	if n.http != nil {
-		tipCap, err = n.http.geth.SuggestGasTipCap(ctx)
+	if http != nil {
+		tipCap, err = http.geth.SuggestGasTipCap(ctx)
 		err = n.wrapHTTP(err)
 	} else {
-		tipCap, err = n.ws.geth.SuggestGasTipCap(ctx)
+		tipCap, err = ws.geth.SuggestGasTipCap(ctx)
 		err = n.wrapWS(err)
 	}
 	duration := time.Since(start)
@@ -990,9 +990,8 @@ func wrap(err error, tp string) error {
 	return errors.Wrapf(err, "%s call failed", tp)
 }
 
-// makeLiveQueryCtx wraps makeQueryCtx but returns error if node is not
-// "alive".
-func (n *node) makeLiveQueryCtx(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, err error) {
+// makeLiveQueryCtx wraps makeQueryCtx but returns error if node is not NodeStateAlive.
+func (n *node) makeLiveQueryCtx(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient, err error) {
 	// Need to wrap in mutex because state transition can cancel and replace the
 	// context
 	n.stateMu.RLock()
@@ -1002,6 +1001,11 @@ func (n *node) makeLiveQueryCtx(parentCtx context.Context) (ctx context.Context,
 		return
 	}
 	cancelCh := n.chStopInFlight
+	ws = n.ws
+	if n.http != nil {
+		cp := *n.http
+		http = &cp
+	}
 	n.stateMu.RUnlock()
 	ctx, cancel = makeQueryCtx(parentCtx, cancelCh)
 	return
diff --git a/core/chains/evm/client/node_lifecycle_test.go b/core/chains/evm/client/node_lifecycle_test.go
index 981c2d410e7..c68c88ee4be 100644
--- a/core/chains/evm/client/node_lifecycle_test.go
+++ b/core/chains/evm/client/node_lifecycle_test.go
@@ -233,6 +233,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
 					default:
 					}
 					return `"0x00"`, makeHeadResult(0)
+				case "eth_unsubscribe":
 				case "web3_clientVersion":
 					return `"test client version 2"`, ""
 				default:
@@ -268,6 +269,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
 				switch method {
 				case "eth_subscribe":
 					return `"0x00"`, makeHeadResult(0)
+				case "eth_unsubscribe":
 				default:
 					t.Errorf("unexpected RPC method: %s", method)
 				}
@@ -432,6 +434,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
 					default:
 					}
 					return `"0x00"`, makeHeadResult(0)
+				case "eth_unsubscribe":
 				default:
 					t.Errorf("unexpected RPC method: %s", method)
 				}
diff --git a/core/internal/testutils/testutils.go b/core/internal/testutils/testutils.go
index 1399b3649ad..2e1bf8fe431 100644
--- a/core/internal/testutils/testutils.go
+++ b/core/internal/testutils/testutils.go
@@ -313,7 +313,7 @@ func IntToHex(n int) string {
 
 // TestInterval is just a sensible poll interval that gives fast tests without
 // risk of spamming
-const TestInterval = 10 * time.Millisecond
+const TestInterval = 100 * time.Millisecond
 
 // AssertEventually waits for f to return true
 func AssertEventually(t *testing.T, f func() bool) {