diff --git a/.gitignore b/.gitignore index fc3ee87..f1b8306 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ __debug_* *Key.txt address.txt coverage.out -.venv \ No newline at end of file +.venv +.makerc diff --git a/Makefile b/Makefile index 5d2789e..5d313fb 100644 --- a/Makefile +++ b/Makefile @@ -3,6 +3,8 @@ LOG_LEVEL ?= info -include .makerc export +RPC_URL ?= https://some-rpc-url.com + start: @echo "Starting server and db..." @docker-compose up --build -d diff --git a/cmd/order-book.Dockerfile b/cmd/order-book.Dockerfile index b4a449d..f8bf6cf 100644 --- a/cmd/order-book.Dockerfile +++ b/cmd/order-book.Dockerfile @@ -8,8 +8,8 @@ RUN go mod download COPY . ./ RUN find . -type f ! -name "*.go" ! -name "go.mod" ! -name "go.sum" -delete -RUN CGO_ENABLED=0 GOOS=linux go build -o /order-book ./cmd/order-book +ARG APP_PATH -EXPOSE 8080 +RUN CGO_ENABLED=0 GOOS=linux go build -o /order-book $APP_PATH CMD [ "/order-book" ] \ No newline at end of file diff --git a/cmd/pending-swaps-tracker/main.go b/cmd/pending-swaps-tracker/main.go new file mode 100644 index 0000000..8bf4b0f --- /dev/null +++ b/cmd/pending-swaps-tracker/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "github.com/ethereum/go-ethereum/ethclient" + "github.com/orbs-network/order-book/data/evmrepo" + "github.com/orbs-network/order-book/data/redisrepo" + "github.com/orbs-network/order-book/service" + "github.com/redis/go-redis/v9" +) + +var defaultDuration = 10 * time.Second + +func main() { + redisAddress, found := os.LookupEnv("REDIS_URL") + if !found { + panic("REDIS_URL not set") + } + + opt, err := redis.ParseURL(redisAddress) + if err != nil { + panic(fmt.Errorf("failed to parse redis url: %v", err)) + } + + log.Printf("Redis address: %s", opt.Addr) + + rpcUrl, found := os.LookupEnv("RPC_URL") + if !found { + panic("RPC_URL not set") + } + + rdb := redis.NewClient(opt) + + repository, err := redisrepo.NewRedisRepository(rdb) + if err != nil { + log.Fatalf("error creating repository: %v", err) + } + + ethClient, err := ethclient.Dial(rpcUrl) + if err != nil { + log.Fatalf("error creating eth client: %v", err) + } + defer ethClient.Close() + + evmRepo, err := evmrepo.NewEvmRepository(ethClient) + if err != nil { + log.Fatalf("error creating evm repository: %v", err) + } + + evmClient, err := service.NewEvmSvc(repository, evmRepo) + if err != nil { + log.Fatalf("error creating evm client: %v", err) + } + + envDurationStr := os.Getenv("TICKER_DURATION") + + tickerDuration, err := time.ParseDuration(envDurationStr) + if err != nil || envDurationStr == "" { + fmt.Printf("Invalid or missing TICKER_DURATION. Using default of %s\n", defaultDuration) + tickerDuration = defaultDuration + } + + ticker := time.NewTicker(tickerDuration) + defer ticker.Stop() + + ctx := context.Background() + + for range ticker.C { + err := evmClient.CheckPendingTxs(ctx) + if err != nil { + log.Printf("error checking pending txs: %v", err) + } + } +} diff --git a/data/redisrepo/get_pending_swaps.go b/data/redisrepo/get_pending_swaps.go index c68be7e..26c6f7c 100644 --- a/data/redisrepo/get_pending_swaps.go +++ b/data/redisrepo/get_pending_swaps.go @@ -7,6 +7,7 @@ import ( "github.com/orbs-network/order-book/models" ) +// GetPendingSwaps returns all pending swaps that are waiting to be checked for completion func (r *redisRepository) GetPendingSwaps(ctx context.Context) ([]models.Pending, error) { var pendingSwaps []models.Pending diff --git a/data/redisrepo/get_pending_swaps_test.go b/data/redisrepo/get_pending_swaps_test.go index 9a3533b..317f226 100644 --- a/data/redisrepo/get_pending_swaps_test.go +++ b/data/redisrepo/get_pending_swaps_test.go @@ -28,7 +28,7 @@ func TestRedisRepo_GetPendingSwaps(t *testing.T) { } mock.ExpectLRange(CreatePendingSwapTxsKey(), 0, -1).SetVal([]string{ - "{\"swapId\":\"75fff27c-5a98-4f04-9bba-ce932d9c8e67\",\"txHash\":\"cc6fb26e-bc79-4ea3-b761-224893dc7df3\"}", + "{\"swapId\":\"75fff27c-5a98-4f04-9bba-ce932d9c8e67\",\"txHash\":\"0xefd319bb86b954a8e8cd7d9396546db8d3251910209cd8b1b9a674ef8585f226\"}", }) pendingSwaps, err := repo.GetPendingSwaps(ctx) @@ -36,7 +36,7 @@ func TestRedisRepo_GetPendingSwaps(t *testing.T) { assert.NoError(t, err) assert.Len(t, pendingSwaps, 1) assert.Equal(t, uuid.MustParse("75fff27c-5a98-4f04-9bba-ce932d9c8e67"), pendingSwaps[0].SwapId) - assert.Equal(t, uuid.MustParse("cc6fb26e-bc79-4ea3-b761-224893dc7df3"), pendingSwaps[0].TxHash) + assert.Equal(t, "0xefd319bb86b954a8e8cd7d9396546db8d3251910209cd8b1b9a674ef8585f226", pendingSwaps[0].TxHash) }) t.Run("should work with a large list of pending swaps", func(t *testing.T) { diff --git a/data/redisrepo/process_completed_swap_orders.go b/data/redisrepo/process_completed_swap_orders.go new file mode 100644 index 0000000..22b0962 --- /dev/null +++ b/data/redisrepo/process_completed_swap_orders.go @@ -0,0 +1,59 @@ +package redisrepo + +import ( + "context" + "fmt" + + "github.com/google/uuid" + "github.com/orbs-network/order-book/models" + "github.com/orbs-network/order-book/utils/logger" + "github.com/orbs-network/order-book/utils/logger/logctx" +) + +// ProcessCompletedSwapOrders stores the updated swap orders and removes the swap from Redis. It should be called after a swap is completed. +// +// `orders` should be the orders that were part of the swap (with `SizePending` and `SizeFilled` updated accordingly) +// +// `isSuccessful` should be `true` if the swap was successful, `false` otherwise +func (r *redisRepository) ProcessCompletedSwapOrders(ctx context.Context, orders []*models.Order, swapId uuid.UUID, isSuccessful bool) error { + // --- START TRANSACTION --- + transaction := r.client.TxPipeline() + + // 1. Store updated swap orders, handle completely filled orders + if isSuccessful { + for _, order := range orders { + if order.IsFilled() { + if err := storeFilledOrderTx(ctx, transaction, order); err != nil { + logctx.Error(ctx, "failed to store filled order in Redis", logger.Error(err), logger.String("orderId", order.Id.String())) + return fmt.Errorf("failed to store filled order in Redis: %v", err) + } + } else { + if err := storeOrderTX(ctx, transaction, order); err != nil { + logctx.Error(ctx, "failed to store open order in Redis", logger.Error(err), logger.String("orderId", order.Id.String())) + return fmt.Errorf("failed to store open order in Redis: %v", err) + } + } + } + } else { + // Store updated orders + for _, order := range orders { + if err := storeOrderTX(ctx, transaction, order); err != nil { + logctx.Error(ctx, "failed to store open order in Redis", logger.Error(err), logger.String("orderId", order.Id.String())) + return fmt.Errorf("failed to store open order in Redis: %v", err) + } + } + } + + // 2. Remove the swap + swapKey := CreateSwapKey(swapId) + transaction.Del(ctx, swapKey).Err() + + // --- END TRANSACTION --- + _, err := transaction.Exec(ctx) + if err != nil { + logctx.Error(ctx, "failed to execute ProcessCompletedSwapOrders transaction", logger.Error(err)) + return fmt.Errorf("failed to execute ProcessCompletedSwapOrders transaction: %v", err) + } + + return nil +} diff --git a/data/redisrepo/process_completed_swap_orders_test.go b/data/redisrepo/process_completed_swap_orders_test.go new file mode 100644 index 0000000..e9fe1fd --- /dev/null +++ b/data/redisrepo/process_completed_swap_orders_test.go @@ -0,0 +1,127 @@ +package redisrepo + +import ( + "context" + "testing" + "time" + + "github.com/go-redis/redismock/v9" + "github.com/google/uuid" + "github.com/orbs-network/order-book/mocks" + "github.com/orbs-network/order-book/models" + "github.com/redis/go-redis/v9" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" +) + +func TestRedisRepo_ProcessCompletedSwapOrders(t *testing.T) { + ctx := context.Background() + + db, mock := redismock.NewClientMock() + + repo := &redisRepository{ + client: db, + } + + swapId := uuid.MustParse("00000000-0000-0000-0000-000000000007") + timestamp := time.Date(2023, 10, 10, 12, 0, 0, 0, time.UTC) + + t.Run("should process successful swap with completely filled order and partially filled order", func(t *testing.T) { + + filledOrder := models.Order{ + SizeFilled: decimal.NewFromFloat(100), + Size: decimal.NewFromFloat(100), + Side: models.SELL, + Timestamp: timestamp, + } + + partiallyFilledOrder := models.Order{ + SizeFilled: decimal.NewFromFloat(50), + Size: decimal.NewFromFloat(100), + Side: models.SELL, + Timestamp: timestamp, + } + + mock.ExpectTxPipeline() + + // Store completely filled order + mock.ExpectZRem(CreateUserOpenOrdersKey(filledOrder.UserId), filledOrder.Id.String()).SetVal(1) + mock.ExpectZAdd(CreateUserFilledOrdersKey(filledOrder.UserId), redis.Z{ + Score: float64(timestamp.UTC().UnixNano()), + Member: filledOrder.Id.String(), + }).SetVal(1) + mock.ExpectZRem(CreateSellSidePricesKey(filledOrder.Symbol), filledOrder.Id.String()).SetVal(1) + mock.ExpectHSet(CreateOrderIDKey(filledOrder.Id), filledOrder.OrderToMap()).SetVal(1) + + // Store partially filled order + mock.ExpectZAdd(CreateUserOpenOrdersKey(partiallyFilledOrder.UserId), redis.Z{ + Score: float64(timestamp.UTC().UnixNano()), + Member: partiallyFilledOrder.Id.String(), + }).SetVal(1) + mock.ExpectHSet(CreateOrderIDKey(partiallyFilledOrder.Id), partiallyFilledOrder.OrderToMap()).SetVal(1) + mock.ExpectSet(CreateClientOIDKey(partiallyFilledOrder.ClientOId), partiallyFilledOrder.Id.String(), 0).SetVal("OK") + f64Price, _ := partiallyFilledOrder.Price.Float64() + timestamp := float64(partiallyFilledOrder.Timestamp.UTC().UnixNano()) / 1e9 + score := f64Price + (timestamp / 1e12) + mock.ExpectZAdd(CreateSellSidePricesKey(partiallyFilledOrder.Symbol), redis.Z{ + Score: score, + Member: partiallyFilledOrder.Id.String(), + }).SetVal(1) + + // Remove swap + mock.ExpectDel(CreateSwapKey(swapId)).SetVal(1) + + mock.ExpectTxPipelineExec() + + err := repo.ProcessCompletedSwapOrders(ctx, []*models.Order{&filledOrder, &partiallyFilledOrder}, swapId, true) + assert.NoError(t, err) + }) + + t.Run("should process failed swap", func(t *testing.T) { + orderToBeRolledback := models.Order{ + SizeFilled: decimal.NewFromFloat(20), + Size: decimal.NewFromFloat(100), + SizePending: decimal.NewFromFloat(0), + Side: models.SELL, + Timestamp: timestamp, + } + + mock.ExpectTxPipeline() + + // Store order + mock.ExpectZAdd(CreateUserOpenOrdersKey(orderToBeRolledback.UserId), redis.Z{ + Score: float64(timestamp.UTC().UnixNano()), + Member: orderToBeRolledback.Id.String(), + }).SetVal(1) + mock.ExpectHSet(CreateOrderIDKey(orderToBeRolledback.Id), orderToBeRolledback.OrderToMap()).SetVal(1) + mock.ExpectSet(CreateClientOIDKey(orderToBeRolledback.ClientOId), orderToBeRolledback.Id.String(), 0).SetVal("OK") + f64Price, _ := orderToBeRolledback.Price.Float64() + timestamp := float64(orderToBeRolledback.Timestamp.UTC().UnixNano()) / 1e9 + score := f64Price + (timestamp / 1e12) + mock.ExpectZAdd(CreateSellSidePricesKey(orderToBeRolledback.Symbol), redis.Z{ + Score: score, + Member: orderToBeRolledback.Id.String(), + }).SetVal(1) + + // Remove swap + mock.ExpectDel(CreateSwapKey(swapId)).SetVal(1) + + mock.ExpectTxPipelineExec() + + err := repo.ProcessCompletedSwapOrders(ctx, []*models.Order{&orderToBeRolledback}, swapId, false) + assert.NoError(t, err) + }) + + t.Run("no database writes should happen if part of transaction fails", func(t *testing.T) { + + mock.ExpectTxPipeline() + mock.ExpectZAdd(CreateUserOpenOrdersKey(mocks.Order.UserId), redis.Z{ + Score: float64(timestamp.UTC().UnixNano()), + Member: mocks.Order.Id.String(), + }).SetErr(assert.AnError) + + err := repo.ProcessCompletedSwapOrders(ctx, []*models.Order{&mocks.Order}, swapId, true) + assert.ErrorContains(t, err, "failed to execute ProcessCompletedSwapOrders transaction") + }) + +} diff --git a/data/redisrepo/store_filled_orders.go b/data/redisrepo/store_filled_orders.go index 4bd7f9e..b1edcb8 100644 --- a/data/redisrepo/store_filled_orders.go +++ b/data/redisrepo/store_filled_orders.go @@ -17,7 +17,7 @@ func (r *redisRepository) StoreFilledOrders(ctx context.Context, orders []models transaction := r.client.TxPipeline() for _, order := range orders { - err := storeFilledOrderTx(ctx, transaction, order) + err := storeFilledOrderTx(ctx, transaction, &order) if err != nil { return err } @@ -31,7 +31,7 @@ func (r *redisRepository) StoreFilledOrders(ctx context.Context, orders []models return nil } -func storeFilledOrderTx(ctx context.Context, transaction redis.Pipeliner, order models.Order) error { +func storeFilledOrderTx(ctx context.Context, transaction redis.Pipeliner, order *models.Order) error { // 1. Remove the order from the user's open orders set userOrdersKey := CreateUserOpenOrdersKey(order.UserId) transaction.ZRem(ctx, userOrdersKey, order.Id.String()) @@ -45,11 +45,11 @@ func storeFilledOrderTx(ctx context.Context, transaction redis.Pipeliner, order }) // 3. Remove the order from the buy/sell prices set for that pair - buyPricesKey := CreateBuySidePricesKey(order.Symbol) - sellPricesKey := CreateSellSidePricesKey(order.Symbol) if order.Side == models.BUY { + buyPricesKey := CreateBuySidePricesKey(order.Symbol) transaction.ZRem(ctx, buyPricesKey, order.Id.String()) } else { + sellPricesKey := CreateSellSidePricesKey(order.Symbol) transaction.ZRem(ctx, sellPricesKey, order.Id.String()) } diff --git a/data/redisrepo/store_new_pending_swap.go b/data/redisrepo/store_new_pending_swap.go index 0a74acd..3fc0242 100644 --- a/data/redisrepo/store_new_pending_swap.go +++ b/data/redisrepo/store_new_pending_swap.go @@ -10,6 +10,7 @@ import ( "github.com/orbs-network/order-book/utils/logger/logctx" ) +// StoreNewPendingSwap stores a new pending swap in order for its status (pending/complete) to be checked later func (r *redisRepository) StoreNewPendingSwap(ctx context.Context, p models.Pending) error { key := CreatePendingSwapTxsKey() diff --git a/data/redisrepo/store_new_pending_swap_test.go b/data/redisrepo/store_new_pending_swap_test.go index eca1c50..b7b60fc 100644 --- a/data/redisrepo/store_new_pending_swap_test.go +++ b/data/redisrepo/store_new_pending_swap_test.go @@ -19,7 +19,9 @@ func TestRedisRepo_StoreNewPendingSwap(t *testing.T) { client: db, } - mock.ExpectRPush(CreatePendingSwapTxsKey(), mocks.Pending.PendingToMap()).SetVal(1) + pendingJson, _ := mocks.Pending.ToJson() + + mock.ExpectRPush(CreatePendingSwapTxsKey(), pendingJson).SetVal(1) err := repo.StoreNewPendingSwap(ctx, mocks.Pending) diff --git a/data/redisrepo/store_pending_swaps.go b/data/redisrepo/store_pending_swaps.go index ba4ffe2..fee7461 100644 --- a/data/redisrepo/store_pending_swaps.go +++ b/data/redisrepo/store_pending_swaps.go @@ -10,6 +10,7 @@ import ( "github.com/orbs-network/order-book/utils/logger/logctx" ) +// StorePendingSwaps stores all swaps that are still pending completion in order for their status (pending/complete) to be checked again later func (r *redisRepository) StorePendingSwaps(ctx context.Context, pendingSwaps []models.Pending) error { key := CreatePendingSwapTxsKey() diff --git a/data/store/store.go b/data/store/store.go index f059a59..41cbc80 100644 --- a/data/store/store.go +++ b/data/store/store.go @@ -33,4 +33,5 @@ type OrderBookStore interface { StoreNewPendingSwap(ctx context.Context, pendingSwap models.Pending) error GetPendingSwaps(ctx context.Context) ([]models.Pending, error) StorePendingSwaps(ctx context.Context, pendingSwaps []models.Pending) error + ProcessCompletedSwapOrders(ctx context.Context, orders []*models.Order, swapId uuid.UUID, isSuccessful bool) error } diff --git a/docker-compose.yml b/docker-compose.yml index d35f6a3..479755e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,12 +4,15 @@ services: build: dockerfile: cmd/order-book.Dockerfile context: . + args: + APP_PATH: ./cmd/order-book ports: - "80:8080" environment: - REDIS_URL=redis://db:6379/0 - PORT=8080 - VERIFY_SIGNATURE=true + - RPC_URL=${RPC_URL} depends_on: - db develop: @@ -28,6 +31,25 @@ services: command: redis-server --save 60 1 --loglevel warning volumes: - db:/data + + pending-swaps-tracker: + build: + dockerfile: cmd/order-book.Dockerfile + context: . + args: + APP_PATH: ./cmd/pending-swaps-tracker + environment: + - REDIS_URL=redis://db:6379/0 + - RPC_URL=${RPC_URL} + depends_on: + - db + develop: + watch: + - action: rebuild + files: + - ./**/*.go + - ./go.mod + - ./go.sum volumes: db: driver: local diff --git a/mocks/service.go b/mocks/service.go index 8e265e0..375d70d 100644 --- a/mocks/service.go +++ b/mocks/service.go @@ -151,3 +151,7 @@ func (m *MockOrderBookStore) GetPendingSwaps(ctx context.Context) ([]models.Pend func (m *MockOrderBookStore) StorePendingSwaps(ctx context.Context, pendingSwaps []models.Pending) error { return m.Error } + +func (m *MockOrderBookStore) ProcessCompletedSwapOrders(ctx context.Context, orders []*models.Order, swapId uuid.UUID, isSuccessful bool) error { + return m.Error +} diff --git a/service/evm_check_pending_txs.go b/service/evm_check_pending_txs.go index deaf499..88068be 100644 --- a/service/evm_check_pending_txs.go +++ b/service/evm_check_pending_txs.go @@ -12,12 +12,17 @@ import ( ) func (e *EvmClient) CheckPendingTxs(ctx context.Context) error { + logctx.Info(ctx, "Checking pending transactions...") pendingSwaps, err := e.orderBookStore.GetPendingSwaps(ctx) if err != nil { logctx.Error(ctx, "Failed to get pending swaps", logger.Error(err)) return err } - fmt.Printf("pendingSwaps BEFORE: %#v\n", pendingSwaps) + + if len(pendingSwaps) == 0 { + logctx.Info(ctx, "No pending transactions. Sleeping...") + return nil + } var wg sync.WaitGroup var mu sync.Mutex @@ -33,7 +38,6 @@ func (e *EvmClient) CheckPendingTxs(ctx context.Context) error { tx, err := e.blockchainStore.GetTx(ctx, p.TxHash) if err != nil { if err == models.ErrNotFound { - fmt.Println("Transaction not found") logctx.Error(ctx, "Transaction not found but should be valid", logger.String("txHash", p.TxHash), logger.String("swapId", p.SwapId.String())) } else { logctx.Error(ctx, "Failed to get transaction", logger.String("txHash", p.TxHash), logger.String("swapId", p.SwapId.String())) @@ -42,36 +46,37 @@ func (e *EvmClient) CheckPendingTxs(ctx context.Context) error { } if tx == nil { - fmt.Println("Transaction not found") logctx.Error(ctx, "Transaction not found but should be valid", logger.String("txHash", p.TxHash), logger.String("swapId", p.SwapId.String())) return } switch tx.Status { case models.TX_SUCCESS: - // Process successful transaction - fmt.Println("Transaction successful ----->") - _, err = e.processSuccessfulTransaction(ctx, p, &mu) + logctx.Info(ctx, "Transaction successful", logger.String("txHash", p.TxHash), logger.String("swapId", p.SwapId.String())) + _, err = e.processCompletedTransaction(ctx, p, true, &mu) if err != nil { logctx.Error(ctx, "Failed to process successful transaction", logger.Error(err), logger.String("txHash", p.TxHash), logger.String("swapId", p.SwapId.String())) return } - case models.TX_FAILURE: - fmt.Printf("Transaction %s failed\n", p.TxHash) - break + logctx.Info(ctx, "Transaction failed", logger.String("txHash", p.TxHash), logger.String("swapId", p.SwapId.String())) + _, err = e.processCompletedTransaction(ctx, p, false, &mu) + if err != nil { + logctx.Error(ctx, "Failed to process failed transaction", logger.Error(err), logger.String("txHash", p.TxHash), logger.String("swapId", p.SwapId.String())) + return + } case models.TX_PENDING: - fmt.Printf("Transaction %s is still pending\n", p.TxHash) - break + logctx.Info(ctx, "Transaction still pending", logger.String("txHash", p.TxHash), logger.String("swapId", p.SwapId.String())) + ptxs = append(ptxs, p) default: - fmt.Printf("Transaction %s has unknown status\n", p.TxHash) - break + logctx.Error(ctx, "Unknown transaction status", logger.String("txHash", p.TxHash), logger.String("swapId", p.SwapId.String())) + return } }(i) } - wg.Wait() // Wait for all goroutines to complete + wg.Wait() mu.Lock() err = e.orderBookStore.StorePendingSwaps(ctx, ptxs) @@ -82,11 +87,11 @@ func (e *EvmClient) CheckPendingTxs(ctx context.Context) error { return err } - fmt.Println("Updated pending swaps list stored in Redis") + logctx.Info(ctx, "Finished checking pending transactions", logger.Int("numPending", len(ptxs))) return nil } -func (e *EvmClient) processSuccessfulTransaction(ctx context.Context, p models.Pending, mu *sync.Mutex) ([]models.Order, error) { +func (e *EvmClient) processCompletedTransaction(ctx context.Context, p models.Pending, isSuccessful bool, mu *sync.Mutex) ([]models.Order, error) { mu.Lock() defer mu.Unlock() @@ -107,20 +112,23 @@ func (e *EvmClient) processSuccessfulTransaction(ctx context.Context, p models.P return []models.Order{}, fmt.Errorf("failed to get orders: %w", err) } - for _, order := range orders { - isFilled, err := order.MarkSwapSuccess() - if err != nil { - logctx.Error(ctx, "Failed to mark order as filled", logger.Error(err), logger.String("orderId", order.Id.String())) - return []models.Order{}, fmt.Errorf("failed to mark order as filled: %w", err) - } + var swapOrders []*models.Order - if isFilled { - logctx.Info(ctx, "Order is filled", logger.String("orderId", order.Id.String())) - e.orderBookStore.StoreFilledOrders(ctx, []models.Order{order}) + for _, order := range orders { + if isSuccessful { + if _, err := order.MarkSwapSuccess(); err != nil { + logctx.Error(ctx, "Failed to mark order as filled", logger.Error(err), logger.String("orderId", order.Id.String())) + } } else { - logctx.Info(ctx, "Order is partially filled", logger.String("orderId", order.Id.String())) - e.orderBookStore.StoreOpenOrder(ctx, order) + order.MarkSwapFailed() } + swapOrders = append(swapOrders, &order) + } + + err = e.orderBookStore.ProcessCompletedSwapOrders(ctx, swapOrders, p.SwapId, isSuccessful) + if err != nil { + logctx.Error(ctx, "Failed to process completed swap orders", logger.Error(err), logger.String("swapId", p.SwapId.String())) + return []models.Order{}, fmt.Errorf("failed to process completed swap orders: %w", err) } return orders, nil