Skip to content

Commit

Permalink
client/{core,cmd/dexc}: load unfunded orders for recovery
Browse files Browse the repository at this point in the history
This addresses the issue of orders that lose their funding coins one way
or another (e.g. spend outside of dexc) can never be loaded. Prior to
this PR, that meant two things:

 - On login, the user will be forever spammed by an "Order coin error"
   / "Source coins retrieval error".
 - Matches for such orders can never be
   recovered, even if they do not require funding coins. This made
   recovery a manual task if there were matches in progress or requiring
   refund, etc.

Unfunded orders that require funding (epoch/booked or with matches
needing to send swap txns) are explicitly blocked from negotiating new
matches, and any offending matches that require funding coins are
blocked with swapErr. However, notably, the trackedTrade is added to the
dc.trades map, unlike before this change. In addition to allowing
unaffected matches to proceed, match status resolution can be performed
for the order and its matches, potentially revoking it, and
revoke_order/revoke_matches requests from the server will be recognized.

This allows recovery or completion of other unaffected matches belonging
to the trade. The order and the unfunded matches stay active (but halted
via swapErr) until they are either revoked or the user possibly resolves
a wallet issue that made the funding coins inaccessible and restarts
dexc to try loading the funding coins again. If the coins are not spent,
they probably are using the wrong wallet that does not control the
referenced coins.

Unfunded standing limit orders that are either epoch or booked are also
canceled to prevent further matches that will be likely to fail.

Unrelated fix: resolve the dexConnection.connected race now that the
conns map is accessed via a helper and connected is not guarded.
  • Loading branch information
chappjc authored Feb 21, 2021
1 parent c67af3f commit a7b5aa0
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 80 deletions.
193 changes: 139 additions & 54 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type dexConnection struct {
epochMtx sync.RWMutex
epoch map[string]uint64
// connected is a best guess on the ws connection status.
connected bool
connected uint32

regConfMtx sync.RWMutex
regConfirms *uint32 // nil regConfirms means no pending registration.
Expand Down Expand Up @@ -244,26 +244,29 @@ func (dc *dexConnection) findOrder(oid order.OrderID) (tracker *trackedTrade, pr
func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err error) {
tracker, _, _ := dc.findOrder(oid)
if tracker == nil {
return
return // false, nil
}
found = true

tracker.mtx.Lock()
defer tracker.mtx.Unlock()
return true, c.tryCancelTrade(dc, tracker)
}

// tryCancelTrade attempts to cancel the order.
func (c *Core) tryCancelTrade(dc *dexConnection, tracker *trackedTrade) error {
oid := tracker.ID()
if lo, ok := tracker.Order.(*order.LimitOrder); !ok || lo.Force != order.StandingTiF {
err = fmt.Errorf("cannot cancel %s order %s that is not a standing limit order", tracker.Type(), oid)
return
return fmt.Errorf("cannot cancel %s order %s that is not a standing limit order", tracker.Type(), oid)
}

tracker.mtx.Lock()
defer tracker.mtx.Unlock()

if tracker.cancel != nil {
// Existing cancel might be stale. Deleting it now allows this
// cancel attempt to proceed.
tracker.deleteStaleCancelOrder()

if tracker.cancel != nil {
err = fmt.Errorf("order %s - only one cancel order can be submitted per order per epoch. still waiting on cancel order %s to match", oid, tracker.cancel.ID())
return
return fmt.Errorf("order %s - only one cancel order can be submitted per order per epoch. "+
"still waiting on cancel order %s to match", oid, tracker.cancel.ID())
}
}

Expand All @@ -281,30 +284,28 @@ func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err
},
TargetOrderID: oid,
}
err = order.ValidateOrder(co, order.OrderStatusEpoch, 0)
err := order.ValidateOrder(co, order.OrderStatusEpoch, 0)
if err != nil {
return
return err
}

// Create and send the order message. Check the response before using it.
route, msgOrder := messageOrder(co, nil)
var result = new(msgjson.OrderResult)
err = dc.signAndRequest(msgOrder, route, result, DefaultResponseTimeout)
if err != nil {
return
return err
}
err = validateOrderResponse(dc, result, co, msgOrder)
if err != nil {
err = fmt.Errorf("Abandoning order. preimage: %x, server time: %d: %v",
return fmt.Errorf("Abandoning order. preimage: %x, server time: %d: %w",
preImg[:], result.ServerTime, err)
return
}

// Store the cancel order with the tracker.
err = tracker.cancelTrade(co, preImg)
if err != nil {
err = fmt.Errorf("error storing cancel order info %s: %w", co.ID(), err)
return
return fmt.Errorf("error storing cancel order info %s: %w", co.ID(), err)
}

// Now that the trackedTrade is updated, sync with the preimage request.
Expand All @@ -324,16 +325,15 @@ func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err
Order: co,
})
if err != nil {
err = fmt.Errorf("failed to store order in database: %v", err)
return
return fmt.Errorf("failed to store order in database: %w", err)
}

