Skip to content

Commit

Permalink
fix: reduce load
Browse files Browse the repository at this point in the history
  • Loading branch information
Majorfi committed Jun 19, 2023
1 parent 05b47d8 commit d2f1e88
Show file tree
Hide file tree
Showing 15 changed files with 407 additions and 186 deletions.
2 changes: 2 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func main() {
wg.Wait()

logs.Success(`Server ready on port 8080 !`)
// pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)

select {}

case ProcessPartnerFees:
Expand Down
23 changes: 23 additions & 0 deletions common/store/store.getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,26 @@ func ListAllStrategiess(chainID uint64) (

return asMap, asSlice
}

/**************************************************************************************************
** GetStrategy will return a strategy stored in the caching system for a given chainID and address.
**************************************************************************************************/
func GetStrategy(chainID uint64, strategyAddress common.Address) (models.TStrategyAdded, bool) {
/**********************************************************************************************
** We first retrieve the syncMap. This syncMap should be initialized first via the
** `LoadStrategies` function which take the data from the database/badger and store it in it.
**********************************************************************************************/
syncMap := _strategiesSyncMap[chainID]
if syncMap == nil {
syncMap = &sync.Map{}
_strategiesSyncMap[chainID] = syncMap
}

/**********************************************************************************************
** We can just iterate over the syncMap and add the strategies to the map and slice.
** As the stored vault data are only a subset of static, we need to use the actual structure
** and not the DBVault one.
**********************************************************************************************/
strategy, ok := syncMap.Load(strategyAddress)
return strategy.(models.TStrategyAdded), ok
}
50 changes: 37 additions & 13 deletions common/store/store.setter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/yearn/ydaemon/common/logs"
"github.com/yearn/ydaemon/internal/models"
"golang.org/x/time/rate"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

var storeRateLimiter = rate.NewLimiter(2, 4)
Expand Down Expand Up @@ -91,6 +93,34 @@ func StoreHistoricalPrice(chainID uint64, blockNumber uint64, tokenAddress commo
}
}

/**************************************************************************************************
** StoreManyHistoricalPrice is the same as StoreHistoricalPrice but for many prices at once.
**************************************************************************************************/
func StoreManyHistoricalPrice(items []DBHistoricalPrice) {
switch _dbType {
case DBBadger:
// Not implemented
case DBSql:
go func() {
storeRateLimiter.Wait(context.Background())
DATABASE.
Table(`db_historical_prices`).
Clauses(clause.OnConflict{
UpdateAll: true,
}).Create(items)
}()
}
}

/**************************************************************************************************
** AppendInHistoricalMap is the same as StoreHistoricalPrice but only to store
**************************************************************************************************/
func AppendInHistoricalMap(chainID uint64, blockNumber uint64, tokenAddress common.Address, price *bigNumber.Int) {
syncMap := _historicalPriceSyncMap[chainID]
key := strconv.FormatUint(blockNumber, 10) + "_" + tokenAddress.Hex()
syncMap.Store(key, price)
}

/**************************************************************************************************
** StoreNewVaultsFromRegistry will store a new vault in the _newVaultsFromRegistrySyncMap for fast
** access during that same execution, and will store it in the configured DB for future executions.
Expand Down Expand Up @@ -293,22 +323,16 @@ func StoreSyncRegistry(chainID uint64, registryAddess common.Address, end *uint6
** StoreSyncStrategiesAdded will store the sync status indicating we went up to the block number
** to check for new strategies added.
**************************************************************************************************/
func StoreSyncStrategiesAdded(chainID uint64, vaultAddress common.Address, end *uint64) {
func StoreSyncStrategiesAdded(itemsToUpsert []DBStrategyAddedSync) {
switch _dbType {
case DBBadger:
// Not implemented
case DBSql:
go func() {
storeRateLimiter.Wait(context.Background())
DATABASE.Table("db_strategy_added_syncs").
Where("chain_id = ? AND vault = ?", chainID, vaultAddress.Hex()).
Where("block <= ?", end).
Assign(DBStrategyAddedSync{Block: *end}).
FirstOrCreate(&DBStrategyAddedSync{
ChainID: chainID,
Vault: vaultAddress.Hex(),
UUID: GetUUID(vaultAddress.Hex() + strconv.FormatUint(chainID, 10)),
})
}()

DATABASE.Table("db_strategy_added_syncs").
Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "chain_id"}, {Name: "vault"}},
DoUpdates: clause.Assignments(map[string]interface{}{"block": gorm.Expr("GREATEST(db_strategy_added_syncs.block, EXCLUDED.block)")}),
}).Create(&itemsToUpsert)
}
}
48 changes: 42 additions & 6 deletions internal/events/filterStrategyAdded.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func filterStrategyAdded(
start uint64,
end *uint64,
syncMap *sync.Map,
) {
) *uint64 {
client := ethereum.GetRPC(vault.ChainID)

/**********************************************************************************************
Expand Down Expand Up @@ -159,7 +159,7 @@ func filterStrategyAdded(
** We are storing in the DB the sync status, indicating we went up to the block number to check
** for new vaults.
**********************************************************************************************/
go store.StoreSyncStrategiesAdded(vault.ChainID, vault.Address, end)
return end
}

