Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-709] Add flag to send subaccount websocket message from Ender for long term orders #1223

Merged
merged 5 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions indexer/packages/kafka/src/websocket-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ export function getTriggerPrice(
return undefined;
}

export function createSubaccountWebsocketMessage(
export function generateSubaccountMessageContents(
redisOrder: RedisOrder,
order: OrderFromDatabase | undefined,
perpetualMarket: PerpetualMarketFromDatabase,
placementStatus: OrderPlaceV1_OrderPlacementStatus,
): Buffer {
): SubaccountMessageContents {
const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF(
redisOrder.order!.timeInForce,
);
Expand Down Expand Up @@ -90,6 +90,21 @@ export function createSubaccountWebsocketMessage(
},
],
};
return contents;
}

export function createSubaccountWebsocketMessage(
redisOrder: RedisOrder,
order: OrderFromDatabase | undefined,
perpetualMarket: PerpetualMarketFromDatabase,
placementStatus: OrderPlaceV1_OrderPlacementStatus,
): Buffer {
const contents: SubaccountMessageContents = generateSubaccountMessageContents(
redisOrder,
order,
perpetualMarket,
placementStatus,
);

const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
contents: JSON.stringify(contents),
Expand Down
28 changes: 28 additions & 0 deletions indexer/packages/redis/src/helpers/order-helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import {IndexerOrder, RedisOrder, RedisOrder_TickerType} from '@dydxprotocol-indexer/v4-protos';
import {OrderTable, PerpetualMarketFromDatabase, protocolTranslations} from '@dydxprotocol-indexer/postgres';

/**
* Creates a `RedisOrder` given an `Order` and the corresponding `PerpetualMarket` for the `Order`.
* @param order
* @param perpetualMarket
* @returns
*/
export function convertToRedisOrder(
order: IndexerOrder,
perpetualMarket: PerpetualMarketFromDatabase,
): RedisOrder {
return {
order,
id: OrderTable.orderIdToUuid(order.orderId!),
ticker: perpetualMarket.ticker,
tickerType: RedisOrder_TickerType.TICKER_TYPE_PERPETUAL,
price: protocolTranslations.subticksToPrice(
order.subticks.toString(),
perpetualMarket,
),
size: protocolTranslations.quantumsToHumanFixedString(
order.quantums.toString(),
perpetualMarket.atomicResolution,
),
};
}
1 change: 1 addition & 0 deletions indexer/packages/redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export * as StateFilledQuantumsCache from './caches/state-filled-quantums-cache'
export { placeOrder } from './caches/place-order';
export { removeOrder } from './caches/remove-order';
export { updateOrder } from './caches/update-order';
export * from './helpers/order-helper';

export * from './types';
export { redisConfigSchema } from './config';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { updateBlockCache } from '../../../src/caches/block-cache';
import {
createIndexerTendermintBlock,
createIndexerTendermintEvent,
expectOrderSubaccountKafkaMessage,
expectVulcanKafkaMessage,
} from '../../helpers/indexer-proto-helpers';
import { StatefulOrderPlacementHandler } from '../../../src/handlers/stateful-order/stateful-order-placement-handler';
Expand All @@ -44,6 +45,7 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
import { producer } from '@dydxprotocol-indexer/kafka';
import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('statefulOrderPlacementHandler', () => {
beforeAll(async () => {
Expand All @@ -61,6 +63,7 @@ describe('statefulOrderPlacementHandler', () => {
afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_STATEFUL_ORDERS = false;
});

afterAll(async () => {
Expand Down Expand Up @@ -135,12 +138,16 @@ describe('statefulOrderPlacementHandler', () => {

it.each([
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
['stateful order placement', defaultStatefulOrderEvent],
['stateful long term order placement', defaultStatefulOrderLongTermEvent],
])('successfully places order with %s', async (
['stateful order placement', defaultStatefulOrderEvent, false],
['stateful long term order placement', defaultStatefulOrderLongTermEvent, false],
['stateful order placement', defaultStatefulOrderEvent, true],
['stateful long term order placement', defaultStatefulOrderLongTermEvent, true],
])('successfully places order with %s (emit subaccount websocket msg: %s)', async (
_name: string,
statefulOrderEvent: StatefulOrderEventV1,
emitSubaccountMessage: boolean,
) => {
config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_STATEFUL_ORDERS = emitSubaccountMessage;
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
statefulOrderEvent,
);
Expand Down Expand Up @@ -182,6 +189,13 @@ describe('statefulOrderPlacementHandler', () => {
offchainUpdate: expectedOffchainUpdate,
headers: { message_received_timestamp: kafkaMessage.timestamp, event_type: 'StatefulOrderPlacement' },
});
if (emitSubaccountMessage) {
expectOrderSubaccountKafkaMessage(
producerSendMock,
defaultOrder.orderId!.subaccountId!,
order!,
);
}
});

it.each([
Expand Down
42 changes: 32 additions & 10 deletions indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import {
PerpetualMarketFromDatabase,
PerpetualMarketTable,
IsoString,
fillTypeToTradeType,
fillTypeToTradeType, OrderSubaccountMessageContents,
} from '@dydxprotocol-indexer/postgres';
import { getOrderIdHash } from '@dydxprotocol-indexer/v4-proto-parser';
import { getOrderIdHash, ORDER_FLAG_CONDITIONAL } from '@dydxprotocol-indexer/v4-proto-parser';
import {
LiquidationOrderV1,
MarketMessage,
Expand Down Expand Up @@ -693,6 +693,10 @@ export async function expectFillSubaccountKafkaMessageFromLiquidationEvent(
});
}

function isConditionalOrder(order: OrderFromDatabase): boolean {
return Number(order.orderFlags) === ORDER_FLAG_CONDITIONAL;
}
Comment on lines +696 to +698
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isConditionalOrder function checks if an order is conditional based on its orderFlags. This is a straightforward and efficient way to determine the type of order. However, consider adding a brief comment explaining the significance of ORDER_FLAG_CONDITIONAL for future maintainability.

+ // Determines if an order is conditional based on its orderFlags
function isConditionalOrder(order: OrderFromDatabase): boolean {
  return Number(order.orderFlags) === ORDER_FLAG_CONDITIONAL;
}

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
function isConditionalOrder(order: OrderFromDatabase): boolean {
return Number(order.orderFlags) === ORDER_FLAG_CONDITIONAL;
}
// Determines if an order is conditional based on its orderFlags
function isConditionalOrder(order: OrderFromDatabase): boolean {
return Number(order.orderFlags) === ORDER_FLAG_CONDITIONAL;
}


export function expectOrderSubaccountKafkaMessage(
producerSendMock: jest.SpyInstance,
subaccountIdProto: IndexerSubaccountId,
Expand All @@ -702,16 +706,34 @@ export function expectOrderSubaccountKafkaMessage(
eventIndex: number = 0,
ticker: string = defaultPerpetualMarketTicker,
): void {
const {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
triggerPrice, totalFilled, goodTilBlock, ...orderWithoutUnwantedFields
} = order!;
let orderObject: OrderSubaccountMessageContents;

if (isConditionalOrder(order)) {
orderObject = {
...order!,
timeInForce: apiTranslations.orderTIFToAPITIF(order!.timeInForce),
postOnly: apiTranslations.isOrderTIFPostOnly(order!.timeInForce),
goodTilBlock: order!.goodTilBlock,
goodTilBlockTime: order!.goodTilBlockTime,
ticker,
};
} else {
orderObject = {
...orderWithoutUnwantedFields!,
timeInForce: apiTranslations.orderTIFToAPITIF(order!.timeInForce),
postOnly: apiTranslations.isOrderTIFPostOnly(order!.timeInForce),
goodTilBlockTime: order!.goodTilBlockTime,
ticker,
};
}

const contents: SubaccountMessageContents = {
orders: [
{
...order!,
timeInForce: apiTranslations.orderTIFToAPITIF(order!.timeInForce),
postOnly: apiTranslations.isOrderTIFPostOnly(order!.timeInForce),
goodTilBlock: order!.goodTilBlock,
goodTilBlockTime: order!.goodTilBlockTime,
ticker,
},
orderObject,
],
};

Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ export const configSchema = {
SEND_WEBSOCKET_MESSAGES: parseBoolean({
default: true,
}),
SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_STATEFUL_ORDERS: parseBoolean({
default: false,
}),
};

export default parseSchema(configSchema);
Original file line number Diff line number Diff line change
@@ -1,57 +1,85 @@
import { generateSubaccountMessageContents } from '@dydxprotocol-indexer/kafka';
import {
OrderFromDatabase, OrderModel,
OrderTable,
PerpetualMarketFromDatabase,
perpetualMarketRefresher,
SubaccountMessageContents,
} from '@dydxprotocol-indexer/postgres';
import { convertToRedisOrder } from '@dydxprotocol-indexer/redis';
import { getOrderIdHash } from '@dydxprotocol-indexer/v4-proto-parser';
import {
OrderPlaceV1_OrderPlacementStatus,
OffChainUpdateV1,
IndexerOrder,
IndexerSubaccountId,
OffChainUpdateV1,
OrderPlaceV1_OrderPlacementStatus,
RedisOrder,
StatefulOrderEventV1,
SubaccountId,
} from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';

// TODO(IND-334): Rename to LongTermOrderPlacementHandler after deprecating StatefulOrderPlacement
export class StatefulOrderPlacementHandler extends
AbstractStatefulOrderHandler<StatefulOrderEventV1> {
export class StatefulOrderPlacementHandler
extends AbstractStatefulOrderHandler<StatefulOrderEventV1> {
eventType: string = 'StatefulOrderEvent';

public getParallelizationIds(): string[] {
// Stateful Order Events with the same orderId
public getOrderId(): string {
let orderId: string;
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
if (this.event.orderPlace !== undefined) {
orderId = OrderTable.orderIdToUuid(this.event.orderPlace!.order!.orderId!);
} else {
orderId = OrderTable.orderIdToUuid(this.event.longTermOrderPlacement!.order!.orderId!);
}
return this.getParallelizationIdsFromOrderId(orderId);
return orderId;
}

public getSubaccountId(): IndexerSubaccountId {
let subaccountId: IndexerSubaccountId;
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
if (this.event.orderPlace !== undefined) {
subaccountId = this.event.orderPlace!.order!.orderId!.subaccountId!;
} else {
subaccountId = this.event.longTermOrderPlacement!.order!.orderId!.subaccountId!;
}
return subaccountId;
}

public getParallelizationIds(): string[] {
// Stateful Order Events with the same orderId
return this.getParallelizationIdsFromOrderId(this.getOrderId());
}

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(_: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
public async internalHandle(resultRow: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
let order: IndexerOrder;
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
if (this.event.orderPlace !== undefined) {
order = this.event.orderPlace!.order!;
} else {
order = this.event.longTermOrderPlacement!.order!;
}
return this.createKafkaEvents(order);
return this.createKafkaEvents(order, resultRow);
}

private createKafkaEvents(order: IndexerOrder): ConsolidatedKafkaEvent[] {
const kafakEvents: ConsolidatedKafkaEvent[] = [];
private createKafkaEvents(
order: IndexerOrder,
resultRow: pg.QueryResultRow,
): ConsolidatedKafkaEvent[] {
const kafkaEvents: ConsolidatedKafkaEvent[] = [];

const offChainUpdate: OffChainUpdateV1 = OffChainUpdateV1.fromPartial({
orderPlace: {
order,
placementStatus: OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED,
},
});
kafakEvents.push(this.generateConsolidatedVulcanKafkaEvent(
kafkaEvents.push(this.generateConsolidatedVulcanKafkaEvent(
getOrderIdHash(order.orderId!),
offChainUpdate,
{
Expand All @@ -60,6 +88,33 @@ export class StatefulOrderPlacementHandler extends
},
));

return kafakEvents;
if (config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_STATEFUL_ORDERS) {
const perpetualMarket: PerpetualMarketFromDatabase = perpetualMarketRefresher
.getPerpetualMarketFromClobPairId(order.orderId!.clobPairId.toString())!;
const dbOrder: OrderFromDatabase = OrderModel.fromJson(resultRow.order) as OrderFromDatabase;
if (dbOrder === undefined) {
throw new Error(`Order id not found in database: ${this.getOrderId()}`);
}
const redisOrder: RedisOrder = convertToRedisOrder(order, perpetualMarket);
const subaccountContent: SubaccountMessageContents = generateSubaccountMessageContents(
redisOrder,
dbOrder,
perpetualMarket,
OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED,
);

const subaccountIdProto: SubaccountId = {
owner: this.getSubaccountId().owner,
number: this.getSubaccountId().number,
};
kafkaEvents.push(this.generateConsolidatedSubaccountKafkaEvent(
JSON.stringify(subaccountContent),
subaccountIdProto,
this.getOrderId(),
false,
subaccountContent,
));
}
return kafkaEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import { KafkaMessage } from 'kafkajs';
import Long from 'long';
import { convertToRedisOrder } from '../../src/handlers/helpers';
import { redisClient, redisClient as client } from '../../src/helpers/redis/redis-controller';
import { onMessage } from '../../src/lib/on-message';
import { expectCanceledOrderStatus, expectOpenOrderIds, handleInitialOrderPlace } from '../helpers/helpers';
Expand Down Expand Up @@ -118,23 +117,23 @@ describe('order-place-handler', () => {
quantums: Long.fromValue(500_000, true),
subticks: Long.fromValue(1_000_000, true),
};
const replacedOrder: RedisOrder = convertToRedisOrder(
const replacedOrder: RedisOrder = redisPackage.convertToRedisOrder(
replacementOrder,
testConstants.defaultPerpetualMarket,
);
const replacedOrderGoodTilBlockTime: RedisOrder = convertToRedisOrder(
const replacedOrderGoodTilBlockTime: RedisOrder = redisPackage.convertToRedisOrder(
replacementOrderGoodTilBlockTime,
testConstants.defaultPerpetualMarket,
);
const replacedOrderConditional: RedisOrder = convertToRedisOrder(
const replacedOrderConditional: RedisOrder = redisPackage.convertToRedisOrder(
replacementOrderConditional,
testConstants.defaultPerpetualMarket,
);
const replacedOrderFok: RedisOrder = convertToRedisOrder(
const replacedOrderFok: RedisOrder = redisPackage.convertToRedisOrder(
replacementOrderFok,
testConstants.defaultPerpetualMarket,
);
const replacedOrderIoc: RedisOrder = convertToRedisOrder(
const replacedOrderIoc: RedisOrder = redisPackage.convertToRedisOrder(
replacementOrderIoc,
testConstants.defaultPerpetualMarket,
);
Expand Down Expand Up @@ -279,7 +278,7 @@ describe('order-place-handler', () => {
expectedOrderUuid: string,
expectSubaccountMessageSent: boolean,
) => {
const expectedOrder: RedisOrder = convertToRedisOrder(
const expectedOrder: RedisOrder = redisPackage.convertToRedisOrder(
orderToPlace,
testConstants.defaultPerpetualMarket,
);
Expand Down
Loading
Loading