Skip to content

Commit

Permalink
feat: refine event for downstream
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Jan 24, 2024
1 parent 56be310 commit c6860c2
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 177 deletions.
4 changes: 2 additions & 2 deletions e2e/tests/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2478,7 +2478,7 @@ func (s *StorageTestSuite) TestUpdateObjectContent() {
// 5 Update the object again
s.SendTxBlock(user, msgUpdateObject)

_, err = s.Client.HeadShadowObject(ctx, &queryHeadShadowObjectRequest)
queryHeadShadowObjectResponse, err = s.Client.HeadShadowObject(ctx, &queryHeadShadowObjectRequest)
s.Require().NoError(err)
updatedAt := queryHeadShadowObjectResponse.ObjectInfo.UpdatedAt

Expand Down Expand Up @@ -2516,6 +2516,6 @@ func (s *StorageTestSuite) TestUpdateObjectContent() {
s.Require().Equal(false, queryHeadObjectResponse.ObjectInfo.IsUpdating)
s.Require().Equal(updatedAt, queryHeadObjectResponse.ObjectInfo.UpdatedAt)
// HeadShadowObject
queryHeadShadowObjectResponse, err = s.Client.HeadShadowObject(ctx, &queryHeadShadowObjectRequest)
_, err = s.Client.HeadShadowObject(ctx, &queryHeadShadowObjectRequest)
s.Require().Error(err)
}
10 changes: 7 additions & 3 deletions proto/greenfield/storage/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ message EventSealObject {
uint32 global_virtual_group_id = 7;
// local_virtual_group_id defines the unique id of lvg which the object stored
uint32 local_virtual_group_id = 8;
// for_update indicates whether sealing on a updating object
bool for_update = 9;
}

// EventCopyObject is emitted on MsgCopyObject
Expand Down Expand Up @@ -240,6 +242,8 @@ message EventRejectSealObject {
(gogoproto.customtype) = "Uint",
(gogoproto.nullable) = false
];
// for_update indicates whether reject sealing on a updating object
bool for_update = 5;
}

// EventDiscontinueObject is emitted on MsgDiscontinueObject
Expand Down Expand Up @@ -578,10 +582,8 @@ message EventUpdateObjectContent {
uint64 payload_size = 5;
// checksums define the total checksums of the object which generated by redundancy
repeated bytes checksums = 6;
// updated_at define the block timestamp when the object is updated
int64 updated_at = 7;
// version define the version of object
int64 version = 8;
int64 version = 7;
}

// EventUpdateObjectContentSuccess is emitted on the MsgSealObject for updating object
Expand All @@ -606,6 +608,8 @@ message EventUpdateObjectContentSuccess {
repeated bytes new_checksums = 8;
// version define the version of object
int64 version = 9;
// updated_at define the block timestamp when the object is updated
int64 updated_at = 10;
}