/**************************************************************************************************
Expand All @@ -181,7 +181,7 @@ func filterStrategiesMigrated(
start uint64,
end *uint64,
syncMap *sync.Map,
) {
) *uint64 {
/**************************************************************************************************
** First we make sure to connect with our RPC client and get the vault contract.
**************************************************************************************************/
Expand Down Expand Up @@ -251,7 +251,7 @@ func filterStrategiesMigrated(
** We are storing in the DB the sync status, indicating we went up to the block number to check
** for new vaults.
**********************************************************************************************/
go store.StoreSyncStrategiesAdded(vault.ChainID, vault.Address, end)
return end
}

/**************************************************************************************************
Expand Down Expand Up @@ -296,33 +296,69 @@ func HandleStrategyAdded(
**********************************************************************************************/
asyncStrategiesForVaults := &sync.Map{}
asyncStrategiesMigrated := &sync.Map{}
asyncVaultEnd := &sync.Map{}

for _, v := range vaultsMap {
wg := &sync.WaitGroup{}
wg.Add(2)
go func(v models.TVaultsFromRegistry) {
defer wg.Done()
filterStrategyAdded(
end := filterStrategyAdded(
v,
vaultsLastBlockSync,
start,
end,
asyncStrategiesForVaults,
)

endArr, ok := asyncVaultEnd.Load(v.Address)
if !ok {
endArr = []uint64{}
}
endArr = append(endArr.([]uint64), *end)
asyncVaultEnd.Store(v.Address, endArr)
}(v)
go func(v models.TVaultsFromRegistry) {
defer wg.Done()
filterStrategiesMigrated(
end := filterStrategiesMigrated(
v,
vaultsLastBlockSync,
start,
end,
asyncStrategiesMigrated,
)
endArr, ok := asyncVaultEnd.Load(v.Address)
if !ok {
endArr = []uint64{}
}
endArr = append(endArr.([]uint64), *end)
asyncVaultEnd.Store(v.Address, endArr)
}(v)
wg.Wait()
}

itemsToUpsert := []store.DBStrategyAddedSync{}
for _, v := range vaultsMap {
endArr, ok := asyncVaultEnd.Load(v.Address)
if !ok {
endArr = []uint64{}
}
end := uint64(0)
for _, e := range endArr.([]uint64) {
if e > end {
end = e
}
}
itemsToUpsert = append(itemsToUpsert, store.DBStrategyAddedSync{
ChainID: chainID,
Vault: v.Address.Hex(),
UUID: store.GetUUID(v.Address.Hex() + strconv.FormatUint(chainID, 10)),
Block: end,
})

go store.StoreSyncStrategiesAdded(itemsToUpsert)
}

/**********************************************************************************************
** Once all vaults StrategyAdded updates have been retrieved, we need to extract them from the
** sync.Map.
Expand Down
48 changes: 33 additions & 15 deletions internal/meta/protocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package meta
import (
"encoding/json"
"strconv"
"sync"

"github.com/yearn/ydaemon/common/env"
"github.com/yearn/ydaemon/common/helpers"
Expand All @@ -16,16 +17,31 @@ import (
** The _metaProtocolMap variable is not exported and is only used internally by the functions
** below.
**********************************************************************************************/
var _metaProtocolMap = make(map[uint64]map[string]*models.TProtocolsFromMeta)
var _metaProtocolMap = make(map[uint64]*sync.Map)

func initOrGetMetaProtocolMap(chainID uint64) *sync.Map {
syncMap := _metaProtocolMap[chainID]
if syncMap == nil {
syncMap = &sync.Map{}
_metaProtocolMap[chainID] = syncMap
}
return syncMap
}

func init() {
for _, chainID := range env.SUPPORTED_CHAIN_IDS {
if _, ok := _metaProtocolMap[chainID]; !ok {
_metaProtocolMap[chainID] = &sync.Map{}
}
}
}

/**********************************************************************************************
** setProtocolInMap will put a TProtocolsFromMeta in the _metaProtocolMap variable.
**********************************************************************************************/
func setProtocolInMap(chainID uint64, protocol *models.TProtocolsFromMeta) {
if _, ok := _metaProtocolMap[chainID]; !ok {
_metaProtocolMap[chainID] = make(map[string]*models.TProtocolsFromMeta)
}
_metaProtocolMap[chainID][protocol.Name] = protocol
syncMap := initOrGetMetaProtocolMap(chainID)
syncMap.Store(protocol.Name, protocol)
}

/**********************************************************************************************
Expand All @@ -35,24 +51,26 @@ func setProtocolInMap(chainID uint64, protocol *models.TProtocolsFromMeta) {
** not.
**********************************************************************************************/
func GetMetaProtocol(chainID uint64, protocolName string) (*models.TProtocolsFromMeta, bool) {
if protocolsForChain, ok := _metaProtocolMap[chainID]; ok {
if protocol, ok := protocolsForChain[protocolName]; ok {
return protocol, true
}
syncMap := initOrGetMetaProtocolMap(chainID)
data, ok := syncMap.Load(protocolName)
if !ok {
return nil, false
}
return nil, false
return data.(*models.TProtocolsFromMeta), true
}

/**********************************************************************************************
** ListMetaProtocol will, for a given chainID, list all the protocols from the _metaProtocolMap
** variable.
**********************************************************************************************/
func ListMetaProtocol(chainID uint64) []*models.TProtocolsFromMeta {
var protocols []*models.TProtocolsFromMeta
for _, protocol := range _metaProtocolMap[chainID] {
protocols = append(protocols, protocol)
}
return protocols
syncMap := initOrGetMetaProtocolMap(chainID)
var retValue []*models.TProtocolsFromMeta
syncMap.Range(func(_, data interface{}) bool {
retValue = append(retValue, data.(*models.TProtocolsFromMeta))
return true
})
return retValue
}

/**************************************************************************************************
Expand Down
42 changes: 26 additions & 16 deletions internal/meta/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package meta
import (
"encoding/json"
"strconv"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/yearn/ydaemon/common/env"
Expand All @@ -17,12 +18,21 @@ import (
** The _metaStrategyMap variable is not exported and is only used internally by the functions
** below.
**********************************************************************************************/
var _metaStrategyMap = make(map[uint64]map[common.Address]*models.TStrategyFromMeta)
var _metaStrategyMap = make(map[uint64]*sync.Map)

func initOrGetMetaStrategyMap(chainID uint64) *sync.Map {
syncMap := _metaStrategyMap[chainID]
if syncMap == nil {
syncMap = &sync.Map{}
_metaStrategyMap[chainID] = syncMap
}
return syncMap
}

func init() {
for _, chainID := range env.SUPPORTED_CHAIN_IDS {
if _, ok := _metaStrategyMap[chainID]; !ok {
_metaStrategyMap[chainID] = make(map[common.Address]*models.TStrategyFromMeta)
_metaStrategyMap[chainID] = &sync.Map{}
}
}
}
Expand All @@ -31,10 +41,8 @@ func init() {
** setStrategyInMap will put a TStrategyFromMeta in the _metaStrategyMap variable.
**********************************************************************************************/
func setStrategyInMap(chainID uint64, strategy *models.TStrategyFromMeta) {
if _, ok := _metaStrategyMap[chainID]; !ok {
_metaStrategyMap[chainID] = make(map[common.Address]*models.TStrategyFromMeta)
}
_metaStrategyMap[chainID][strategy.Address] = strategy
syncMap := initOrGetMetaStrategyMap(chainID)
syncMap.Store(strategy.Address, strategy)
}

/**********************************************************************************************
Expand All @@ -44,24 +52,26 @@ func setStrategyInMap(chainID uint64, strategy *models.TStrategyFromMeta) {
** not.
**********************************************************************************************/
func GetMetaStrategy(chainID uint64, strategyAddress common.Address) (*models.TStrategyFromMeta, bool) {
if strategysForChain, ok := _metaStrategyMap[chainID]; ok {
if strategy, ok := strategysForChain[strategyAddress]; ok {
return strategy, true
}
syncMap := initOrGetMetaStrategyMap(chainID)
data, ok := syncMap.Load(strategyAddress)
if !ok {
return nil, false
}
return nil, false
return data.(*models.TStrategyFromMeta), true
}

/**********************************************************************************************
** ListMetaStrategies will, for a given chainID, list all the strategies from the
** _metaStrategyMap variable.
**********************************************************************************************/
func ListMetaStrategies(chainID uint64) []*models.TStrategyFromMeta {
var strategies []*models.TStrategyFromMeta
for _, strategy := range _metaStrategyMap[chainID] {
strategies = append(strategies, strategy)
}
return strategies
syncMap := initOrGetMetaProtocolMap(chainID)
var retValue []*models.TStrategyFromMeta
syncMap.Range(func(_, data interface{}) bool {
retValue = append(retValue, data.(*models.TStrategyFromMeta))
return true
})
return retValue
}

/**************************************************************************************************
Expand Down
Loading

1 comment on commit d2f1e88

@vercel
Copy link

@vercel vercel bot commented on d2f1e88 Jun 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

ydaemon – ./docs

ydaemon.yearn.farm
ydaemon-git-main.yearn.farm

Please sign in to comment.