Skip to content

Commit

Permalink
Fix some strange vsp package bugs
Browse files Browse the repository at this point in the history
Unexport the Queue method, so that external callers are less likely to
abuse it.  It will panic if the queue channel has already closed.

Use returns instead of breaks when inside of a select statement and
the goal was to break out of an outer loop.  The break statement will
break out of the switch statement, not the loop.

Do not queue items from Process.  This was needlessly recursive
(Process -> queue -> Process) and caused the first Process call to
return the original error before attempting the second process
(despite waiting for 2 minutes in the foreground), by which time the
second Process call could fail due to the wallet locking after the
first Process call returned.

Remove a log of expected context cancellation errors.
  • Loading branch information
jrick committed Oct 20, 2020
1 parent 6450b80 commit 829bc2e
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions internal/vsp/vsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/ed25519"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -99,7 +100,9 @@ func New(ctx context.Context, vspURL, pubKeyStr string, purchaseAccount, changeA
for {
ntfns, err := c.Recv()
if err != nil {
log.Errorf("Recv failed: %v", err)
if !errors.Is(err, context.Canceled) {
log.Errorf("Recv failed: %v", err)
}
break
}
watch := make(map[chainhash.Hash]chainhash.Hash)
Expand Down Expand Up @@ -134,7 +137,7 @@ func New(ctx context.Context, vspURL, pubKeyStr string, purchaseAccount, changeA
log.Infof("VSP has successfully confirmed the fee tx for %v", txHash)
case "error":
log.Warnf("VSP failed to broadcast feetx for %v -- restarting process", txHash)
v.Queue(ctx, *txHash, nil)
v.queueItem(ctx, *txHash, nil)
default:
log.Warnf("VSP responded with %v for %v", ticketStatus.FeeTxStatus, txHash)
continue
Expand Down Expand Up @@ -180,7 +183,7 @@ func New(ctx context.Context, vspURL, pubKeyStr string, purchaseAccount, changeA
for {
select {
case <-ctx.Done():
break
return
case added := <-t.C:
v.outpointsMu.Lock()
for _, addedHash := range added.UnminedTransactionHashes {
Expand Down Expand Up @@ -214,7 +217,7 @@ func New(ctx context.Context, vspURL, pubKeyStr string, purchaseAccount, changeA
select {
case <-ctx.Done():
close(v.queue)
break
return

case queuedItem := <-v.queue:
feeTx, err := v.Process(ctx, queuedItem.TicketHash, nil)
Expand Down Expand Up @@ -243,7 +246,7 @@ type queueEntry struct {
FeeTx *wire.MsgTx
}

func (v *VSP) Queue(ctx context.Context, ticketHash chainhash.Hash, feeTx *wire.MsgTx) {
func (v *VSP) queueItem(ctx context.Context, ticketHash chainhash.Hash, feeTx *wire.MsgTx) {
queuedTicket := &queueEntry{
TicketHash: ticketHash,
FeeTx: feeTx,
Expand Down Expand Up @@ -275,7 +278,7 @@ func (v *VSP) Sync(ctx context.Context) {
continue
}
// TODO get unpublished fee txs here and sync them
v.Queue(ctx, *ticketSummary.Ticket.Hash, nil)
v.queueItem(ctx, *ticketSummary.Ticket.Hash, nil)
}

return false, nil
Expand All @@ -288,12 +291,19 @@ func (v *VSP) Sync(ctx context.Context) {
}

func (v *VSP) Process(ctx context.Context, ticketHash chainhash.Hash, credits []wallet.Input) (*wire.MsgTx, error) {
feeAmount, err := v.GetFeeAddress(ctx, ticketHash)
if err != nil {
if strings.Contains(err.Error(), "ticket transaction could not be broadcast") {
var feeAmount dcrutil.Amount
var err error
for i := 0; i < 2; i++ {
feeAmount, err = v.GetFeeAddress(ctx, ticketHash)
if err == nil {
break
}
const broadcastMsg = "ticket transaction could not be broadcast"
if err != nil && i == 0 && strings.Contains(err.Error(), broadcastMsg) {
time.Sleep(2 * time.Minute)
v.Queue(ctx, ticketHash, nil)
}
}
if err != nil {
return nil, err
}

Expand Down

0 comments on commit 829bc2e

Please sign in to comment.