// EventCancelUpdateObjectContent is emitted on MsgCancelUpdateObjectContent
Expand Down
49 changes: 32 additions & 17 deletions x/storage/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,13 +798,18 @@ func (k Keeper) SealObject(

isUpdate := objectInfo.IsUpdating
if isUpdate {
// uncharged the current object store fee
err := k.UnChargeObjectStoreFee(ctx, bucketInfo, k.MustGetInternalBucketInfo(ctx, bucketInfo.Id), objectInfo)
internalBucketInfo := k.MustGetInternalBucketInfo(ctx, bucketInfo.Id)
err := k.UnChargeObjectStoreFee(ctx, bucketInfo, internalBucketInfo, objectInfo)
if err != nil {
return err
}
k.SetInternalBucketInfo(ctx, bucketInfo.Id, internalBucketInfo)
err = k.DeleteObjectFromVirtualGroup(ctx, bucketInfo, objectInfo)
if err != nil {
return err
}
shadowObjectInfo := k.MustGetShadowObjectInfo(ctx, bucketName, objectName)

shadowObjectInfo := k.MustGetShadowObjectInfo(ctx, bucketName, objectName)
objectInfo.UpdatedAt = shadowObjectInfo.UpdatedAt
objectInfo.Version = shadowObjectInfo.Version
objectInfo.Checksums = shadowObjectInfo.Checksums
Expand Down Expand Up @@ -850,17 +855,7 @@ func (k Keeper) SealObject(
obz := k.cdc.MustMarshal(objectInfo)
store.Set(types.GetObjectByIDKey(objectInfo.Id), obz)

if err := ctx.EventManager().EmitTypedEvents(&types.EventSealObject{
Operator: spSealAcc.String(),
BucketName: bucketInfo.BucketName,
ObjectName: objectInfo.ObjectName,
ObjectId: objectInfo.Id,
Status: objectInfo.ObjectStatus,
GlobalVirtualGroupId: opts.GlobalVirtualGroupId,
LocalVirtualGroupId: objectInfo.LocalVirtualGroupId,
}); err != nil {
return err
}
//
if isUpdate {
if err := ctx.EventManager().EmitTypedEvents(&types.EventUpdateObjectContentSuccess{
Operator: spSealAcc.String(),
Expand All @@ -872,10 +867,23 @@ func (k Keeper) SealObject(
PrevChecksums: prevCheckSums,
NewChecksums: objectInfo.Checksums,
Version: objectInfo.Version,
UpdatedAt: objectInfo.UpdatedAt,
}); err != nil {
return err
}
}
if err := ctx.EventManager().EmitTypedEvents(&types.EventSealObject{
Operator: spSealAcc.String(),
BucketName: bucketInfo.BucketName,
ObjectName: objectInfo.ObjectName,
ObjectId: objectInfo.Id,
Status: objectInfo.ObjectStatus,
GlobalVirtualGroupId: opts.GlobalVirtualGroupId,
LocalVirtualGroupId: objectInfo.LocalVirtualGroupId,
ForUpdate: isUpdate,
}); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -1198,7 +1206,8 @@ func (k Keeper) RejectSealObject(ctx sdk.Context, operator sdk.AccAddress, bucke
if sp.Id != spInState.Id {
return errors.Wrapf(types.ErrAccessDenied, "Only allowed primary SP to do reject seal object")
}
if objectInfo.IsUpdating {
forUpdate := objectInfo.IsUpdating
if forUpdate {
shadowObjectInfo := k.MustGetShadowObjectInfo(ctx, bucketName, objectName)
err := k.UnlockShadowObjectFeeAndDeleteShadowObjectInfo(ctx, bucketInfo, shadowObjectInfo, objectName)
if err != nil {
Expand All @@ -1219,12 +1228,12 @@ func (k Keeper) RejectSealObject(ctx sdk.Context, operator sdk.AccAddress, bucke
store.Delete(types.GetObjectKey(bucketName, objectName))
store.Delete(types.GetObjectByIDKey(objectInfo.Id))
}
// todo sp receive rejectSeal event for sealed object should not cause GC.
return ctx.EventManager().EmitTypedEvents(&types.EventRejectSealObject{
Operator: operator.String(),
BucketName: bucketInfo.BucketName,
ObjectName: objectInfo.ObjectName,
ObjectId: objectInfo.Id,
ForUpdate: forUpdate,
})
}

Expand Down Expand Up @@ -2453,11 +2462,17 @@ func (k Keeper) UpdateObjectContent(
}

if payloadSize == 0 {
// uncharged the current object store fee
internalBucketInfo := k.MustGetInternalBucketInfo(ctx, bucketInfo.Id)
err := k.UnChargeObjectStoreFee(ctx, bucketInfo, k.MustGetInternalBucketInfo(ctx, bucketInfo.Id), objectInfo)
if err != nil {
return err
}
k.SetInternalBucketInfo(ctx, bucketInfo.Id, internalBucketInfo)
err = k.DeleteObjectFromVirtualGroup(ctx, bucketInfo, objectInfo)
if err != nil {
return err
}

objectInfo.UpdatedAt = ctx.BlockTime().Unix()
objectInfo.Version = objectInfo.Version + 1
objectInfo.PayloadSize = 0
Expand Down
Loading

0 comments on commit c6860c2

Please sign in to comment.