Skip to content

Commit

Permalink
Working
Browse files Browse the repository at this point in the history
  • Loading branch information
Luke-Rogerson committed Dec 17, 2023
1 parent 44a66c7 commit 584112f
Show file tree
Hide file tree
Showing 16 changed files with 345 additions and 37 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ __debug_*
*Key.txt
address.txt
coverage.out
.venv
.venv
.makerc
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ LOG_LEVEL ?= info
-include .makerc
export

RPC_URL ?= https://some-rpc-url.com

start:
@echo "Starting server and db..."
@docker-compose up --build -d
Expand Down
4 changes: 2 additions & 2 deletions cmd/order-book.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ RUN go mod download
COPY . ./
RUN find . -type f ! -name "*.go" ! -name "go.mod" ! -name "go.sum" -delete

RUN CGO_ENABLED=0 GOOS=linux go build -o /order-book ./cmd/order-book
ARG APP_PATH

EXPOSE 8080
RUN CGO_ENABLED=0 GOOS=linux go build -o /order-book $APP_PATH

CMD [ "/order-book" ]
79 changes: 79 additions & 0 deletions cmd/pending-swaps-tracker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package main

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/orbs-network/order-book/data/evmrepo"
"github.com/orbs-network/order-book/data/redisrepo"
"github.com/orbs-network/order-book/service"
"github.com/redis/go-redis/v9"
)

var defaultDuration = 10 * time.Second

func main() {
redisAddress, found := os.LookupEnv("REDIS_URL")
if !found {
panic("REDIS_URL not set")
}

opt, err := redis.ParseURL(redisAddress)
if err != nil {
panic(fmt.Errorf("failed to parse redis url: %v", err))
}

log.Printf("Redis address: %s", opt.Addr)

rpcUrl, found := os.LookupEnv("RPC_URL")
if !found {
panic("RPC_URL not set")
}

rdb := redis.NewClient(opt)

repository, err := redisrepo.NewRedisRepository(rdb)
if err != nil {
log.Fatalf("error creating repository: %v", err)
}

ethClient, err := ethclient.Dial(rpcUrl)
if err != nil {
log.Fatalf("error creating eth client: %v", err)
}
defer ethClient.Close()

evmRepo, err := evmrepo.NewEvmRepository(ethClient)
if err != nil {
log.Fatalf("error creating evm repository: %v", err)
}

evmClient, err := service.NewEvmSvc(repository, evmRepo)
if err != nil {
log.Fatalf("error creating evm client: %v", err)
}

envDurationStr := os.Getenv("TICKER_DURATION")

tickerDuration, err := time.ParseDuration(envDurationStr)
if err != nil || envDurationStr == "" {
fmt.Printf("Invalid or missing TICKER_DURATION. Using default of %s\n", defaultDuration)
tickerDuration = defaultDuration
}

ticker := time.NewTicker(tickerDuration)
defer ticker.Stop()

ctx := context.Background()

for range ticker.C {
err := evmClient.CheckPendingTxs(ctx)
if err != nil {
log.Printf("error checking pending txs: %v", err)
}
}
}
1 change: 1 addition & 0 deletions data/redisrepo/get_pending_swaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/orbs-network/order-book/models"
)

// GetPendingSwaps returns all pending swaps that are waiting to be checked for completion
func (r *redisRepository) GetPendingSwaps(ctx context.Context) ([]models.Pending, error) {
var pendingSwaps []models.Pending

Expand Down
4 changes: 2 additions & 2 deletions data/redisrepo/get_pending_swaps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ func TestRedisRepo_GetPendingSwaps(t *testing.T) {
}

mock.ExpectLRange(CreatePendingSwapTxsKey(), 0, -1).SetVal([]string{
"{\"swapId\":\"75fff27c-5a98-4f04-9bba-ce932d9c8e67\",\"txHash\":\"cc6fb26e-bc79-4ea3-b761-224893dc7df3\"}",
"{\"swapId\":\"75fff27c-5a98-4f04-9bba-ce932d9c8e67\",\"txHash\":\"0xefd319bb86b954a8e8cd7d9396546db8d3251910209cd8b1b9a674ef8585f226\"}",
})

pendingSwaps, err := repo.GetPendingSwaps(ctx)

assert.NoError(t, err)
assert.Len(t, pendingSwaps, 1)
assert.Equal(t, uuid.MustParse("75fff27c-5a98-4f04-9bba-ce932d9c8e67"), pendingSwaps[0].SwapId)
assert.Equal(t, uuid.MustParse("cc6fb26e-bc79-4ea3-b761-224893dc7df3"), pendingSwaps[0].TxHash)
assert.Equal(t, "0xefd319bb86b954a8e8cd7d9396546db8d3251910209cd8b1b9a674ef8585f226", pendingSwaps[0].TxHash)
})

