From 5779805957b5964279ce614d275fac1eed4b732e Mon Sep 17 00:00:00 2001 From: uv-orbs <82281921+uv-orbs@users.noreply.github.com> Date: Tue, 26 Mar 2024 16:39:49 +0200 Subject: [PATCH] Feature/pub fill (#164) * add swap.mined and isolate Fill.go * remove double publish for cancel * add mined ts remove cancelled * publish fill on success fill and update in any case * reuse newFills * make publish available both from evm proc and server proc * fake fill publish fill event --- models/fill.go | 49 ++++++++++++++++++++++++++ models/swap.go | 17 +-------- service/cancel_orders_for_user.go | 2 -- service/evm_check_pending_txs.go | 1 + service/evm_check_pending_txs_utils.go | 9 +++-- service/get_orders_for_user.go | 23 +----------- service/pub_sub.go | 49 +++++++++++++++++++++----- service/taker.go | 5 ++- 8 files changed, 103 insertions(+), 52 deletions(-) create mode 100644 models/fill.go diff --git a/models/fill.go b/models/fill.go new file mode 100644 index 00000000..2be7aea6 --- /dev/null +++ b/models/fill.go @@ -0,0 +1,49 @@ +package models + +import ( + "time" + + "github.com/google/uuid" + "github.com/shopspring/decimal" +) + +// Individual Fill +// get SwapFills for user +type Fill struct { + OrderId uuid.UUID `json:"orderId"` + ClientOId uuid.UUID `json:"clientOrderId"` + SwapId uuid.UUID `json:"swapId"` + Side Side `json:"side"` + Symbol Symbol `json:"symbol"` + Mined time.Time `json:"mined"` + Resolved time.Time `json:"resolved"` + Price decimal.Decimal `json:"price"` + Size decimal.Decimal `json:"size"` + OrderSize decimal.Decimal `json:"orderSize"` +} + +func NewFill(symbol Symbol, swap Swap, frag OrderFrag, order *Order) *Fill { + // create fill res + fill := Fill{ + OrderId: frag.OrderId, + SwapId: swap.Id, + Symbol: symbol, + Mined: swap.Mined, + Resolved: swap.Resolved, + } + + // get order + if order != nil { + // fill size is dependent on the side of the order + fill.Size = frag.InSize + if order.Side == SELL { + fill.Size = frag.OutSize + } + // enrich fill data + fill.Side = order.Side + fill.ClientOId = order.ClientOId + fill.Price = order.Price + fill.OrderSize = order.Size + } + return &fill +} diff --git a/models/swap.go b/models/swap.go index d26f0a7e..ac85ec19 100644 --- a/models/swap.go +++ b/models/swap.go @@ -31,22 +31,6 @@ type OrderFrag struct { InSize decimal.Decimal } -// Individual Fill -// get SwapFills for user -type Fill struct { - OrderId uuid.UUID `json:"orderId"` - ClientOId uuid.UUID `json:"clientOrderId"` - SwapId uuid.UUID `json:"swapId"` - Side Side `json:"side"` - Symbol Symbol `json:"symbol"` - Timestamp time.Time `json:"timestamp"` - Price decimal.Decimal `json:"price"` - Size decimal.Decimal `json:"size"` - OrderSize decimal.Decimal `json:"orderSize"` - - Cancelled bool `json:"cancelled"` -} - func (f *OrderFrag) ToMap() map[string]string { return map[string]string{ "inSize": f.InSize.String(), @@ -68,6 +52,7 @@ type Swap struct { Id uuid.UUID `json:"id"` Created time.Time `json:"created"` Started time.Time `json:"started"` + Mined time.Time `json:"mined"` Resolved time.Time `json:"resolved"` Succeeded bool `json:"succeeded"` TxHash string `json:"txHash"` diff --git a/service/cancel_orders_for_user.go b/service/cancel_orders_for_user.go index 4dd5c74e..ce102f4f 100644 --- a/service/cancel_orders_for_user.go +++ b/service/cancel_orders_for_user.go @@ -35,8 +35,6 @@ func (s *Service) CancelOrdersForUser(ctx context.Context, userId uuid.UUID, sym logctx.Error(ctx, "could not cancel order", logger.Error(err), logger.String("orderId", uid.String())) } - s.publishOrderEvent(ctx, &order) - res = append(res, *uid) } } diff --git a/service/evm_check_pending_txs.go b/service/evm_check_pending_txs.go index b8db69d0..387b6258 100644 --- a/service/evm_check_pending_txs.go +++ b/service/evm_check_pending_txs.go @@ -73,6 +73,7 @@ func (e *EvmClient) CheckPendingTxs(ctx context.Context) error { switch tx.Status { case models.TX_SUCCESS: logctx.Debug(ctx, "Transaction successful", logger.String("txHash", p.TxHash), logger.String("swapId", p.Id.String())) + p.Mined = *tx.Timestamp _, err = e.ResolveSwap(ctx, p, true, &mu) if err != nil { logctx.Error(ctx, "Failed to process successful transaction", logger.Error(err), logger.String("txHash", p.TxHash), logger.String("swapId", p.Id.String())) diff --git a/service/evm_check_pending_txs_utils.go b/service/evm_check_pending_txs_utils.go index 792bb841..812083f4 100644 --- a/service/evm_check_pending_txs_utils.go +++ b/service/evm_check_pending_txs_utils.go @@ -59,7 +59,7 @@ func (e *EvmClient) ResolveSwap(ctx context.Context, swap models.Swap, isSuccess filledOrders := []models.Order{} updatedOrders := []models.Order{} - for _, order := range orders { + for i, order := range orders { size, found := orderSizes[order.Id] if !found { logctx.Error(ctx, "Failed to get order frag size", logger.String("orderId", order.Id.String())) @@ -73,7 +73,9 @@ func (e *EvmClient) ResolveSwap(ctx context.Context, swap models.Swap, isSuccess continue } - e.publishOrderEvent(ctx, &order) + // publish Fill Event + fill := models.NewFill(order.Symbol, swap, swap.Frags[i], &order) + e.publishFillEvent(ctx, order.UserId, *fill) if order.IsFilled() { // add to filled orders if completely filled @@ -91,10 +93,11 @@ func (e *EvmClient) ResolveSwap(ctx context.Context, swap models.Swap, isSuccess updatedOrders = append(updatedOrders, order) } + e.publishOrderEvent(ctx, &order) + // append to users to be updated later userIds[order.UserId] = true } - // TODO: save orders new state // update user(s) keys for userId := range userIds { diff --git a/service/get_orders_for_user.go b/service/get_orders_for_user.go index da5ab847..f25024a8 100644 --- a/service/get_orders_for_user.go +++ b/service/get_orders_for_user.go @@ -75,32 +75,11 @@ func (s *Service) GetSwapFills(ctx context.Context, userId uuid.UUID, symbol mod if swap.Resolved.After(startAt) && swap.Resolved.Before(endAt) { // iterate through fragments for _, frag := range swap.Frags { - // create fill res - fill := models.Fill{ - OrderId: frag.OrderId, - SwapId: swap.Id, - Symbol: symbol, - Timestamp: swap.Resolved, - } - - // get order order, err := s.orderBookStore.FindOrderById(ctx, frag.OrderId, false) if err != nil { logctx.Warn(ctx, "error getting a order", logger.Error(err), logger.String("user_id", userId.String()), logger.String("order_id", id)) - } else { - // fill size is dependent on the side of the order - fill.Size = frag.InSize - if order.Side == models.SELL { - fill.Size = frag.OutSize - } - // enrich fill data - fill.Side = order.Side - fill.ClientOId = order.ClientOId - fill.Price = order.Price - fill.OrderSize = order.Size } - - fills = append(fills, fill) + fills = append(fills, *models.NewFill(symbol, *swap, frag, order)) if len(fills) >= MAX_FILLS { return nil, models.ErrMaxRecExceeded diff --git a/service/pub_sub.go b/service/pub_sub.go index 5787dea9..ad5c0f44 100644 --- a/service/pub_sub.go +++ b/service/pub_sub.go @@ -2,9 +2,11 @@ package service import ( "context" + "encoding/json" "fmt" "github.com/google/uuid" + "github.com/orbs-network/order-book/data/store" "github.com/orbs-network/order-book/models" "github.com/orbs-network/order-book/utils/logger" "github.com/orbs-network/order-book/utils/logger/logctx" @@ -24,30 +26,61 @@ func (s *Service) SubscribeUserOrders(ctx context.Context, userId uuid.UUID) (ch return channel, nil } -func (s *EvmClient) publishOrderEvent(ctx context.Context, order *models.Order) { - key, value, err := createOrderEvent(ctx, order) +func publishFillEvent(ctx context.Context, store store.OrderBookStore, userId uuid.UUID, fill models.Fill) { + //value, err := json.Marshal(fill) + value, err := json.Marshal(struct { + Event string `json:"event"` + models.Fill + }{ + Event: "order-fill", + Fill: fill, + }) if err != nil { - return + logctx.Error(ctx, "failed to marshal order to json", logger.Error(err)) } - if err := s.orderBookStore.PublishEvent(ctx, key, value); err != nil { - logctx.Error(ctx, "failed to publish order event", logger.String("event", key), logger.Error(err)) + key := models.CreateUserOrdersEventKey(userId) + if err := store.PublishEvent(ctx, key, value); err != nil { + logctx.Error(ctx, "failed to publish fill event", logger.String("event", key), logger.Error(err)) } + +} +func (e *EvmClient) publishFillEvent(ctx context.Context, userId uuid.UUID, fill models.Fill) { + publishFillEvent(ctx, e.orderBookStore, userId, fill) } -func (s *Service) publishOrderEvent(ctx context.Context, order *models.Order) { +func (s *Service) publishFillEvent(ctx context.Context, userId uuid.UUID, fill models.Fill) { + publishFillEvent(ctx, s.orderBookStore, userId, fill) +} + +func publishOrderEvent(ctx context.Context, store store.OrderBookStore, order *models.Order) { key, value, err := createOrderEvent(ctx, order) if err != nil { return } - if err := s.orderBookStore.PublishEvent(ctx, key, value); err != nil { + if err := store.PublishEvent(ctx, key, value); err != nil { logctx.Error(ctx, "failed to publish order event", logger.String("event", key), logger.Error(err)) } } +func (e *EvmClient) publishOrderEvent(ctx context.Context, order *models.Order) { + publishOrderEvent(ctx, e.orderBookStore, order) +} +func (s *Service) publishOrderEvent(ctx context.Context, order *models.Order) { + publishOrderEvent(ctx, s.orderBookStore, order) +} + func createOrderEvent(ctx context.Context, order *models.Order) (key string, value []byte, err error) { - value, err = order.ToJson() + //value, err = order.ToJson() + + value, err = json.Marshal(struct { + Event string `json:"event"` + models.Order + }{ + Event: "order-changed", + Order: *order, + }) if err != nil { logctx.Error(ctx, "failed to marshal order to json", logger.Error(err)) } diff --git a/service/taker.go b/service/taker.go index e5683601..76a418ec 100644 --- a/service/taker.go +++ b/service/taker.go @@ -212,12 +212,15 @@ func (s *Service) FillSwap(ctx context.Context, swapId uuid.UUID) error { logctx.Error(ctx, "FillOrder Failed", logger.Error(err)) return err } + // publish fill event + s.publishFillEvent(ctx, order.UserId, *models.NewFill(order.Symbol, *swap, frag, order)) + if filled { filledOrders = append(filledOrders, *order) + } else { openOrders = append(openOrders, *order) } - s.publishOrderEvent(ctx, order) } } // store partial orders