Skip to content

Commit

Permalink
add migrations from #548
Browse files Browse the repository at this point in the history
  • Loading branch information
relyt29 committed Aug 30, 2024
1 parent 7b26fbe commit 5bf753e
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 105 deletions.
131 changes: 118 additions & 13 deletions x/emissions/migrations/v3/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"

"cosmossdk.io/collections"
"cosmossdk.io/errors"
errorsmod "cosmossdk.io/errors"
cosmosMath "cosmossdk.io/math"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
Expand All @@ -19,43 +19,55 @@ import (
"github.com/gogo/protobuf/proto"
)

const maxPageSize = uint64(10000)

func MigrateStore(ctx sdk.Context, emissionsKeeper keeper.Keeper) error {
ctx.Logger().Info("MIGRATING STORE FROM VERSION 2 TO VERSION 3")
storageService := emissionsKeeper.GetStorageService()
store := runtime.KVStoreAdapter(storageService.OpenKVStore(ctx))
cdc := emissionsKeeper.GetBinaryCodec()

ctx.Logger().Info("INVOKING MIGRATION HANDLER MigrateParams() FROM VERSION 2 TO VERSION 3")
if err := MigrateParams(store, cdc); err != nil {
ctx.Logger().Error("ERROR INVOKING MIGRATION HANDLER MigrateParams() FROM VERSION 2 TO VERSION 3")
return err
}
if err := MigrateActiveTopics(store, ctx, emissionsKeeper); err != nil {

ctx.Logger().Info("INVOKING MIGRATION HANDLER MigrateTopics() FROM VERSION 2 TO VERSION 3")
if err := MigrateTopics(ctx, store, cdc, emissionsKeeper); err != nil {
ctx.Logger().Error("ERROR INVOKING MIGRATION HANDLER MigrateTopics() FROM VERSION 2 TO VERSION 3")
return err
}

ctx.Logger().Info("INVOKING MIGRATION HANDLER ResetMapsWithNonNumericValues() FROM VERSION 2 TO VERSION 3")
ResetMapsWithNonNumericValues(store, cdc)

return nil
}

func MigrateParams(store storetypes.KVStore, cdc codec.BinaryCodec) error {
oldParams := oldtypes.Params{}
oldParamsBytes := store.Get(types.ParamsKey)
if oldParamsBytes == nil {
return errors.Wrapf(types.ErrNotFound, "old parameters not found")
return errorsmod.Wrapf(types.ErrNotFound, "old parameters not found")
}
err := proto.Unmarshal(oldParamsBytes, &oldParams)
if err != nil {
return errors.Wrapf(err, "failed to unmarshal old parameters")
return errorsmod.Wrapf(err, "failed to unmarshal old parameters")
}

defaultParams := types.DefaultParams()

// DIFFERENCE BETWEEN OLD PARAMS AND NEW PARAMS:
// ADDED:
// MaxElementsPerForecast
// MaxActiveTopicsPerBlock
// REMOVED:
// MinEffectiveTopicRevenue
// TopicFeeRevenueDecayRate
// MaxRetriesToFulfilNoncesWorker
// MaxRetriesToFulfilNoncesReputer
// MaxActiveTopicsPerBlock
newParams := types.Params{
Version: oldParams.Version,
MaxSerializedMsgLength: oldParams.MaxSerializedMsgLength,
Expand Down Expand Up @@ -95,18 +107,22 @@ func MigrateParams(store storetypes.KVStore, cdc codec.BinaryCodec) error {
HalfMaxProcessStakeRemovalsEndBlock: oldParams.HalfMaxProcessStakeRemovalsEndBlock,
EpsilonSafeDiv: oldParams.EpsilonSafeDiv,
DataSendingFee: oldParams.DataSendingFee,
MaxElementsPerForecast: defaultParams.MaxElementsPerForecast,
MaxActiveTopicsPerBlock: defaultParams.MaxActiveTopicsPerBlock,
// NEW PARAMS
MaxElementsPerForecast: defaultParams.MaxElementsPerForecast,
MaxActiveTopicsPerBlock: defaultParams.MaxActiveTopicsPerBlock,
}

store.Delete(types.ParamsKey)
store.Set(types.ParamsKey, cdc.MustMarshal(&newParams))
return nil
}

func MigrateActiveTopics(store storetypes.KVStore, ctx sdk.Context, emissionsKeeper keeper.Keeper) error {
sdkCtx := sdk.UnwrapSDKContext(ctx)
sdkCtx.Logger().Warn("MigrateActiveTopics")
func MigrateTopics(
ctx sdk.Context,
store storetypes.KVStore,
cdc codec.BinaryCodec,
emissionsKeeper keeper.Keeper,
) error {
topicStore := prefix.NewStore(store, types.TopicsKey)
topicFeeRevStore := prefix.NewStore(store, types.TopicFeeRevenueKey)
topicStakeStore := prefix.NewStore(store, types.TopicStakeKey)
Expand All @@ -117,35 +133,39 @@ func MigrateActiveTopics(store storetypes.KVStore, ctx sdk.Context, emissionsKee
blockLowestWeightStore := prefix.NewStore(store, types.BlockToLowestActiveTopicWeightKey)
params, err := emissionsKeeper.GetParams(ctx)
if err != nil {
return errors.Wrapf(err, "failed to get params for active topic migration")
return errorsmod.Wrapf(err, "failed to get params for active topic migration")
}
churningBlock := make(map[types.TopicId]types.BlockHeight, 0)
blockToActiveTopics := make(map[types.BlockHeight]types.TopicIds, 0)
lowestWeight := make(map[types.BlockHeight]types.TopicIdWeightPair, 0)

topicWeightData := make(map[types.TopicId]alloraMath.Dec, 0)

topicsToChange := make(map[string]types.Topic, 0)
for ; iterator.Valid(); iterator.Next() {
var oldMsg oldtypes.Topic
err := proto.Unmarshal(iterator.Value(), &oldMsg)
if err != nil {
continue
return errorsmod.Wrapf(err, "failed to unmarshal old topic")
}
var feeRevenue = cosmosMath.NewInt(0)
idArray := make([]byte, 8)
binary.BigEndian.PutUint64(idArray, oldMsg.Id)
err = feeRevenue.Unmarshal(topicFeeRevStore.Get(idArray))
if err != nil {
topicsToChange[string(iterator.Key())] = getNewTopic(oldMsg)
continue
}
var stake = cosmosMath.NewInt(0)
err = stake.Unmarshal(topicStakeStore.Get(idArray))
if err != nil {
topicsToChange[string(iterator.Key())] = getNewTopic(oldMsg)
continue
}
var previousWeight = alloraMath.NewDecFromInt64(0)
err = previousWeight.Unmarshal(topicPreviousWeightStore.Get(idArray))
if err != nil {
topicsToChange[string(iterator.Key())] = getNewTopic(oldMsg)
continue
}
// Get topic's latest weight
Expand All @@ -160,13 +180,15 @@ func MigrateActiveTopics(store storetypes.KVStore, ctx sdk.Context, emissionsKee
emissionsKeeper,
)
if err != nil {
topicsToChange[string(iterator.Key())] = getNewTopic(oldMsg)
continue
}
topicWeightData[oldMsg.Id] = weight
blockHeight := oldMsg.EpochLastEnded + oldMsg.EpochLength
sdkCtx.Logger().Warn(fmt.Sprintf("update blockHeight %d", blockHeight))
// If the weight less than minimum weight then skip this topic
ctx.Logger().Warn(fmt.Sprintf("update blockHeight %d", blockHeight))
// If the weight is less than minimum weight then skip this topic
if weight.Lt(params.MinTopicWeight) {
topicsToChange[string(iterator.Key())] = getNewTopic(oldMsg)
continue
}

Expand Down Expand Up @@ -219,11 +241,94 @@ func MigrateActiveTopics(store storetypes.KVStore, ctx sdk.Context, emissionsKee
}
blockToActiveStore.Set(blockHeightBytes, activeTopicsBytes)
blockLowestWeightStore.Set(blockHeightBytes, lowestWeightBytes)

topicsToChange[string(iterator.Key())] = getNewTopic(oldMsg)
}
_ = iterator.Close()
for key, value := range topicsToChange {
topicStore.Set([]byte(key), cdc.MustMarshal(&value))
}

return nil
}

func getNewTopic(oldMsg oldtypes.Topic) types.Topic {
return types.Topic{
Id: oldMsg.Id,
Creator: oldMsg.Creator,
Metadata: oldMsg.Metadata,
LossMethod: oldMsg.LossMethod,
EpochLastEnded: oldMsg.EpochLastEnded, // Add default value
EpochLength: oldMsg.EpochLength,
GroundTruthLag: oldMsg.GroundTruthLag,
PNorm: oldMsg.PNorm,
AlphaRegret: oldMsg.AlphaRegret,
AllowNegative: oldMsg.AllowNegative,
Epsilon: alloraMath.MustNewDecFromString("0.01"),
// InitialRegret is being reset to account for NaNs that were previously stored due to insufficient validation
InitialRegret: alloraMath.MustNewDecFromString("0"),
WorkerSubmissionWindow: oldMsg.WorkerSubmissionWindow,
// These are new fields
MeritSortitionAlpha: alloraMath.MustNewDecFromString("0.1"),
ActiveInfererQuantile: alloraMath.MustNewDecFromString("0.25"),
ActiveForecasterQuantile: alloraMath.MustNewDecFromString("0.25"),
ActiveReputerQuantile: alloraMath.MustNewDecFromString("0.25"),
}
}

// Deletes all keys in the store with the given keyPrefix `maxPageSize` keys at a time
func safelyClearWholeMap(store storetypes.KVStore, keyPrefix []byte) {
s := prefix.NewStore(store, keyPrefix)

// Loop until all keys are deleted.
// Unbounded not best practice but we are sure that the number of keys will be limited
// and not deleting all keys means "poison" will remain in the store.
for {
// Gather keys to eventually delete
iterator := s.Iterator(nil, nil)
keysToDelete := make([][]byte, 0)
count := uint64(0)
for ; iterator.Valid(); iterator.Next() {
if count >= maxPageSize {
break
}

keysToDelete = append(keysToDelete, iterator.Key())
count++
}
iterator.Close()

// If no keys to delete, break => Exit whole function
if len(keysToDelete) == 0 {
break
}

// Delete the keys
for _, key := range keysToDelete {
s.Delete(key)
}
}
}

func ResetMapsWithNonNumericValues(store storetypes.KVStore, cdc codec.BinaryCodec) {
safelyClearWholeMap(store, types.InferenceScoresKey)
safelyClearWholeMap(store, types.ForecastScoresKey)
safelyClearWholeMap(store, types.ReputerScoresKey)
safelyClearWholeMap(store, types.InfererScoreEmasKey)
safelyClearWholeMap(store, types.ForecasterScoreEmasKey)
safelyClearWholeMap(store, types.ReputerScoreEmasKey)
safelyClearWholeMap(store, types.AllLossBundlesKey)
safelyClearWholeMap(store, types.NetworkLossBundlesKey)
safelyClearWholeMap(store, types.InfererNetworkRegretsKey)
safelyClearWholeMap(store, types.ForecasterNetworkRegretsKey)
safelyClearWholeMap(store, types.OneInForecasterNetworkRegretsKey)
safelyClearWholeMap(store, types.LatestNaiveInfererNetworkRegretsKey)
safelyClearWholeMap(store, types.LatestOneOutInfererInfererNetworkRegretsKey)
safelyClearWholeMap(store, types.LatestOneOutInfererForecasterNetworkRegretsKey)
safelyClearWholeMap(store, types.LatestOneOutForecasterInfererNetworkRegretsKey)
safelyClearWholeMap(store, types.LatestOneOutForecasterForecasterNetworkRegretsKey)
}

func getTopicWeight(
feeRevenue, stake cosmosMath.Int,
previousWeight alloraMath.Dec,
Expand Down
Loading

0 comments on commit 5bf753e

Please sign in to comment.