t.Run("should work with a large list of pending swaps", func(t *testing.T) {
Expand Down
59 changes: 59 additions & 0 deletions data/redisrepo/process_completed_swap_orders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package redisrepo

import (
"context"
"fmt"

"github.com/google/uuid"
"github.com/orbs-network/order-book/models"
"github.com/orbs-network/order-book/utils/logger"
"github.com/orbs-network/order-book/utils/logger/logctx"
)

// ProcessCompletedSwapOrders stores the updated swap orders and removes the swap from Redis. It should be called after a swap is completed.
//
// `orders` should be the orders that were part of the swap (with `SizePending` and `SizeFilled` updated accordingly)
//
// `isSuccessful` should be `true` if the swap was successful, `false` otherwise
func (r *redisRepository) ProcessCompletedSwapOrders(ctx context.Context, orders []*models.Order, swapId uuid.UUID, isSuccessful bool) error {
// --- START TRANSACTION ---
transaction := r.client.TxPipeline()

// 1. Store updated swap orders, handle completely filled orders
if isSuccessful {
for _, order := range orders {
if order.IsFilled() {
if err := storeFilledOrderTx(ctx, transaction, order); err != nil {
logctx.Error(ctx, "failed to store filled order in Redis", logger.Error(err), logger.String("orderId", order.Id.String()))
return fmt.Errorf("failed to store filled order in Redis: %v", err)
}
} else {
if err := storeOrderTX(ctx, transaction, order); err != nil {
logctx.Error(ctx, "failed to store open order in Redis", logger.Error(err), logger.String("orderId", order.Id.String()))
return fmt.Errorf("failed to store open order in Redis: %v", err)
}
}
}
} else {
// Store updated orders
for _, order := range orders {
if err := storeOrderTX(ctx, transaction, order); err != nil {
logctx.Error(ctx, "failed to store open order in Redis", logger.Error(err), logger.String("orderId", order.Id.String()))
return fmt.Errorf("failed to store open order in Redis: %v", err)
}
}
}

// 2. Remove the swap
swapKey := CreateSwapKey(swapId)
transaction.Del(ctx, swapKey).Err()

Check failure on line 49 in data/redisrepo/process_completed_swap_orders.go

View workflow job for this annotation

GitHub Actions / build-and-test

Error return value of `(*github.com/redis/go-redis/v9.baseCmd).Err` is not checked (errcheck)

// --- END TRANSACTION ---
_, err := transaction.Exec(ctx)
if err != nil {
logctx.Error(ctx, "failed to execute ProcessCompletedSwapOrders transaction", logger.Error(err))
return fmt.Errorf("failed to execute ProcessCompletedSwapOrders transaction: %v", err)
}

return nil
}
127 changes: 127 additions & 0 deletions data/redisrepo/process_completed_swap_orders_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package redisrepo

import (
"context"
"testing"
"time"

"github.com/go-redis/redismock/v9"
"github.com/google/uuid"
"github.com/orbs-network/order-book/mocks"
"github.com/orbs-network/order-book/models"
"github.com/redis/go-redis/v9"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
)

func TestRedisRepo_ProcessCompletedSwapOrders(t *testing.T) {
ctx := context.Background()

db, mock := redismock.NewClientMock()

repo := &redisRepository{
client: db,
}

swapId := uuid.MustParse("00000000-0000-0000-0000-000000000007")
timestamp := time.Date(2023, 10, 10, 12, 0, 0, 0, time.UTC)

t.Run("should process successful swap with completely filled order and partially filled order", func(t *testing.T) {

filledOrder := models.Order{
SizeFilled: decimal.NewFromFloat(100),
Size: decimal.NewFromFloat(100),
Side: models.SELL,
Timestamp: timestamp,
}

partiallyFilledOrder := models.Order{
SizeFilled: decimal.NewFromFloat(50),
Size: decimal.NewFromFloat(100),
Side: models.SELL,
Timestamp: timestamp,
}

mock.ExpectTxPipeline()

// Store completely filled order
mock.ExpectZRem(CreateUserOpenOrdersKey(filledOrder.UserId), filledOrder.Id.String()).SetVal(1)
mock.ExpectZAdd(CreateUserFilledOrdersKey(filledOrder.UserId), redis.Z{
Score: float64(timestamp.UTC().UnixNano()),
Member: filledOrder.Id.String(),
}).SetVal(1)
mock.ExpectZRem(CreateSellSidePricesKey(filledOrder.Symbol), filledOrder.Id.String()).SetVal(1)
mock.ExpectHSet(CreateOrderIDKey(filledOrder.Id), filledOrder.OrderToMap()).SetVal(1)

// Store partially filled order
mock.ExpectZAdd(CreateUserOpenOrdersKey(partiallyFilledOrder.UserId), redis.Z{
Score: float64(timestamp.UTC().UnixNano()),
Member: partiallyFilledOrder.Id.String(),
}).SetVal(1)
mock.ExpectHSet(CreateOrderIDKey(partiallyFilledOrder.Id), partiallyFilledOrder.OrderToMap()).SetVal(1)
mock.ExpectSet(CreateClientOIDKey(partiallyFilledOrder.ClientOId), partiallyFilledOrder.Id.String(), 0).SetVal("OK")
f64Price, _ := partiallyFilledOrder.Price.Float64()
timestamp := float64(partiallyFilledOrder.Timestamp.UTC().UnixNano()) / 1e9
score := f64Price + (timestamp / 1e12)
mock.ExpectZAdd(CreateSellSidePricesKey(partiallyFilledOrder.Symbol), redis.Z{
Score: score,
Member: partiallyFilledOrder.Id.String(),
}).SetVal(1)

// Remove swap
mock.ExpectDel(CreateSwapKey(swapId)).SetVal(1)

mock.ExpectTxPipelineExec()

err := repo.ProcessCompletedSwapOrders(ctx, []*models.Order{&filledOrder, &partiallyFilledOrder}, swapId, true)
assert.NoError(t, err)
})

t.Run("should process failed swap", func(t *testing.T) {
orderToBeRolledback := models.Order{
SizeFilled: decimal.NewFromFloat(20),
Size: decimal.NewFromFloat(100),
SizePending: decimal.NewFromFloat(0),
Side: models.SELL,
Timestamp: timestamp,
}

mock.ExpectTxPipeline()

// Store order
mock.ExpectZAdd(CreateUserOpenOrdersKey(orderToBeRolledback.UserId), redis.Z{
Score: float64(timestamp.UTC().UnixNano()),
Member: orderToBeRolledback.Id.String(),
}).SetVal(1)
mock.ExpectHSet(CreateOrderIDKey(orderToBeRolledback.Id), orderToBeRolledback.OrderToMap()).SetVal(1)
mock.ExpectSet(CreateClientOIDKey(orderToBeRolledback.ClientOId), orderToBeRolledback.Id.String(), 0).SetVal("OK")
f64Price, _ := orderToBeRolledback.Price.Float64()
timestamp := float64(orderToBeRolledback.Timestamp.UTC().UnixNano()) / 1e9
score := f64Price + (timestamp / 1e12)
mock.ExpectZAdd(CreateSellSidePricesKey(orderToBeRolledback.Symbol), redis.Z{
Score: score,
Member: orderToBeRolledback.Id.String(),
}).SetVal(1)

// Remove swap
mock.ExpectDel(CreateSwapKey(swapId)).SetVal(1)

mock.ExpectTxPipelineExec()

err := repo.ProcessCompletedSwapOrders(ctx, []*models.Order{&orderToBeRolledback}, swapId, false)
assert.NoError(t, err)
})

t.Run("no database writes should happen if part of transaction fails", func(t *testing.T) {

mock.ExpectTxPipeline()
mock.ExpectZAdd(CreateUserOpenOrdersKey(mocks.Order.UserId), redis.Z{
Score: float64(timestamp.UTC().UnixNano()),
Member: mocks.Order.Id.String(),
}).SetErr(assert.AnError)

err := repo.ProcessCompletedSwapOrders(ctx, []*models.Order{&mocks.Order}, swapId, true)
assert.ErrorContains(t, err, "failed to execute ProcessCompletedSwapOrders transaction")
})

}
8 changes: 4 additions & 4 deletions data/redisrepo/store_filled_orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (r *redisRepository) StoreFilledOrders(ctx context.Context, orders []models
transaction := r.client.TxPipeline()

for _, order := range orders {
err := storeFilledOrderTx(ctx, transaction, order)
err := storeFilledOrderTx(ctx, transaction, &order)
if err != nil {
return err
}
Expand All @@ -31,7 +31,7 @@ func (r *redisRepository) StoreFilledOrders(ctx context.Context, orders []models
return nil
}

func storeFilledOrderTx(ctx context.Context, transaction redis.Pipeliner, order models.Order) error {
func storeFilledOrderTx(ctx context.Context, transaction redis.Pipeliner, order *models.Order) error {
// 1. Remove the order from the user's open orders set
userOrdersKey := CreateUserOpenOrdersKey(order.UserId)
transaction.ZRem(ctx, userOrdersKey, order.Id.String())
Expand All @@ -45,11 +45,11 @@ func storeFilledOrderTx(ctx context.Context, transaction redis.Pipeliner, order
})

// 3. Remove the order from the buy/sell prices set for that pair
buyPricesKey := CreateBuySidePricesKey(order.Symbol)
sellPricesKey := CreateSellSidePricesKey(order.Symbol)
if order.Side == models.BUY {
buyPricesKey := CreateBuySidePricesKey(order.Symbol)
transaction.ZRem(ctx, buyPricesKey, order.Id.String())
} else {
sellPricesKey := CreateSellSidePricesKey(order.Symbol)
transaction.ZRem(ctx, sellPricesKey, order.Id.String())
}

Expand Down
1 change: 1 addition & 0 deletions data/redisrepo/store_new_pending_swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/orbs-network/order-book/utils/logger/logctx"
)

// StoreNewPendingSwap stores a new pending swap in order for its status (pending/complete) to be checked later
func (r *redisRepository) StoreNewPendingSwap(ctx context.Context, p models.Pending) error {
key := CreatePendingSwapTxsKey()

Expand Down
4 changes: 3 additions & 1 deletion data/redisrepo/store_new_pending_swap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ func TestRedisRepo_StoreNewPendingSwap(t *testing.T) {
client: db,
}

mock.ExpectRPush(CreatePendingSwapTxsKey(), mocks.Pending.PendingToMap()).SetVal(1)
pendingJson, _ := mocks.Pending.ToJson()

mock.ExpectRPush(CreatePendingSwapTxsKey(), pendingJson).SetVal(1)

err := repo.StoreNewPendingSwap(ctx, mocks.Pending)

Expand Down
1 change: 1 addition & 0 deletions data/redisrepo/store_pending_swaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/orbs-network/order-book/utils/logger/logctx"
)

// StorePendingSwaps stores all swaps that are still pending completion in order for their status (pending/complete) to be checked again later
func (r *redisRepository) StorePendingSwaps(ctx context.Context, pendingSwaps []models.Pending) error {
key := CreatePendingSwapTxsKey()

Expand Down
1 change: 1 addition & 0 deletions data/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ type OrderBookStore interface {
StoreNewPendingSwap(ctx context.Context, pendingSwap models.Pending) error
GetPendingSwaps(ctx context.Context) ([]models.Pending, error)
StorePendingSwaps(ctx context.Context, pendingSwaps []models.Pending) error
ProcessCompletedSwapOrders(ctx context.Context, orders []*models.Order, swapId uuid.UUID, isSuccessful bool) error
}
Loading

0 comments on commit 584112f

Please sign in to comment.