c.log.Infof("Cancel order %s targeting order %s at %s has been placed",
co.ID(), oid, dc.acct.host)

c.notify(newOrderNote("Cancelling order", "A cancel order has been submitted for order "+tracker.token(), db.Poke, tracker.coreOrderInternal()))

return
return nil
}

// Synchronize with the preimage request, in case that request came before
Expand Down Expand Up @@ -393,7 +393,9 @@ type serverMatches struct {
cancel *msgjson.Match
}

// parseMatches sorts the list of matches and associates them with a trade.
// parseMatches sorts the list of matches and associates them with a trade. This
// may be called from handleMatchRoute on receipt of a new 'match' request, or
// by authDEX with the list of active matches returned by the 'connect' request.
func (dc *dexConnection) parseMatches(msgMatches []*msgjson.Match, checkSigs bool) (map[order.OrderID]*serverMatches, []msgjson.Acknowledgement, error) {
var acks []msgjson.Acknowledgement
matches := make(map[order.OrderID]*serverMatches)
Expand Down Expand Up @@ -1042,7 +1044,7 @@ func (c *Core) Exchanges() map[string]*Exchange {
Markets: dc.markets(),
Assets: dc.assets,
FeePending: dc.acct.feePending(),
Connected: dc.connected,
Connected: atomic.LoadUint32(&dc.connected) == 1,
ConfsRequired: requiredConfs,
RegConfirms: dc.getRegConfirms(),
}
Expand Down Expand Up @@ -2537,7 +2539,7 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) {
// Get the dexConnection and the dex.Asset for each asset.
c.connMtx.RLock()
dc, found := c.conns[host]
connected := found && dc.connected
connected := found && atomic.LoadUint32(&dc.connected) == 1
c.connMtx.RUnlock()
if !found {
return nil, fmt.Errorf("unknown DEX %s", form.Host)
Expand Down Expand Up @@ -3044,6 +3046,31 @@ func (c *Core) authDEX(dc *dexConnection) error {
c.resolveMatchConflicts(dc, matchConflicts)
}

// List and cancel standing limit orders that are in epoch or booked status,
// but without funding coins for new matches. This should be done after the
// order status resolution done above.
var brokenTrades []*trackedTrade
dc.tradeMtx.RLock()
for _, trade := range dc.trades {
if lo, ok := trade.Order.(*order.LimitOrder); !ok || lo.Force != order.StandingTiF {
continue // only standing limit orders need to be canceled
}
trade.mtx.RLock()
status := trade.metaData.Status
if (status == order.OrderStatusEpoch || status == order.OrderStatusBooked) &&
!trade.hasFundingCoins() {
brokenTrades = append(brokenTrades, trade)
}
trade.mtx.RUnlock()
}
dc.tradeMtx.RUnlock()
for _, trade := range brokenTrades {
c.log.Warnf("Canceling unfunded standing limit order %v", trade.ID())
if err = c.tryCancelTrade(dc, trade); err != nil {
c.log.Warnf("Unable to cancel unfunded trade %v: %v", trade.ID(), err)
}
}

// Order, match statuses should now be in sync with the DEX.
// Refresh the user's orders for each of the DEX's markets and
// log updated locked funds info.
Expand Down Expand Up @@ -3407,11 +3434,13 @@ func (c *Core) loadDBTrades(dc *dexConnection, crypter encrypt.Crypter, failed m
if err != nil {
baseFailed = true
failed[base] = struct{}{}
c.log.Errorf("Connecting to wallet %s failed: %v", unbip(base), err)
} else if !baseWallet.unlocked() {
err = baseWallet.Unlock(crypter)
if err != nil {
baseFailed = true
failed[base] = struct{}{}
c.log.Errorf("Unlock wallet %s failed: %v", unbip(base), err)
}
}
}
Expand All @@ -3420,11 +3449,13 @@ func (c *Core) loadDBTrades(dc *dexConnection, crypter encrypt.Crypter, failed m
if err != nil {
quoteFailed = true
failed[quote] = struct{}{}
c.log.Errorf("Connecting to wallet %s failed: %v", unbip(quote), err)
} else if !quoteWallet.unlocked() {
err = quoteWallet.Unlock(crypter)
if err != nil {
quoteFailed = true
failed[quote] = struct{}{}
c.log.Errorf("Unlock wallet %s failed: %v", unbip(quote), err)
}
}
}
Expand All @@ -3450,6 +3481,27 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa
detail := fmt.Sprintf(s, a...)
c.notify(newOrderNote(subject, detail, db.ErrorLevel, tracker.coreOrder()))
}

// markUnfunded is used to allow an unfunded order to enter the trades map
// so that status resolution and match negotiation for unaffected matches
// may continue. By not self-revoking, the user may have the opportunity to
// resolve any wallet issues that may have lead to a failure to find the
// funding coins. Otherwise the server will (or already did) revoke some or
// all of the matches and the order itself.
markUnfunded := func(trade *trackedTrade, matches []*matchTracker) {
// Block negotiating new matches.
trade.changeLocked = false
trade.coinsLocked = false
// Block swap txn attempts on matches needing funds.
for _, match := range matches {
match.swapErr = errors.New("no funding coins for swap")
}
// Will not be retired until revoke or cancel of the order and all
// matches, which may happen on status resolution after authenticating
// with the DEX server, or from a revoke_match/revoke_order notification
// after timeout. However, the order should be unconditionally canceled.
}

relocks := make(assetMap)
dc.tradeMtx.Lock()
defer dc.tradeMtx.Unlock()
Expand All @@ -3474,28 +3526,30 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa
// If matches haven't redeemed, but the counter-swap has been received,
// reload the audit info.
isActive := tracker.metaData.Status == order.OrderStatusBooked || tracker.metaData.Status == order.OrderStatusEpoch
var needsCoins bool
var matchesNeedingCoins []*matchTracker
for _, match := range tracker.matches {
dbMatch, metaData := match.Match, match.MetaData
var needsAuditInfo bool
var counterSwap []byte
if dbMatch.Side == order.Maker {
if dbMatch.Status < order.MakerSwapCast {
needsCoins = true
matchesNeedingCoins = append(matchesNeedingCoins, match)
}
if dbMatch.Status == order.TakerSwapCast {
needsAuditInfo = true // maker needs AuditInfo for takers contract
counterSwap = metaData.Proof.TakerSwap
}
} else { // Taker
if dbMatch.Status < order.TakerSwapCast {
needsCoins = true
matchesNeedingCoins = append(matchesNeedingCoins, match)
}
if dbMatch.Status < order.MatchComplete && dbMatch.Status >= order.MakerSwapCast {
needsAuditInfo = true // taker needs AuditInfo for maker's contract
counterSwap = metaData.Proof.MakerSwap
}
}
c.log.Tracef("Trade %v match %v needs coins = %v, needs audit info = %v",
tracker.ID(), match.id, len(matchesNeedingCoins) > 0, needsAuditInfo)
if needsAuditInfo {
// Check for unresolvable states.
if len(counterSwap) == 0 {
Expand Down Expand Up @@ -3549,39 +3603,51 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa
}
}

// Active orders and orders with matches with unsent swaps need the funding
// coin(s).
// Active orders and orders with matches with unsent swaps need funding
// coin(s). If they are not found, block new matches and swap attempts.
needsCoins := len(matchesNeedingCoins) > 0
if isActive || needsCoins {
coinIDs := trade.Coins
if len(tracker.metaData.ChangeCoin) != 0 {
coinIDs = []order.CoinID{tracker.metaData.ChangeCoin}
}
tracker.coins = map[string]asset.Coin{} // should already be
if len(coinIDs) == 0 {
notifyErr("No funding coins", "Order %s has no %s funding coins", tracker.token(), unbip(wallets.fromAsset.ID))
continue
}
byteIDs := make([]dex.Bytes, 0, len(coinIDs))
for _, cid := range coinIDs {
byteIDs = append(byteIDs, []byte(cid))
}
if len(byteIDs) == 0 {
notifyErr("Order coin error", "No coins for loaded order %s %s: %v", unbip(wallets.fromAsset.ID), tracker.token(), err)
continue
}
coins, err := wallets.fromWallet.FundingCoins(byteIDs)
if err != nil {
notifyErr("Order coin error", "Source coins retrieval error for %s %s: %v", unbip(wallets.fromAsset.ID), tracker.token(), err)
continue
notifyErr("Order coin error", "No funding coins recorded for active order %s", tracker.token())
markUnfunded(tracker, matchesNeedingCoins) // bug - no user resolution
} else {
byteIDs := make([]dex.Bytes, 0, len(coinIDs))
for _, cid := range coinIDs {
byteIDs = append(byteIDs, []byte(cid))
}
coins, err := wallets.fromWallet.FundingCoins(byteIDs)
if err != nil || len(coins) == 0 {
notifyErr("Order coin error", "Source coins retrieval error for order %s (%s): %v",
tracker.token(), unbip(wallets.fromAsset.ID), err)
// Block matches needing funding coins.
markUnfunded(tracker, matchesNeedingCoins)
// Note: tracker is still added to trades map for (1) status
// resolution, (2) continued settlement of matches that no
// longer require funding coins, and (3) cancellation in
// authDEX if the order is booked.
c.log.Warnf("Check the status of your %s wallet and the coins logged above! "+
"Resolve the wallet issue if possible and restart the DEX client.",
strings.ToUpper(unbip(wallets.fromAsset.ID)))
c.log.Warnf("Unfunded order %v will be canceled on connect, but %d active matches need funding coins!",
tracker.ID(), len(matchesNeedingCoins))
// If the funding coins are spent or inaccessible, the user
// can only wait for match revocation.
} else {
// NOTE: change and changeLocked are not set even if the
// funding coins were loaded from the DB's ChangeCoin.
tracker.coinsLocked = true
tracker.coins = mapifyCoins(coins)
}
}
// NOTE: change and changeLocked are not set even if the funding
// coins were loaded from the DB's ChangeCoin.
tracker.coinsLocked = true
tracker.coins = mapifyCoins(coins)
}

// Active orders and orders with matches with unsent swaps need the funding
// coin(s).
// Orders with sent but unspent swaps need to recompute contract-locked amts.
// Balances should be updated for any orders with locked wallet coins,
// or orders with funds locked in contracts.
if isActive || needsCoins || tracker.unspentContractAmounts() > 0 {
relocks.count(wallets.fromAsset.ID)
}
Expand Down Expand Up @@ -3786,7 +3852,7 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) {
trades: make(map[order.OrderID]*trackedTrade),
notify: c.notify,
epoch: epochMap,
connected: true,
connected: 1,
}

c.log.Debugf("Broadcast timeout = %v, ticking every %v", bTimeout, dc.tickInterval)
Expand Down Expand Up @@ -3879,7 +3945,11 @@ func (c *Core) handleReconnect(host string) {
func (c *Core) handleConnectEvent(host string, connected bool) {
c.connMtx.Lock()
if dc, found := c.conns[host]; found {
dc.connected = connected
var v uint32
if connected {
v = 1
}
atomic.StoreUint32(&dc.connected, v)
}
c.connMtx.Unlock()
statusStr := "connected"
Expand Down Expand Up @@ -4286,17 +4356,32 @@ func handleMatchRoute(c *Core, dc *dexConnection, msg *msgjson.Message) error {
// requirement, which requires changes to the server's handling.
return err
}

// Warn about new matches for unfunded orders. We still must ack all the
// matches in the 'match' request for the server to accept it, although the
// server doesn't require match acks. See (*Swapper).processMatchAcks.
for oid, srvMatch := range matches {
if !srvMatch.tracker.hasFundingCoins() {
c.log.Warnf("Received new match for unfunded order %v!", oid)
// In runMatches>tracker.negotiate we generate the matchTracker and
// set swapErr after updating order status and filled amount, and
// storing the match to the DB. It may still be possible for the
// user to recover if the issue is just that the wrong wallet is
// connected by fixing wallet config and restarting. p.s. Hopefully
// we are maker.
}
}

resp, err := msgjson.NewResponse(msg.ID, acks, nil)
if err != nil {
return err
}

// Send the match acknowledgments.
// TODO: Consider a "QueueSend" or similar, but do not bail on the matches.
err = dc.Send(resp)
if err != nil {
// Do not bail on the matches on error, just log it.
c.log.Errorf("Send match response: %v", err)
// dc.addPendingSend(resp) // e.g.
}

// Begin match negotiation.
Expand Down
Loading

0 comments on commit a7b5aa0

Please sign in to comment.