Skip to content

Commit

Permalink
GRPC Full node streaming - batching protos (#1626)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx authored Jun 6, 2024
1 parent 0e67d7a commit 82778b9
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 234 deletions.
90 changes: 42 additions & 48 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,35 +267,17 @@ export interface StreamOrderbookUpdatesRequestSDKType {
*/

export interface StreamOrderbookUpdatesResponse {
/** Orderbook updates for the clob pair. */
/** Batch of updates for the clob pair. */
updates: StreamUpdate[];
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

blockHeight: number;
/** Exec mode of the updates. */

execMode: number;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesResponseSDKType {
/** Orderbook updates for the clob pair. */
/** Batch of updates for the clob pair. */
updates: StreamUpdateSDKType[];
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

block_height: number;
/** Exec mode of the updates. */

exec_mode: number;
}
/**
* StreamUpdate is an update that will be pushed through the
Expand All @@ -305,6 +287,12 @@ export interface StreamOrderbookUpdatesResponseSDKType {
export interface StreamUpdate {
orderbookUpdate?: StreamOrderbookUpdate;
orderFill?: StreamOrderbookFill;
/** Block height of the update. */

blockHeight: number;
/** Exec mode of the update. */

execMode: number;
}
/**
* StreamUpdate is an update that will be pushed through the
Expand All @@ -314,6 +302,12 @@ export interface StreamUpdate {
export interface StreamUpdateSDKType {
orderbook_update?: StreamOrderbookUpdateSDKType;
order_fill?: StreamOrderbookFillSDKType;
/** Block height of the update. */

block_height: number;
/** Exec mode of the update. */

exec_mode: number;
}
/**
* StreamOrderbookUpdate provides information on an orderbook update. Used in
Expand All @@ -328,8 +322,8 @@ export interface StreamOrderbookUpdate {
updates: OffChainUpdateV1[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* All updates should be ignored until snapshot is recieved.
* If the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

Expand All @@ -348,8 +342,8 @@ export interface StreamOrderbookUpdateSDKType {
updates: OffChainUpdateV1SDKType[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* All updates should be ignored until snapshot is recieved.
* If the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

Expand All @@ -363,7 +357,7 @@ export interface StreamOrderbookUpdateSDKType {
export interface StreamOrderbookFill {
/**
* Clob match. Provides information on which orders were matched
* and the type of order. Fill amounts here are relative.
* and the type of order.
*/
clobMatch?: ClobMatch;
/**
Expand All @@ -384,7 +378,7 @@ export interface StreamOrderbookFill {
export interface StreamOrderbookFillSDKType {
/**
* Clob match. Provides information on which orders were matched
* and the type of order. Fill amounts here are relative.
* and the type of order.
*/
clob_match?: ClobMatchSDKType;
/**
Expand Down Expand Up @@ -1159,9 +1153,7 @@ export const StreamOrderbookUpdatesRequest = {

function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse {
return {
updates: [],
blockHeight: 0,
execMode: 0
updates: []
};
}

Expand All @@ -1171,14 +1163,6 @@ export const StreamOrderbookUpdatesResponse = {
StreamUpdate.encode(v!, writer.uint32(10).fork()).ldelim();
}

if (message.blockHeight !== 0) {
writer.uint32(16).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(24).uint32(message.execMode);
}

return writer;
},

Expand All @@ -1195,14 +1179,6 @@ export const StreamOrderbookUpdatesResponse = {
message.updates.push(StreamUpdate.decode(reader, reader.uint32()));
break;

case 2:
message.blockHeight = reader.uint32();
break;

case 3:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -1215,8 +1191,6 @@ export const StreamOrderbookUpdatesResponse = {
fromPartial(object: DeepPartial<StreamOrderbookUpdatesResponse>): StreamOrderbookUpdatesResponse {
const message = createBaseStreamOrderbookUpdatesResponse();
message.updates = object.updates?.map(e => StreamUpdate.fromPartial(e)) || [];
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

Expand All @@ -1225,7 +1199,9 @@ export const StreamOrderbookUpdatesResponse = {
function createBaseStreamUpdate(): StreamUpdate {
return {
orderbookUpdate: undefined,
orderFill: undefined
orderFill: undefined,
blockHeight: 0,
execMode: 0
};
}

Expand All @@ -1239,6 +1215,14 @@ export const StreamUpdate = {
StreamOrderbookFill.encode(message.orderFill, writer.uint32(18).fork()).ldelim();
}

if (message.blockHeight !== 0) {
writer.uint32(24).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(32).uint32(message.execMode);
}

return writer;
},

Expand All @@ -1259,6 +1243,14 @@ export const StreamUpdate = {
message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32());
break;

case 3:
message.blockHeight = reader.uint32();
break;

case 4:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -1272,6 +1264,8 @@ export const StreamUpdate = {
const message = createBaseStreamUpdate();
message.orderbookUpdate = object.orderbookUpdate !== undefined && object.orderbookUpdate !== null ? StreamOrderbookUpdate.fromPartial(object.orderbookUpdate) : undefined;
message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined;
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

Expand Down
23 changes: 11 additions & 12 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,8 @@ message StreamOrderbookUpdatesRequest {
// StreamOrderbookUpdatesResponse is a response message for the
// StreamOrderbookUpdates method.
message StreamOrderbookUpdatesResponse {
// Orderbook updates for the clob pair.
// Batch of updates for the clob pair.
repeated StreamUpdate updates = 1 [ (gogoproto.nullable) = false ];

// ---Additional fields used to debug issues---
// Block height of the updates.
uint32 block_height = 2;

// Exec mode of the updates.
uint32 exec_mode = 3;
}

// StreamUpdate is an update that will be pushed through the
Expand All @@ -190,6 +183,12 @@ message StreamUpdate {
StreamOrderbookUpdate orderbook_update = 1;
StreamOrderbookFill order_fill = 2;
}

// Block height of the update.
uint32 block_height = 3;

// Exec mode of the update.
uint32 exec_mode = 4;
}

// StreamOrderbookUpdate provides information on an orderbook update. Used in
Expand All @@ -201,8 +200,8 @@ message StreamOrderbookUpdate {
[ (gogoproto.nullable) = false ];

// Snapshot indicates if the response is from a snapshot of the orderbook.
// This is true for the initial response and false for all subsequent updates.
// Note that if the snapshot is true, then all previous entries should be
// All updates should be ignored until snapshot is recieved.
// If the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;
}
Expand All @@ -211,13 +210,13 @@ message StreamOrderbookUpdate {
// the full node GRPC stream.
message StreamOrderbookFill {
// Clob match. Provides information on which orders were matched
// and the type of order. Fill amounts here are relative.
// and the type of order.
ClobMatch clob_match = 1;

// All orders involved in the specified clob match. Used to look up
// price of a match through a given maker order id.
repeated Order orders = 2 [ (gogoproto.nullable) = false ];

// Resulting fill amounts for each order in the orders array.
repeated uint64 fill_amounts = 3 [ (gogoproto.nullable) = false ];
repeated uint64 fill_amounts = 3;
}
14 changes: 5 additions & 9 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
Snapshot: snapshot,
},
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
}
}

sm.sendStreamUpdate(
updatesByClobPairId,
blockHeight,
execMode,
)
}

Expand Down Expand Up @@ -166,22 +166,20 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
OrderFill: &orderbookFill,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
}

sm.sendStreamUpdate(
updatesByClobPairId,
blockHeight,
execMode,
)
}

// sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers.
func (sm *GrpcStreamingManagerImpl) sendStreamUpdate(
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
metrics.IncrCounter(
metrics.GrpcEmitProtocolUpdateCount,
Expand All @@ -208,9 +206,7 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate(
)
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdatesForSubscription,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
Updates: streamUpdatesForSubscription,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
Expand Down
Loading

0 comments on commit 82778b9

Please sign in to comment.