Skip to content

Commit

Permalink
process operaitons
Browse files Browse the repository at this point in the history
  • Loading branch information
dydxwill committed Aug 8, 2024
1 parent 898074b commit 9ca9cd7
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/protocol-build-and-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Protocol Build & Push Image to AWS ECR
on: # yamllint disable-line rule:truthy
push:
branches:
- 'wl/sa2'
- 'wl/sa3'
- main
- 'release/protocol/v[0-9]+.[0-9]+.x' # e.g. release/protocol/v0.1.x
- 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x
Expand Down
24 changes: 2 additions & 22 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,31 +264,11 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) {
)
},
func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate {
subaccount := k.subaccountsKeeper.GetSubaccount(
subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate(
ctx,
subaccountId,
)
assetPositions := make([]*satypes.SubaccountAssetPosition, len(subaccount.AssetPositions))
for i, ap := range subaccount.AssetPositions {
assetPositions[i] = &satypes.SubaccountAssetPosition{
AssetId: ap.AssetId,
Quantums: ap.Quantums.BigInt().Uint64(),
}
}
perpetualPositions := make([]*satypes.SubaccountPerpetualPosition, len(subaccount.PerpetualPositions))
for i, pp := range subaccount.PerpetualPositions {
perpetualPositions[i] = &satypes.SubaccountPerpetualPosition{
PerpetualId: pp.PerpetualId,
Quantums: pp.Quantums.BigInt().Uint64(),
}
}

return &satypes.StreamSubaccountUpdate{
SubaccountId: &subaccountId,
UpdatedAssetPositions: assetPositions,
UpdatedPerpetualPositions: perpetualPositions,
Snapshot: true,
}
return &subaccountUpdate
},
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
Expand Down
31 changes: 31 additions & 0 deletions protocol/x/clob/keeper/process_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ func fetchOrdersInvolvedInOpQueue(
return orderIdSet
}

// fetchSubaccountIdsInvolvedInOpQueue fetches all SubaccountIds involved in an operations
// queue's matches and returns them as a set.
func fetchSubaccountIdsInvolvedInOpQueue(
operations []types.InternalOperation,
) (subaccountIdSet map[satypes.SubaccountId]struct{}) {
subaccountIdSet = make(map[satypes.SubaccountId]struct{})
for _, operation := range operations {
if clobMatch := operation.GetMatch(); clobMatch != nil {
subaccountIdSetForClobMatch := clobMatch.GetAllSubaccountIds()
subaccountIdSet = lib.MergeMaps(subaccountIdSet, subaccountIdSetForClobMatch)
}
}
return subaccountIdSet
}

// ProcessProposerOperations updates on-chain state given an []OperationRaw operations queue
// representing matches that occurred in the previous block. It performs validation on an operations
// queue. If all validation passes, the operations queue is written to state.
Expand All @@ -58,6 +73,7 @@ func (k Keeper) ProcessProposerOperations(
}

// If grpc streams are on, send absolute fill amounts from local + proposed opqueue to the grpc stream.
// Also send subaccount snapshots for impacted subaccounts.
// This must be sent out to account for checkState being discarded and deliverState being used.
if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled() {
localValidatorOperationsQueue, _ := k.MemClob.GetOperationsToReplay(ctx)
Expand All @@ -75,6 +91,21 @@ func (k Keeper) ProcessProposerOperations(
allUpdates.Append(orderbookUpdate)
}
k.SendOrderbookUpdates(ctx, allUpdates)

// send subaccount snapshots
subaccountIdsFromProposed := fetchSubaccountIdsInvolvedInOpQueue(
operations,
)
subaccountIdsFromLocal := fetchSubaccountIdsInvolvedInOpQueue(
localValidatorOperationsQueue,
)
subaccountIdsToUpdate := lib.MergeMaps(subaccountIdsFromLocal, subaccountIdsFromProposed)
allSubaccountUpdates := make([]satypes.StreamSubaccountUpdate, 0)
for subaccountId := range subaccountIdsToUpdate {
subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate(ctx, subaccountId)
allSubaccountUpdates = append(allSubaccountUpdates, subaccountUpdate)
}
k.subaccountsKeeper.SendSubaccountUpdates(ctx, allSubaccountUpdates)
}

log.DebugLog(ctx, "Processing operations queue",
Expand Down
10 changes: 10 additions & 0 deletions protocol/x/clob/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ type SubaccountsKeeper interface {
) (
val satypes.Subaccount,
)
GetStreamSubaccountUpdate(
ctx sdk.Context,
id satypes.SubaccountId,
) (
val satypes.StreamSubaccountUpdate,
)
GetAllSubaccount(
ctx sdk.Context,
) (
Expand Down Expand Up @@ -78,6 +84,10 @@ type SubaccountsKeeper interface {
quantums *big.Int,
perpetualId uint32,
) error
SendSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []satypes.StreamSubaccountUpdate,
)
}

type AssetsKeeper interface {
Expand Down
26 changes: 26 additions & 0 deletions protocol/x/clob/types/message_clob_match.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package types

import satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

// NewClobMatchFromMatchOrders creates a `ClobMatch` from the provided `MatchOrders`.
func NewClobMatchFromMatchOrders(
msgMatchOrders *MatchOrders,
Expand Down Expand Up @@ -40,3 +42,27 @@ func (clobMatch *ClobMatch) GetAllOrderIds() (orderIds map[OrderId]struct{}) {
}
return orderIds
}

// GetAllSubaccountIds returns a set of subaccountIds involved in a ClobMatch.
func (clobMatch *ClobMatch) GetAllSubaccountIds() (subaccountIds map[satypes.SubaccountId]struct{}) {
subaccountIds = make(map[satypes.SubaccountId]struct{})
if matchOrders := clobMatch.GetMatchOrders(); matchOrders != nil {
subaccountIds[matchOrders.GetTakerOrderId().SubaccountId] = struct{}{}
for _, makerFill := range matchOrders.GetFills() {
subaccountIds[makerFill.GetMakerOrderId().SubaccountId] = struct{}{}
}
}
if matchOrders := clobMatch.GetMatchPerpetualLiquidation(); matchOrders != nil {
subaccountIds[matchOrders.GetLiquidated()] = struct{}{}
for _, makerFill := range matchOrders.GetFills() {
subaccountIds[makerFill.GetMakerOrderId().SubaccountId] = struct{}{}
}
}
if matchOrders := clobMatch.GetMatchPerpetualDeleveraging(); matchOrders != nil {
subaccountIds[matchOrders.GetLiquidated()] = struct{}{}
for _, makerFill := range matchOrders.GetFills() {
subaccountIds[makerFill.GetOffsettingSubaccountId()] = struct{}{}
}
}
return subaccountIds
}
28 changes: 28 additions & 0 deletions protocol/x/subaccounts/keeper/subaccount.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,34 @@ func (k Keeper) GetSubaccount(
return val
}

func (k Keeper) GetStreamSubaccountUpdate(
ctx sdk.Context,
id types.SubaccountId,
) (val types.StreamSubaccountUpdate) {
subaccount := k.GetSubaccount(ctx, id)
assetPositions := make([]*types.SubaccountAssetPosition, len(subaccount.AssetPositions))
for i, ap := range subaccount.AssetPositions {
assetPositions[i] = &types.SubaccountAssetPosition{
AssetId: ap.AssetId,
Quantums: ap.Quantums.BigInt().Uint64(),
}
}
perpetualPositions := make([]*types.SubaccountPerpetualPosition, len(subaccount.PerpetualPositions))
for i, pp := range subaccount.PerpetualPositions {
perpetualPositions[i] = &types.SubaccountPerpetualPosition{
PerpetualId: pp.PerpetualId,
Quantums: pp.Quantums.BigInt().Uint64(),
}
}

return types.StreamSubaccountUpdate{
SubaccountId: &id,
UpdatedAssetPositions: assetPositions,
UpdatedPerpetualPositions: perpetualPositions,
Snapshot: true,
}
}

// GetAllSubaccount returns all subaccount.
// For more performant searching and iteration, use `ForEachSubaccount`.
func (k Keeper) GetAllSubaccount(ctx sdk.Context) (list []types.Subaccount) {
Expand Down
4 changes: 4 additions & 0 deletions protocol/x/subaccounts/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ type SubaccountsKeeper interface {
ctx sdk.Context,
id SubaccountId,
) (val Subaccount)
GetStreamSubaccountUpdate(
ctx sdk.Context,
id SubaccountId,
) (val StreamSubaccountUpdate)
LegacyGetNegativeTncSubaccountSeenAtBlock(ctx sdk.Context) (uint32, bool)
GetNegativeTncSubaccountSeenAtBlock(
ctx sdk.Context,
Expand Down

0 comments on commit 9ca9cd7

Please sign in to comment.