Skip to content

Commit

Permalink
feat: atomic object update
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Feb 2, 2024
1 parent 172192e commit bc85d0f
Show file tree
Hide file tree
Showing 26 changed files with 5,624 additions and 753 deletions.
2 changes: 2 additions & 0 deletions app/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ func (app *App) registerPawneeUpgradeHandler() {
app.UpgradeKeeper.SetUpgradeHandler(upgradetypes.Pawnee,
func(ctx sdk.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
app.Logger().Info("upgrade to ", plan.Name)
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgUpdateObjectContent{}), 1.2e3))
app.GashubKeeper.SetMsgGasParams(ctx, *gashubtypes.NewMsgGasParamsWithFixedGas(sdk.MsgTypeURL(&storagemoduletypes.MsgCancelUpdateObjectContent{}), 1.2e3))
return app.mm.RunMigrations(ctx, app.configurator, fromVM)
})

Expand Down
266 changes: 266 additions & 0 deletions e2e/tests/storage_bill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2302,3 +2302,269 @@ func (s *PaymentTestSuite) reduceBNBBalance(user, to keys.KeyManager, leftBalanc
// ))
// s.SendTxBlock(from, msgSend)
//}

func (s *PaymentTestSuite) TestStorageBill_UpdateObject() {
var err error
ctx := context.Background()
sp := s.PickStorageProvider()
gvg, found := sp.GetFirstGlobalVirtualGroup()
s.Require().True(found)
queryFamilyResponse, err := s.Client.GlobalVirtualGroupFamily(ctx, &virtualgrouptypes.QueryGlobalVirtualGroupFamilyRequest{
FamilyId: gvg.FamilyId,
})
s.Require().NoError(err)
family := queryFamilyResponse.GlobalVirtualGroupFamily
user := s.GenAndChargeAccounts(1, 1000000)[0]

streamAddresses := []string{
user.GetAddr().String(),
family.VirtualPaymentAddress,
gvg.VirtualPaymentAddress,
paymenttypes.ValidatorTaxPoolAddress.String(),
}
streamRecordsBeforeCreateBucket := s.getStreamRecords(streamAddresses)
s.T().Logf("streamRecordsBeforeCreateBucket: %s", core.YamlString(streamRecordsBeforeCreateBucket))
// create bucket
bucketName := storagetestutils.GenRandomBucketName()
bucketChargedReadQuota := uint64(1000)
msgCreateBucket := storagetypes.NewMsgCreateBucket(
user.GetAddr(), bucketName, storagetypes.VISIBILITY_TYPE_PRIVATE, sp.OperatorKey.GetAddr(),
nil, math.MaxUint, nil, 0)
msgCreateBucket.ChargedReadQuota = bucketChargedReadQuota
msgCreateBucket.PrimarySpApproval.GlobalVirtualGroupFamilyId = gvg.FamilyId
msgCreateBucket.PrimarySpApproval.Sig, err = sp.ApprovalKey.Sign(msgCreateBucket.GetApprovalBytes())
s.Require().NoError(err)
s.SendTxBlock(user, msgCreateBucket)

// CreateObject
objectName := storagetestutils.GenRandomObjectName()
// create test buffer
var buffer bytes.Buffer
// Create 10MiB content where each line contains 1024 characters.
for i := 0; i < 10240; i++ {
buffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
payloadSize := uint64(buffer.Len())
checksum := sdk.Keccak256(buffer.Bytes())
expectChecksum := [][]byte{checksum, checksum, checksum, checksum, checksum, checksum, checksum}
contextType := "text/event-stream"
msgCreateObject := storagetypes.NewMsgCreateObject(user.GetAddr(), bucketName, objectName, payloadSize, storagetypes.VISIBILITY_TYPE_PRIVATE, expectChecksum, contextType, storagetypes.REDUNDANCY_EC_TYPE, math.MaxUint, nil)
msgCreateObject.PrimarySpApproval.Sig, err = sp.ApprovalKey.Sign(msgCreateObject.GetApprovalBytes())
s.Require().NoError(err)
s.SendTxBlock(user, msgCreateObject)
s.T().Logf("msgCreateObject %s", msgCreateObject.String())

// HeadObject
queryHeadObjectRequest := storagetypes.QueryHeadObjectRequest{
BucketName: bucketName,
ObjectName: objectName,
}
queryHeadObjectResponse, err := s.Client.HeadObject(ctx, &queryHeadObjectRequest)
s.Require().NoError(err)

streamRecordsBeforeSealObjectTx := s.getStreamRecords(streamAddresses)
s.T().Logf("streamRecordsBeforeSealObjectTx %s", core.YamlString(streamRecordsBeforeSealObjectTx))

// SealObject
gvgId := gvg.Id
msgSealObject := storagetypes.NewMsgSealObject(sp.SealKey.GetAddr(), bucketName, objectName, gvg.Id, nil)
secondarySigs := make([][]byte, 0)
secondarySPBlsPubKeys := make([]bls.PublicKey, 0)
blsSignHash := storagetypes.NewSecondarySpSealObjectSignDoc(s.GetChainID(), gvgId, queryHeadObjectResponse.ObjectInfo.Id, storagetypes.GenerateHash(queryHeadObjectResponse.ObjectInfo.Checksums[:])).GetBlsSignHash()

// every secondary sp signs the checksums
for _, spID := range gvg.SecondarySpIds {
sig, err := core.BlsSignAndVerify(s.StorageProviders[spID], blsSignHash)
s.Require().NoError(err)
secondarySigs = append(secondarySigs, sig)
pk, err := bls.PublicKeyFromBytes(s.StorageProviders[spID].BlsKey.PubKey().Bytes())
s.Require().NoError(err)
secondarySPBlsPubKeys = append(secondarySPBlsPubKeys, pk)
}
aggBlsSig, err := core.BlsAggregateAndVerify(secondarySPBlsPubKeys, blsSignHash, secondarySigs)
s.Require().NoError(err)
msgSealObject.SecondarySpBlsAggSignatures = aggBlsSig
s.T().Logf("msg %s", msgSealObject.String())
s.SendTxBlock(sp.SealKey, msgSealObject)

queryHeadObjectResponse, err = s.Client.HeadObject(ctx, &queryHeadObjectRequest)
s.Require().NoError(err)
storedSizeBeforeUpdateTx := queryHeadObjectResponse.GlobalVirtualGroup.StoredSize

queryHeadBucketRequest := storagetypes.QueryHeadBucketRequest{
BucketName: bucketName,
}
queryHeadBucketResponseAfterCreateObj, err := s.Client.HeadBucket(ctx, &queryHeadBucketRequest)
s.T().Logf("queryHeadBucketResponseAfterCreateObj %s, err: %v", queryHeadBucketResponseAfterCreateObj, err)
s.Require().NoError(err)
queryGlobalSpStorePriceByTime, err := s.Client.QueryGlobalSpStorePriceByTime(ctx, &sptypes.QueryGlobalSpStorePriceByTimeRequest{
Timestamp: queryHeadBucketResponseAfterCreateObj.BucketInfo.CreateAt,
})
s.T().Logf("queryGlobalSpStorePriceByTime %s, err: %v", queryGlobalSpStorePriceByTime, err)
s.Require().NoError(err)

params := s.queryParams()
primaryStorePrice := queryGlobalSpStorePriceByTime.GlobalSpStorePrice.PrimaryStorePrice
secondaryStorePrice := queryGlobalSpStorePriceByTime.GlobalSpStorePrice.SecondaryStorePrice
chargeSize := s.getChargeSize(queryHeadObjectResponse.ObjectInfo.PayloadSize)
orginChargeRate := primaryStorePrice.Add(secondaryStorePrice.MulInt64(6)).MulInt(sdk.NewIntFromUint64(chargeSize)).TruncateInt()
orginChargeRate = params.VersionedParams.ValidatorTaxRate.MulInt(orginChargeRate).TruncateInt().Add(orginChargeRate)

// get stream records after first seal, before update object content tx
streamRecordsBeforeUpdateObjectTx := s.getStreamRecords(streamAddresses)
s.T().Logf("streamRecordsBeforeObjectContentTx %s", core.YamlString(streamRecordsBeforeUpdateObjectTx))
userStreamAccountBeforeUpdateObjTx := streamRecordsBeforeUpdateObjectTx.User

// 1. update the object, new payload size is 1 MB
var newBuffer bytes.Buffer
for i := 0; i < 1024; i++ {
newBuffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
newPayloadSize := uint64(newBuffer.Len())
newChecksum := sdk.Keccak256(newBuffer.Bytes())
newExpectChecksum := [][]byte{newChecksum, newChecksum, newChecksum, newChecksum, newChecksum, newChecksum, newChecksum}
msgUpdateObject := storagetypes.NewMsgUpdateObjectContent(user.GetAddr(), bucketName, objectName, newPayloadSize, newExpectChecksum)
s.SendTxBlock(user, msgUpdateObject)
s.T().Logf("msgUpdateObject %s", msgUpdateObject.String())

// only the field updating is set to true.
queryHeadObjectResponse, err = s.Client.HeadObject(ctx, &queryHeadObjectRequest)
s.Require().NoError(err)
s.Require().Equal(payloadSize, queryHeadObjectResponse.ObjectInfo.PayloadSize)
s.Require().Equal(int64(0), queryHeadObjectResponse.ObjectInfo.Version)
s.Require().Equal(true, queryHeadObjectResponse.ObjectInfo.IsUpdating)
s.Require().Equal(int64(0), queryHeadObjectResponse.ObjectInfo.UpdatedAt)
s.Require().Equal(storedSizeBeforeUpdateTx, queryHeadObjectResponse.GlobalVirtualGroup.StoredSize)

// HeadShadowObject
queryHeadShadowObjectRequest := storagetypes.QueryHeadShadowObjectRequest{
BucketName: bucketName,
ObjectName: objectName,
}
queryHeadShadowObjectResponse, err := s.Client.HeadShadowObject(ctx, &queryHeadShadowObjectRequest)
s.Require().NoError(err)
s.Require().Equal(newPayloadSize, queryHeadShadowObjectResponse.ObjectInfo.PayloadSize)
s.Require().Equal(int64(1), queryHeadShadowObjectResponse.ObjectInfo.Version)

// Should lock balance according to the updated object size after updateObjectContent tx
// get stream records after UpdateObjectContent Tx
streamRecordsAfterUpdateObjectTx := s.getStreamRecords(streamAddresses)
s.T().Logf("streamRecordsAfterUpdateObjectContentTx %s", core.YamlString(streamRecordsAfterUpdateObjectTx))
userStreamAccountAfterUpdateObjTx := streamRecordsAfterUpdateObjectTx.User

chargeSize = s.getChargeSize(queryHeadShadowObjectResponse.ObjectInfo.PayloadSize)
newChargeRate := primaryStorePrice.Add(secondaryStorePrice.MulInt64(6)).MulInt(sdk.NewIntFromUint64(chargeSize)).TruncateInt()
newChargeRate = params.VersionedParams.ValidatorTaxRate.MulInt(newChargeRate).TruncateInt().Add(newChargeRate)
newLockedBalance := newChargeRate.Mul(sdkmath.NewIntFromUint64(params.VersionedParams.ReserveTime))
// lock balance according to the new object size
s.Require().Equal(newLockedBalance.String(), userStreamAccountAfterUpdateObjTx.LockBalance.String())
// the net flow rate stay still, only lock the updatedObject balance
s.Require().True(userStreamAccountAfterUpdateObjTx.NetflowRate.Equal(userStreamAccountBeforeUpdateObjTx.NetflowRate))

// 2 Cancel the update
msgCancelUpdateObject := storagetypes.NewMsgCancelUpdateObjectContent(user.GetAddr(), bucketName, objectName)
s.Require().NoError(err)
s.SendTxBlock(user, msgCancelUpdateObject)

queryHeadObjectResponse, err = s.Client.HeadObject(ctx, &queryHeadObjectRequest)
s.Require().NoError(err)
s.Require().Equal(payloadSize, queryHeadObjectResponse.ObjectInfo.PayloadSize)
s.Require().Equal(int64(0), queryHeadObjectResponse.ObjectInfo.Version)
s.Require().Equal(false, queryHeadObjectResponse.ObjectInfo.IsUpdating)
s.Require().Equal(int64(0), queryHeadObjectResponse.ObjectInfo.UpdatedAt)

// HeadShadowObject not found
_, err = s.Client.HeadShadowObject(ctx, &queryHeadShadowObjectRequest)
s.Require().Error(err)

// the stream record of user is reverted if cancel update
streamRecordsAfterCancelUpdateObjectTx := s.getStreamRecords(streamAddresses)
s.T().Logf("streamRecordsAfterCancelUpdateObjectTx %s\n", core.YamlString(streamRecordsAfterCancelUpdateObjectTx))
userStreamAccountAfterCancelUpdateObjTx := streamRecordsAfterCancelUpdateObjectTx.User
s.Require().Equal(sdkmath.ZeroInt(), userStreamAccountAfterCancelUpdateObjTx.LockBalance)
// lock fee will be applied to static balance
s.Require().True(userStreamAccountAfterUpdateObjTx.NetflowRate.Equal(userStreamAccountAfterCancelUpdateObjTx.NetflowRate))
deductStaticBalanceSinceLastCrudTs := sdkmath.NewInt(userStreamAccountAfterCancelUpdateObjTx.CrudTimestamp - userStreamAccountAfterUpdateObjTx.CrudTimestamp).Mul(userStreamAccountAfterCancelUpdateObjTx.NetflowRate)
s.Require().Equal(userStreamAccountAfterUpdateObjTx.StaticBalance.Add(deductStaticBalanceSinceLastCrudTs).Add(userStreamAccountAfterUpdateObjTx.LockBalance), userStreamAccountAfterCancelUpdateObjTx.StaticBalance)

// 3. update the object again
s.SendTxBlock(user, msgUpdateObject)

streamRecordsAfterUpdateObjectTx = s.getStreamRecords(streamAddresses)
s.T().Logf("streamRecordsAfterUpdateObjectContentTx 2 %s", core.YamlString(streamRecordsAfterUpdateObjectTx))
userStreamAccountAfterUpdateObjTx = streamRecordsAfterUpdateObjectTx.User

// 4 reject seal by SP
msgRejectSealObject := storagetypes.NewMsgRejectUnsealedObject(sp.SealKey.GetAddr(), bucketName, objectName)
s.SendTxBlock(sp.SealKey, msgRejectSealObject)

// object
queryHeadObjectResponse, err = s.Client.HeadObject(ctx, &queryHeadObjectRequest)
s.Require().NoError(err)
s.Require().Equal(payloadSize, queryHeadObjectResponse.ObjectInfo.PayloadSize)
s.Require().Equal(int64(0), queryHeadObjectResponse.ObjectInfo.Version)
s.Require().Equal(false, queryHeadObjectResponse.ObjectInfo.IsUpdating)
s.Require().Equal(int64(0), queryHeadObjectResponse.ObjectInfo.UpdatedAt)

// HeadShadowObject not found
_, err = s.Client.HeadShadowObject(ctx, &queryHeadShadowObjectRequest)
s.Require().Error(err)

// the stream record of user is reverted if reject seal by SP
streamRecordsAfterRejectSealObjectTx := s.getStreamRecords(streamAddresses)
s.T().Logf("streamRecordsAfterRejectSealObjectTx %s", core.YamlString(streamRecordsAfterRejectSealObjectTx))
userStreamAccountRejectSealObjTx := streamRecordsAfterRejectSealObjectTx.User
s.Require().Equal(sdkmath.ZeroInt(), userStreamAccountRejectSealObjTx.LockBalance)

// lock fee will be applied to static balance
s.Require().True(userStreamAccountAfterUpdateObjTx.NetflowRate.Equal(userStreamAccountRejectSealObjTx.NetflowRate))
deductStaticBalanceSinceLastCrudTs = sdkmath.NewInt(userStreamAccountRejectSealObjTx.CrudTimestamp - userStreamAccountAfterUpdateObjTx.CrudTimestamp).Mul(userStreamAccountRejectSealObjTx.NetflowRate)
s.Require().Equal(userStreamAccountAfterUpdateObjTx.StaticBalance.Add(deductStaticBalanceSinceLastCrudTs).Add(userStreamAccountAfterUpdateObjTx.LockBalance),
userStreamAccountRejectSealObjTx.StaticBalance)

// 5 Update the object again
s.SendTxBlock(user, msgUpdateObject)

queryHeadShadowObjectResponse, err = s.Client.HeadShadowObject(ctx, &queryHeadShadowObjectRequest)
s.Require().NoError(err)
updatedAt := queryHeadShadowObjectResponse.ObjectInfo.UpdatedAt

// 6 SP seal the object
blsSignHash = storagetypes.NewSecondarySpSealObjectSignDoc(s.GetChainID(), gvgId, queryHeadObjectResponse.ObjectInfo.Id, storagetypes.GenerateHash(queryHeadShadowObjectResponse.ObjectInfo.Checksums[:])).GetBlsSignHash()
msgSealObject = storagetypes.NewMsgSealObject(sp.SealKey.GetAddr(), bucketName, objectName, gvg.Id, nil)
secondarySigs = make([][]byte, 0)
secondarySPBlsPubKeys = make([]bls.PublicKey, 0)

// every secondary sp signs the checksums
for _, spID := range gvg.SecondarySpIds {
sig, err := core.BlsSignAndVerify(s.StorageProviders[spID], blsSignHash)
s.Require().NoError(err)
secondarySigs = append(secondarySigs, sig)
pk, err := bls.PublicKeyFromBytes(s.StorageProviders[spID].BlsKey.PubKey().Bytes())
s.Require().NoError(err)
secondarySPBlsPubKeys = append(secondarySPBlsPubKeys, pk)
}
aggBlsSig, err = core.BlsAggregateAndVerify(secondarySPBlsPubKeys, blsSignHash, secondarySigs)
s.Require().NoError(err)
msgSealObject.SecondarySpBlsAggSignatures = aggBlsSig
s.T().Logf("msgSealObject %s", msgSealObject.String())
s.SendTxBlock(sp.SealKey, msgSealObject)

// HeadObject
queryHeadObjectResponse, err = s.Client.HeadObject(ctx, &queryHeadObjectRequest)
s.Require().NoError(err)
s.Require().Equal(newPayloadSize, queryHeadObjectResponse.ObjectInfo.PayloadSize)
s.Require().Equal(int64(1), queryHeadObjectResponse.ObjectInfo.Version)
s.Require().Equal(false, queryHeadObjectResponse.ObjectInfo.IsUpdating)
s.Require().Equal(updatedAt, queryHeadObjectResponse.ObjectInfo.UpdatedAt)
// HeadShadowObject not found
_, err = s.Client.HeadShadowObject(ctx, &queryHeadShadowObjectRequest)
s.Require().Error(err)

// the new flow rate should be only calculated base on the updated object, and deduct the stale obejct's
streamRecordsAfterSealObjectTx := s.getStreamRecords(streamAddresses)
s.T().Logf("streamRecordsAfterSealObjectTx %s", core.YamlString(streamRecordsAfterSealObjectTx))
userStreamAccountAfterSealObjectTx := streamRecordsAfterSealObjectTx.User
s.Require().Equal(sdkmath.ZeroInt(), userStreamAccountAfterSealObjectTx.LockBalance)
s.Require().Equal(userStreamAccountAfterUpdateObjTx.NetflowRate.Sub(orginChargeRate.Neg()).Add(newChargeRate.Neg()), userStreamAccountAfterSealObjectTx.NetflowRate)
}
2 changes: 2 additions & 0 deletions proto/greenfield/permission/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ enum ActionType {
ACTION_UPDATE_GROUP_EXTRA = 12;
ACTION_UPDATE_GROUP_INFO = 13;

ACTION_UPDATE_OBJECT_CONTENT = 14;

ACTION_TYPE_ALL = 99;
}

Expand Down
68 changes: 68 additions & 0 deletions proto/greenfield/storage/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ message EventSealObject {
uint32 global_virtual_group_id = 7;
// local_virtual_group_id defines the unique id of lvg which the object stored
uint32 local_virtual_group_id = 8;
// for_update indicates whether sealing on a updating object
bool for_update = 9;
}

// EventCopyObject is emitted on MsgCopyObject
Expand Down Expand Up @@ -240,6 +242,8 @@ message EventRejectSealObject {
(gogoproto.customtype) = "Uint",
(gogoproto.nullable) = false
];
// for_update indicates whether reject sealing on a updating object
bool for_update = 5;
}

// EventDiscontinueObject is emitted on MsgDiscontinueObject
Expand Down Expand Up @@ -561,3 +565,67 @@ message EventSetTag {
// tags define the tag of the source
ResourceTags tags = 2;
}

// EventUpdateObjectContent is emitted on MsgUpdateObjectContent
message EventUpdateObjectContent {
// operator define the account address of msg operator
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// object_id is the unique identifier of object
string object_id = 2 [
(cosmos_proto.scalar) = "cosmos.Uint",
(gogoproto.customtype) = "Uint",
(gogoproto.nullable) = false
];
string bucket_name = 3;
string object_name = 4;
// payload_size define the size of payload data which you want upload
uint64 payload_size = 5;
// checksums define the total checksums of the object which generated by redundancy
repeated bytes checksums = 6;
// version define the version of object
int64 version = 7;
}

// EventUpdateObjectContentSuccess is emitted on the MsgSealObject for updating object
message EventUpdateObjectContentSuccess {
// operator define the account address of msg operator
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// object_id is the unique identifier of object
string object_id = 2 [
(cosmos_proto.scalar) = "cosmos.Uint",
(gogoproto.customtype) = "Uint",
(gogoproto.nullable) = false
];
string bucket_name = 3;
string object_name = 4;
// content_type define the content type of the payload data
string content_type = 5;
// prev_payload_size define the size of payload data stored previously
uint64 prev_payload_size = 6;
// new_payload_size define the new size of payload data
uint64 new_payload_size = 7;
// prev_checksums define the total checksums of the previous object which generated by redundancy
repeated bytes prev_checksums = 8;
// new_checksums define the total checksums of the updated object which generated by redundancy
repeated bytes new_checksums = 9;
// version define the version of object
int64 version = 10;
// updated_at define the block timestamp when the object is updated
int64 updated_at = 11;
}

// EventCancelUpdateObjectContent is emitted on MsgCancelUpdateObjectContent
message EventCancelUpdateObjectContent {
// operator define the account address of operator who cancel update object
string operator = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];
// bucket_name define the name of the bucket
string bucket_name = 2;
// object_name define the name of the object
string object_name = 3;
// object_name define the id of the object
string object_id = 4 [
(cosmos_proto.scalar) = "cosmos.Uint",
(gogoproto.customtype) = "Uint",
(gogoproto.nullable) = false
];
}
Loading

0 comments on commit bc85d0f

Please sign in to comment.