diff --git a/database/liquidity.go b/database/liquidity.go index dc17bcf..c65197d 100644 --- a/database/liquidity.go +++ b/database/liquidity.go @@ -34,16 +34,18 @@ VALUES ($1, $2, $3, $4, $5, $6)` func (db *CyberDb) SaveSwap( address string, poolID uint64, + orderPrice sdk.Dec, swapPrice sdk.Dec, exchangedOfferCoin sdk.Coin, exchangedDemandCoin sdk.Coin, exchangedOfferCoinFee sdk.Coin, exchangedDemandCoinFee sdk.Coin, height int64, + timestamp time.Time, ) error { stmt := ` -INSERT INTO swaps (pool_id, address, swap_price, exchanged_offer_coin, exchanged_demand_coin, exchanged_offer_coin_fee, exchanged_demand_coin_fee, height) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8)` +INSERT INTO swaps (pool_id, address, order_price, swap_price, exchanged_offer_coin, exchanged_demand_coin, exchanged_offer_coin_fee, exchanged_demand_coin_fee, height, timestamp) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)` exchangedOfferCoinDb := bddbtypes.DbCoin{Amount: exchangedOfferCoin.Amount.String(), Denom: exchangedOfferCoin.Denom} exchangedOfferCoinValue, err := exchangedOfferCoinDb.Value() if err != nil { @@ -68,12 +70,49 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)` stmt, poolID, address, + orderPrice.String(), swapPrice.String(), exchangedOfferCoinValue, exchangedDemandCoinValue, exchangedOfferCoinFeeValue, exchangedDemandCoinFeeValue, height, + timestamp, + ) + return err +} + +func (db *CyberDb) SaveFailedSwap( + address string, + poolID uint64, + orderPrice sdk.Dec, + exchangedOfferCoin sdk.Coin, + remainingOfferCoin sdk.Coin, + height int64, + timestamp time.Time, +) error { + stmt := ` +INSERT INTO failed_swaps (pool_id, address, order_price, exchanged_offer_coin, remaining_offer_coin, height, timestamp) +VALUES ($1, $2, $3, $4, $5, $6, $7)` + exchangedOfferCoinDb := bddbtypes.DbCoin{Amount: exchangedOfferCoin.Amount.String(), Denom: exchangedOfferCoin.Denom} + exchangedOfferCoinValue, err := exchangedOfferCoinDb.Value() + if err != nil { + return fmt.Errorf("error while converting coin to dbcoin: %s", err) + } + exchangedDemandCoinDb := bddbtypes.DbCoin{Amount: remainingOfferCoin.Amount.String(), Denom: remainingOfferCoin.Denom} + exchangedDemandCoinValue, err := exchangedDemandCoinDb.Value() + if err != nil { + return fmt.Errorf("error while converting coin to dbcoin: %s", err) + } + _, err = db.Sql.Exec( + stmt, + poolID, + address, + orderPrice.String(), + exchangedOfferCoinValue, + exchangedDemandCoinValue, + height, + timestamp, ) return err } diff --git a/database/schema/08-liquidity.sql b/database/schema/08-liquidity.sql index 31e0466..ef17fe3 100644 --- a/database/schema/08-liquidity.sql +++ b/database/schema/08-liquidity.sql @@ -1,28 +1,43 @@ -CREATE TABLE swaps +CREATE TABLE pools ( pool_id BIGINT NOT NULL PRIMARY KEY, + pool_name TEXT NOT NULL, + address TEXT NOT NULL, + a_denom TEXT NOT NULL, + b_denom TEXT NOT NULL, + pool_denom TEXT NOT NULL +); + +CREATE TABLE swaps +( + id SERIAL PRIMARY KEY, + pool_id BIGINT NOT NULL REFERENCES pools (pool_id), address TEXT NOT NULL, + order_price TEXT NOT NULL, swap_price TEXT NOT NULL, exchanged_offer_coin COIN NOT NULL, exchanged_demand_coin COIN NOT NULL, exchanged_offer_coin_fee COIN NOT NULL, exchanged_demand_coin_fee COIN NOT NULL, - height BIGINT NOT NULL REFERENCES block (height) + height BIGINT NOT NULL REFERENCES block (height), + timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL ); -CREATE TABLE pools +CREATE TABLE failed_swaps ( - pool_id BIGINT NOT NULL PRIMARY KEY, - pool_name TEXT NOT NULL, + id SERIAL PRIMARY KEY, + pool_id BIGINT NOT NULL REFERENCES pools (pool_id), address TEXT NOT NULL, - a_denom TEXT NOT NULL, - b_denom TEXT NOT NULL, - pool_denom TEXT NOT NULL + order_price TEXT NOT NULL, + exchanged_offer_coin COIN NOT NULL, + remaining_offer_coin COIN NOT NULL, + height BIGINT NOT NULL REFERENCES block (height), + timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL ); CREATE TABLE pools_volumes ( - pool_id BIGINT NOT NULL, + pool_id BIGINT NOT NULL REFERENCES pools (pool_id), volume_a BIGINT NOT NULL, volume_b BIGINT NOT NULL, fee_a BIGINT NOT NULL, @@ -32,7 +47,7 @@ CREATE TABLE pools_volumes CREATE TABLE pools_liquidity ( - pool_id BIGINT NOT NULL, + pool_id BIGINT NOT NULL REFERENCES pools (pool_id), liquidity_a BIGINT NOT NULL, liquidity_b BIGINT NOT NULL, timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL @@ -40,7 +55,7 @@ CREATE TABLE pools_liquidity CREATE TABLE pools_rates ( - pool_id BIGINT NOT NULL, + pool_id BIGINT NOT NULL REFERENCES pools (pool_id), rate TEXT NOT NULL, timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL ); \ No newline at end of file diff --git a/modules/liquidity/handle_block.go b/modules/liquidity/handle_block.go index 9390ee8..62426dd 100644 --- a/modules/liquidity/handle_block.go +++ b/modules/liquidity/handle_block.go @@ -3,7 +3,6 @@ package liquidity import ( "fmt" sdk "github.com/cosmos/cosmos-sdk/types" - dbtypes "github.com/cybercongress/cyberindex/database" "github.com/forbole/juno/v3/types" "github.com/rs/zerolog/log" liquiditytypes "github.com/tendermint/liquidity/x/liquidity/types" @@ -26,8 +25,9 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve attrs := eventAttrsFromEvent(event) status, err := attrs.SwapStatus() if err != nil { - return err + continue } + if status { addr, err := attrs.SwapRequesterAddr() if err != nil { @@ -43,24 +43,44 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve return err } + orderPrice, err := attrs.OrderPrice() + if err != nil { + return err + } + exchangedOfferCoin, err := attrs.CoinAttrs(liquiditytypes.AttributeValueOfferCoinDenom, liquiditytypes.AttributeValueExchangedOfferCoinAmount) + if err != nil { + panic(err) + } exchangedDemandCoin, err := attrs.CoinAttrs(liquiditytypes.AttributeValueDemandCoinDenom, liquiditytypes.AttributeValueExchangedDemandCoinAmount) + if err != nil { + panic(err) + } exchangedOfferCoinFee, err := attrs.CoinAttrs(liquiditytypes.AttributeValueOfferCoinDenom, liquiditytypes.AttributeValueOfferCoinFeeAmount) + if err != nil { + panic(err) + } feeDec, err := attrs.DecCoinAttrs(liquiditytypes.AttributeValueDemandCoinDenom, liquiditytypes.AttributeValueExchangedCoinFeeAmount) + if err != nil { + panic(err) + } exchangedDemandCoinFee := sdk.NewCoin(feeDec.Denom, feeDec.Amount.Ceil().TruncateInt()) err = m.db.SaveSwap( addr, poolID, + orderPrice, swapPrice, exchangedOfferCoin, exchangedDemandCoin, exchangedOfferCoinFee, exchangedDemandCoinFee, height, + timestamp, ) if err != nil { fmt.Errorf("error while saving swap: %s", err) + panic(err) } poolsRateMap[poolID] = swapPrice @@ -89,7 +109,9 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve timestamp, ) if err != nil { - fmt.Errorf("error while saving volume: %s", err) + log.Error().Str("module", "liquidity").Err(fmt.Errorf("error while saving volume: %s", err)) + // TODO fails of pool 20 (ibc/4B322204B4F59D770680FE4D7A565DDC3F37BFF035474B717476C66A4F83DD72 - decimal 18) + //panic(err) } } @@ -101,6 +123,7 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve ) if err != nil { fmt.Errorf("error while saving rate: %s", err) + panic(err) } } @@ -108,39 +131,15 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve // return err nil if not found and default pool row, later will panic on error pool, err := m.db.GetPoolInfo(poolID) if err == nil { - // if we sync from non zero and parse current block - // than save pool not from message but with query from application state - if len(pool.Address) == 0 { - poolState, err := m.keeper.GetPool(poolID, height) - if err != nil { - panic(err) - } - pool = dbtypes.PoolRow{ - PoolId: int64(poolState.Id), - PoolName: poolState.Name(), - Address: poolState.ReserveAccountAddress, - ADenom: poolState.ReserveCoinDenoms[0], - BDenom: poolState.ReserveCoinDenoms[1], - PoolDenom: poolState.PoolCoinDenom, - } - err = m.db.SavePool( - poolState.Id, - poolState.ReserveAccountAddress, - poolState.Name(), - poolState.ReserveCoinDenoms[0], - poolState.ReserveCoinDenoms[1], - poolState.PoolCoinDenom, - ) - if err != nil { - panic(err) - } - } - + // NOTE - already updated as transacted, but check the flow anyway err := m.bankModule.RefreshBalances(height, []string{pool.Address}) if err != nil { + //error while storing up-to-date balances: error while storing up-to-date balances: pq: insert or update on table "account_balance" violates foreign key constraint "account_balance_address_fkey" log.Debug().Str("module", "liquidity").Err(err) + //panic(err) } - // if not in db that query application state + + // NOTE if not in db than query application state poolBalances, err := m.db.GetAccountBalance(pool.Address) if len(poolBalances) == 0 { balances, err := m.bankModule.GetBalances([]string{pool.Address}, height) @@ -166,13 +165,52 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve log.Debug().Str("module", "liquidity").Err(err) } } + } else { + fmt.Errorf("error while saving swap: %s", "FAILED_SWAP") + + addr, err := attrs.SwapRequesterAddr() + if err != nil { + return err + } + poolID, err := attrs.PoolID() + if err != nil { + return err + } + + orderPrice, err := attrs.OrderPrice() + if err != nil { + return err + } + + exchangedOfferCoin, err := attrs.CoinAttrs(liquiditytypes.AttributeValueOfferCoinDenom, liquiditytypes.AttributeValueExchangedOfferCoinAmount) + if err != nil { + panic(err) + } + remainingOfferCoin, err := attrs.CoinAttrs(liquiditytypes.AttributeValueOfferCoinDenom, liquiditytypes.AttributeValueRemainingOfferCoinAmount) + if err != nil { + panic(err) + } + + err = m.db.SaveFailedSwap( + addr, + poolID, + orderPrice, + exchangedOfferCoin, + remainingOfferCoin, + height, + timestamp, + ) + if err != nil { + fmt.Errorf("error while saving swap: %s", err) + panic(err) + } } } return nil } -// not used, refactor +// not used, to delete before merge func (m *Module) executePoolCreations(height int64, beginBlockEvents []abcitypes.Event, timestamp time.Time) error { events := types.FindEventsByType(beginBlockEvents, liquiditytypes.EventTypeCreatePool) for _, event := range events { @@ -311,6 +349,18 @@ func (attrs EventAttributes) SwapPrice() (sdk.Dec, error) { return d, nil } +func (attrs EventAttributes) OrderPrice() (sdk.Dec, error) { + v, err := attrs.Attr(liquiditytypes.AttributeValueOrderPrice) + if err != nil { + return sdk.Dec{}, err + } + d, err := sdk.NewDecFromStr(v) + if err != nil { + return sdk.Dec{}, fmt.Errorf("parse swap price: %w", err) + } + return d, nil +} + func (attrs EventAttributes) CoinAttrs(denomKey, amountKey string) (sdk.Coin, error) { denom, err := attrs.Attr(denomKey) if err != nil { @@ -383,7 +433,6 @@ func (attrs EventAttributes) ReserveAccount() (string, error) { return v, nil } -//sdk.NewAttribute(types.AttributeValueDepositCoins, msg.DepositCoins.String()), func (attrs EventAttributes) PoolDepositCoins() ([]sdk.Coin, error) { v, err := attrs.Attr(liquiditytypes.AttributeValueDepositCoins) coins := strings.Split(v, ",") diff --git a/modules/liquidity/handle_periodic_operations.go b/modules/liquidity/handle_periodic_operations.go index ad722ed..202c525 100644 --- a/modules/liquidity/handle_periodic_operations.go +++ b/modules/liquidity/handle_periodic_operations.go @@ -1,13 +1,54 @@ package liquidity import ( + "github.com/cybercongress/cyberindex/database" "github.com/go-co-op/gocron" "github.com/rs/zerolog/log" ) -// RegisterPeriodicOperations implements modules.Module func (m *Module) RegisterPeriodicOperations(scheduler *gocron.Scheduler) error { log.Debug().Str("module", "liquidity").Msg("setting up periodic tasks") return nil } + +func (m *Module) RunAdditionalOperations() error { + log.Debug().Str("module", "liquidity").Msg("running additional operation") + + err := m.syncPoolsInfo() + if err != nil { + log.Debug().Str("module", "liquidity").Err(err) + panic(err) + } + + return nil +} + +func (m *Module) syncPoolsInfo() error { + stmt := `SELECT * FROM pools` + var rows []database.PoolRow + err := m.db.Sqlx.Select(&rows, stmt) + if err != nil { + return err + } + pools, err := m.keeper.GetAllPools(0) + if err != nil { + return err + } + if len(rows) != len(pools) { + for _, pool := range pools { + err := m.db.SavePool( + pool.Id, + pool.ReserveAccountAddress, + pool.Name(), + pool.ReserveCoinDenoms[0], + pool.ReserveCoinDenoms[1], + pool.PoolCoinDenom, + ) + if err != nil { + log.Debug().Str("module", "liquidity").Err(err) + } + } + } + return nil +} diff --git a/modules/liquidity/module.go b/modules/liquidity/module.go index 00156ca..fb832b6 100644 --- a/modules/liquidity/module.go +++ b/modules/liquidity/module.go @@ -12,10 +12,11 @@ import ( ) var ( - _ modules.Module = &Module{} - _ modules.MessageModule = &Module{} - _ modules.BlockModule = &Module{} - _ modules.PeriodicOperationsModule + _ modules.Module = &Module{} + _ modules.MessageModule = &Module{} + _ modules.BlockModule = &Module{} + _ modules.PeriodicOperationsModule = &Module{} + _ modules.AdditionalOperationsModule = &Module{} ) type Module struct { diff --git a/modules/liquidity/source/remote/source.go b/modules/liquidity/source/remote/source.go index 0a9ef8d..a638b51 100644 --- a/modules/liquidity/source/remote/source.go +++ b/modules/liquidity/source/remote/source.go @@ -37,6 +37,16 @@ func (s Source) GetPool(poolID uint64, height int64) (liquiditytypes.Pool, error return res.Pool, nil } +func (s Source) GetAllPools(height int64) ([]liquiditytypes.Pool, error) { + header := GetHeightRequestHeader(height) + res, err := s.liquidityClient.LiquidityPools(s.Ctx, &liquiditytypes.QueryLiquidityPoolsRequest{}, header) + if err != nil { + panic(err) + } + + return res.Pools, err +} + func GetHeightRequestHeader(height int64) grpc.CallOption { header := metadata.New(map[string]string{ grpctypes.GRPCBlockHeightHeader: strconv.FormatInt(height, 10), diff --git a/modules/liquidity/source/source.go b/modules/liquidity/source/source.go index 34bcb87..bfb827c 100644 --- a/modules/liquidity/source/source.go +++ b/modules/liquidity/source/source.go @@ -6,4 +6,5 @@ import ( type Source interface { GetPool(poolID uint64, height int64) (types.Pool, error) + GetAllPools(height int64) ([]types.Pool, error) }