golangci-lint: enable containedctx (#13171)

jmank88 authored May 22, 2024
1 parent 6625266 commit 677abe1
Showing 68 changed files with 762 additions and 694 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
@@ -15,6 +15,7 @@ linters:
- noctx
- depguard
- whitespace
- containedctx
linters-settings:
exhaustive:
default-signifies-exhaustive: true
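For reference, containedctx reports struct fields of type context.Context: a context stored on a struct outlives any single call and hides cancellation from callers. A minimal sketch of what the linter flags and the stop-channel shape this commit moves to (the names below are illustrative, not taken from this repository):

package example

import "context"

// containedctx would flag this: the context lives on the struct, so its
// lifetime is tied to the object rather than to a single operation.
type badWorker struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// The shape this commit migrates to: keep only a stop channel on the struct
// and derive a fresh context inside each goroutine or call that needs one.
type goodWorker struct {
	stopCh chan struct{}
}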
9 changes: 3 additions & 6 deletions common/client/node.go
@@ -106,10 +106,7 @@ type node[
stateLatestTotalDifficulty *big.Int
stateLatestFinalizedBlockNumber int64

// nodeCtx is the node lifetime's context
nodeCtx context.Context
// cancelNodeCtx cancels nodeCtx when stopping the node
cancelNodeCtx context.CancelFunc
stopCh services.StopChan
// wg waits for subsidiary goroutines
wg sync.WaitGroup

@@ -148,7 +145,7 @@ func NewNode[
if httpuri != nil {
n.http = httpuri
}
n.nodeCtx, n.cancelNodeCtx = context.WithCancel(context.Background())
n.stopCh = make(services.StopChan)
lggr = logger.Named(lggr, "Node")
lggr = logger.With(lggr,
"nodeTier", Primary.String(),
@@ -205,7 +202,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) close() error {
n.stateMu.Lock()
defer n.stateMu.Unlock()

n.cancelNodeCtx()
close(n.stopCh)
n.state = nodeStateClosed
return nil
}
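node.go now holds a services.StopChan instead of a stored context and cancel func: construction does make(services.StopChan), shutdown does close(n.stopCh), and (as the next file shows) each goroutine derives its own context from the channel. The real type lives in chainlink-common; a rough approximation of the behaviour relied on here, inferred only from the calls visible in this diff, is:

package example

import "context"

// StopChan approximates services.StopChan: a channel closed exactly once on shutdown.
type StopChan chan struct{}

// NewCtx returns a context that is cancelled when the channel is closed or
// when the returned CancelFunc is called, whichever comes first.
func (s StopChan) NewCtx() (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-s:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}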
55 changes: 34 additions & 21 deletions common/client/node_lifecycle.go
@@ -79,6 +79,8 @@ const rpcSubscriptionMethodNewHeads = "newHeads"
// Should only be run ONCE per node, after a successful Dial
func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
@@ -100,7 +102,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Tracew("Alive loop starting", "nodeState", n.State())

headsC := make(chan HEAD)
sub, err := n.rpc.Subscribe(n.nodeCtx, headsC, rpcSubscriptionMethodNewHeads)
sub, err := n.rpc.Subscribe(ctx, headsC, rpcSubscriptionMethodNewHeads)
if err != nil {
lggr.Errorw("Initial subscribe for heads failed", "nodeState", n.State())
n.declareUnreachable()
@@ -151,15 +153,16 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case <-pollCh:
var version string
promPoolRPCNodePolls.WithLabelValues(n.chainID.String(), n.name).Inc()
lggr.Tracew("Polling for version", "nodeState", n.State(), "pollFailures", pollFailures)
ctx, cancel := context.WithTimeout(n.nodeCtx, pollInterval)
version, err := n.RPC().ClientVersion(ctx)
cancel()
version, err := func(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, pollInterval)
defer cancel()
return n.RPC().ClientVersion(ctx)
}(ctx)
if err != nil {
// prevent overflow
if pollFailures < math.MaxUint32 {
@@ -240,9 +243,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
n.declareOutOfSync(func(num int64, td *big.Int) bool { return num < highestReceivedBlockNumber })
return
case <-pollFinalizedHeadCh:
ctx, cancel := context.WithTimeout(n.nodeCtx, n.nodePoolCfg.FinalizedBlockPollInterval())
latestFinalized, err := n.RPC().LatestFinalizedBlock(ctx)
cancel()
latestFinalized, err := func(ctx context.Context) (HEAD, error) {
ctx, cancel := context.WithTimeout(ctx, n.nodePoolCfg.FinalizedBlockPollInterval())
defer cancel()
return n.RPC().LatestFinalizedBlock(ctx)
}(ctx)
if err != nil {
lggr.Warnw("Failed to fetch latest finalized block", "err", err)
continue
@@ -300,6 +305,8 @@ const (
// outOfSyncLoop takes an OutOfSync node and waits until isOutOfSync returns false to go back to live status
func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td *big.Int) bool) {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
@@ -319,7 +326,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td
lggr.Debugw("Trying to revive out-of-sync RPC node", "nodeState", n.State())

// Need to redial since out-of-sync nodes are automatically disconnected
state := n.createVerifiedConn(n.nodeCtx, lggr)
state := n.createVerifiedConn(ctx, lggr)
if state != nodeStateAlive {
n.declareState(state)
return
@@ -328,7 +335,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td
lggr.Tracew("Successfully subscribed to heads feed on out-of-sync RPC node", "nodeState", n.State())

ch := make(chan HEAD)
sub, err := n.rpc.Subscribe(n.nodeCtx, ch, rpcSubscriptionMethodNewHeads)
sub, err := n.rpc.Subscribe(ctx, ch, rpcSubscriptionMethodNewHeads)
if err != nil {
lggr.Errorw("Failed to subscribe heads on out-of-sync RPC node", "nodeState", n.State(), "err", err)
n.declareUnreachable()
@@ -338,7 +345,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case head, open := <-ch:
if !open {
@@ -372,6 +379,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td

func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
@@ -394,20 +403,20 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case <-time.After(dialRetryBackoff.Duration()):
lggr.Tracew("Trying to re-dial RPC node", "nodeState", n.State())

err := n.rpc.Dial(n.nodeCtx)
err := n.rpc.Dial(ctx)
if err != nil {
lggr.Errorw(fmt.Sprintf("Failed to redial RPC node; still unreachable: %v", err), "err", err, "nodeState", n.State())
continue
}

n.setState(nodeStateDialed)

state := n.verifyConn(n.nodeCtx, lggr)
state := n.verifyConn(ctx, lggr)
switch state {
case nodeStateUnreachable:
n.setState(nodeStateUnreachable)
@@ -425,6 +434,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {

func (n *node[CHAIN_ID, HEAD, RPC]) invalidChainIDLoop() {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
@@ -443,7 +454,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) invalidChainIDLoop() {
lggr := logger.Named(n.lfcLog, "InvalidChainID")

// Need to redial since invalid chain ID nodes are automatically disconnected
state := n.createVerifiedConn(n.nodeCtx, lggr)
state := n.createVerifiedConn(ctx, lggr)
if state != nodeStateInvalidChainID {
n.declareState(state)
return
@@ -455,10 +466,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) invalidChainIDLoop() {

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case <-time.After(chainIDRecheckBackoff.Duration()):
state := n.verifyConn(n.nodeCtx, lggr)
state := n.verifyConn(ctx, lggr)
switch state {
case nodeStateInvalidChainID:
continue
@@ -475,6 +486,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) invalidChainIDLoop() {

func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
Expand All @@ -493,7 +506,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() {
lggr := logger.Sugared(logger.Named(n.lfcLog, "Syncing"))
lggr.Debugw(fmt.Sprintf("Periodically re-checking RPC node %s with syncing status", n.String()), "nodeState", n.State())
// Need to redial since syncing nodes are automatically disconnected
state := n.createVerifiedConn(n.nodeCtx, lggr)
state := n.createVerifiedConn(ctx, lggr)
if state != nodeStateSyncing {
n.declareState(state)
return
Expand All @@ -503,11 +516,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() {

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case <-time.After(recheckBackoff.Duration()):
lggr.Tracew("Trying to recheck if the node is still syncing", "nodeState", n.State())
isSyncing, err := n.rpc.IsSyncing(n.nodeCtx)
isSyncing, err := n.rpc.IsSyncing(ctx)
if err != nil {
lggr.Errorw("Unexpected error while verifying RPC node synchronization status", "err", err, "nodeState", n.State())
n.declareUnreachable()
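Every lifecycle loop above now derives a single context at the top (ctx, cancel := n.stopCh.NewCtx(); defer cancel()) and passes it down, and the per-poll timeouts are wrapped in an immediately invoked function so each deferred cancel fires at the end of that poll rather than accumulating until the loop exits. A stripped-down sketch of that per-iteration pattern (pollOnce and fetch are placeholder names, not symbols from this repo):

package example

import (
	"context"
	"time"
)

// pollOnce scopes a timeout to a single call: wrapping the body in a function
// lets defer cancel() release the timer as soon as the call returns instead of
// deferring it to the end of the surrounding loop.
func pollOnce(ctx context.Context, timeout time.Duration, fetch func(context.Context) (string, error)) (string, error) {
	return func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		return fetch(ctx)
	}(ctx)
}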
21 changes: 11 additions & 10 deletions common/txmgr/confirmer.go
@@ -133,8 +133,7 @@ type Confirmer[
enabledAddresses []ADDR

mb *mailbox.Mailbox[HEAD]
ctx context.Context
ctxCancel context.CancelFunc
stopCh services.StopChan
wg sync.WaitGroup
initSync sync.Mutex
isStarted bool
@@ -207,7 +206,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sta
return fmt.Errorf("Confirmer: failed to load EnabledAddressesForChain: %w", err)
}

ec.ctx, ec.ctxCancel = context.WithCancel(context.Background())
ec.stopCh = make(chan struct{})
ec.wg = sync.WaitGroup{}
ec.wg.Add(1)
go ec.runLoop()
@@ -228,7 +227,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) clo
if !ec.isStarted {
return fmt.Errorf("Confirmer is not started: %w", services.ErrAlreadyStopped)
}
ec.ctxCancel()
close(ec.stopCh)
ec.wg.Wait()
ec.isStarted = false
return nil
@@ -248,23 +247,25 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Hea

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() {
defer ec.wg.Done()
ctx, cancel := ec.stopCh.NewCtx()
defer cancel()
for {
select {
case <-ec.mb.Notify():
for {
if ec.ctx.Err() != nil {
if ctx.Err() != nil {
return
}
head, exists := ec.mb.Retrieve()
if !exists {
break
}
if err := ec.ProcessHead(ec.ctx, head); err != nil {
if err := ec.ProcessHead(ctx, head); err != nil {
ec.lggr.Errorw("Error processing head", "err", err)
continue
}
}
case <-ec.ctx.Done():
case <-ctx.Done():
return
}
}
@@ -940,7 +941,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens

for _, etx := range etxs {
if !hasReceiptInLongestChain(*etx, head) {
if err := ec.markForRebroadcast(*etx, head); err != nil {
if err := ec.markForRebroadcast(ctx, *etx, head); err != nil {
return fmt.Errorf("markForRebroadcast failed for etx %v: %w", etx.ID, err)
}
}
@@ -992,7 +993,7 @@ func hasReceiptInLongestChain[
}
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markForRebroadcast(etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) error {
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) error {
if len(etx.TxAttempts) == 0 {
return fmt.Errorf("invariant violation: expected tx %v to have at least one attempt", etx.ID)
}
Expand Down Expand Up @@ -1027,7 +1028,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar
ec.lggr.Infow(fmt.Sprintf("Re-org detected. Rebroadcasting transaction %s which may have been re-org'd out of the main chain", attempt.Hash.String()), logValues...)

// Put it back in progress and delete all receipts (they do not apply to the new chain)
if err := ec.txStore.UpdateTxForRebroadcast(ec.ctx, etx, attempt); err != nil {
if err := ec.txStore.UpdateTxForRebroadcast(ctx, etx, attempt); err != nil {
return fmt.Errorf("markForRebroadcast failed: %w", err)
}

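The Confirmer follows the same migration: start creates the stop channel, close closes it, runLoop derives its context from the channel, and markForRebroadcast now receives that context as a parameter instead of reading a field. A condensed sketch of the start/run/close shape this implies (worker and its members are illustrative names, not the Confirmer's actual API):

package example

import (
	"context"
	"sync"
	"time"
)

type worker struct {
	stopCh chan struct{}
	wg     sync.WaitGroup
}

func (w *worker) Start() {
	w.stopCh = make(chan struct{})
	w.wg.Add(1)
	go w.run()
}

func (w *worker) run() {
	defer w.wg.Done()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() { // cancel ctx once Close closes stopCh
		select {
		case <-w.stopCh:
			cancel()
		case <-ctx.Done():
		}
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
			// periodic work receives ctx as an argument rather than reading
			// it from a struct field
		}
	}
}

func (w *worker) Close() {
	close(w.stopCh)
	w.wg.Wait() // run observes ctx.Done and returns
}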
36 changes: 18 additions & 18 deletions common/txmgr/mocks/tx_manager.go

Some generated files are not rendered by default.