Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[CT-708] Indexer track e2e latency (#1237)
Browse files Browse the repository at this point in the history
* fwd through message times

* use the var i made

* post processing stat emission

* post-forwarding timestamp

* pass through event type from vulcan

* event type to stat emissions

* test fix function calls

* WIP WIP WIP

* fix tests

* unused import

* test that kafka messages are threaded
jonfung-dydx committed Apr 2, 2024

Verified

This commit was signed with the committer’s verified signature.
dwilkie David Wilkie
1 parent a91c1ca commit f46c966
Showing 16 changed files with 234 additions and 58 deletions.
1 change: 1 addition & 0 deletions indexer/packages/v4-protos/src/index.ts
Original file line number Diff line number Diff line change
@@ -16,3 +16,4 @@ export * from './codegen/google/protobuf/timestamp';
export * from './codegen/dydxprotocol/indexer/protocol/v1/clob';
export * from './codegen/dydxprotocol/indexer/protocol/v1/subaccount';
export * from './codegen/dydxprotocol/indexer/shared/removal_reason';
export * from './utils';
12 changes: 12 additions & 0 deletions indexer/packages/v4-protos/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Timestamp } from './codegen/google/protobuf/timestamp';

export const MILLIS_IN_NANOS: number = 1_000_000;
export const SECONDS_IN_MILLIS: number = 1_000;
export function protoTimestampToDate(
protoTime: Timestamp,
): Date {
const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS +
Math.floor(protoTime.nanos / MILLIS_IN_NANOS);

return new Date(timeInMillis);
}
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ import {
IndexerOrderId,
PerpetualMarketCreateEventV1,
DeleveragingEventV1,
protoTimestampToDate,
} from '@dydxprotocol-indexer/v4-protos';
import {
PerpetualMarketType,
@@ -63,7 +64,6 @@ import {
generatePerpetualMarketMessage,
generatePerpetualPositionsContents,
} from '../../src/helpers/kafka-helper';
import { protoTimestampToDate } from '../../src/lib/helper';
import { DydxIndexerSubtypes, VulcanMessage } from '../../src/lib/types';

// TX Hash is SHA256, so is of length 64 hexadecimal without the '0x'.
14 changes: 0 additions & 14 deletions indexer/services/ender/src/lib/helper.ts
Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@ import {
import {
IndexerTendermintEvent,
IndexerTendermintEvent_BlockEvent,
Timestamp,
OrderFillEventV1,
MarketEventV1,
SubaccountUpdateEventV1,
@@ -29,10 +28,6 @@ import Big from 'big.js';
import _ from 'lodash';
import { DateTime } from 'luxon';

import {
MILLIS_IN_NANOS,
SECONDS_IN_MILLIS,
} from '../constants';
import {
AnnotatedSubaccountMessage,
DydxIndexerSubtypes,
@@ -70,15 +65,6 @@ export function convertToSubaccountMessage(
return subaccountMessage;
}

export function protoTimestampToDate(
protoTime: Timestamp,
): Date {
const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS +
Math.floor(protoTime.nanos / MILLIS_IN_NANOS);

return new Date(timeInMillis);
}

export function dateToDateTime(
protoTime: Date,
): DateTime {
21 changes: 18 additions & 3 deletions indexer/services/socks/src/lib/message-forwarder.ts
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ import {
logger,
InfoObject,
safeJsonStringify,
STATS_NO_SAMPLING,
} from '@dydxprotocol-indexer/base';
import { updateOnMessageFunction } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
@@ -85,9 +86,10 @@ export class MessageForwarder {
}

public onMessage(topic: string, message: KafkaMessage): void {
const start: number = Date.now();
stats.timing(
`${config.SERVICE_NAME}.message_time_in_queue`,
Date.now() - Number(message.timestamp),
start - Number(message.timestamp),
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
{
topic,
@@ -125,18 +127,31 @@ export class MessageForwarder {
return;
}

const start: number = Date.now();
const startForwardMessage: number = Date.now();
this.forwardMessage(messageToForward);
const end: number = Date.now();
stats.timing(
`${config.SERVICE_NAME}.forward_message`,
end - start,
end - startForwardMessage,
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
{
topic,
channel: String(channel),
},
);

const originalMessageTimestamp = message.headers?.message_received_timestamp;
if (originalMessageTimestamp !== undefined) {
stats.timing(
`${config.SERVICE_NAME}.message_time_since_received`,
startForwardMessage - Number(originalMessageTimestamp),
STATS_NO_SAMPLING,
{
topic,
event_type: String(message.headers?.event_type),
},
);
}
}

public forwardMessage(message: MessageToForward): void {
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@ import { expectCanceledOrderStatus, expectOpenOrderIds, handleInitialOrderPlace
import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers';
import { OrderbookSide } from '../../src/lib/types';
import { getOrderIdHash, isLongTermOrder, isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser';
import { defaultKafkaHeaders } from '../helpers/constants';
import config from '../../src/config';

jest.mock('@dydxprotocol-indexer/base', () => ({
@@ -196,6 +197,12 @@ describe('order-place-handler', () => {
const replacementMessageIoc: KafkaMessage = createKafkaMessage(
Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(replacementUpdateIoc).finish())),
);
[replacementMessage, replacementMessageGoodTilBlockTime, replacementMessageConditional,
replacementMessageFok, replacementMessageIoc].forEach((message) => {
// eslint-disable-next-line no-param-reassign
message.headers = defaultKafkaHeaders;
});

const dbDefaultOrder: OrderFromDatabase = {
...testConstants.defaultOrder,
id: testConstants.defaultOrderId,
@@ -1225,7 +1232,11 @@ function expectWebsocketMessagesSent(
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
});

expectWebsocketSubaccountMessage(producerSendSpy.mock.calls[callIndex][0], subaccountMessage);
expectWebsocketSubaccountMessage(
producerSendSpy.mock.calls[callIndex][0],
subaccountMessage,
defaultKafkaHeaders,
);
callIndex += 1;
}

Original file line number Diff line number Diff line change
@@ -5,6 +5,9 @@ import {
STATS_FUNCTION_NAME,
wrapBackgroundTask,
} from '@dydxprotocol-indexer/base';
import {
defaultTime,
} from '../helpers/constants';
import { synchronizeWrapBackgroundTask } from '@dydxprotocol-indexer/dev';
import {
ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION,
@@ -52,14 +55,16 @@ import {
OrderRemoveV1_OrderRemovalStatus,
RedisOrder,
SubaccountMessage,
protoTimestampToDate,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { ProducerRecord } from 'kafkajs';
import { IHeaders, ProducerRecord } from 'kafkajs';
import { DateTime } from 'luxon';

import { OrderRemoveHandler } from '../../src/handlers/order-remove-handler';
import { OrderbookSide } from '../../src/lib/types';
import { redisClient } from '../../src/helpers/redis/redis-controller';

import {
expectCanceledOrderStatus,
expectOpenOrderIds,
@@ -133,6 +138,10 @@ describe('OrderRemoveHandler', () => {
timeInForce: TimeInForce.IOC,
};

const defaultKafkaHeaders: IHeaders = {
message_received_timestamp: String(protoTimestampToDate(defaultTime)),
};

it.each([
[
{
@@ -179,7 +188,10 @@ describe('OrderRemoveHandler', () => {
const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(orderRemoveJson);

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await expect(orderRemoveHandler.handleUpdate(offChainUpdate)).rejects.toThrow(
await expect(orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
)).rejects.toThrow(
new ParseMessageError(errorMessage),
);
expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({
@@ -194,7 +206,10 @@ describe('OrderRemoveHandler', () => {
const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove);

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({
at: 'orderRemoveHandler#handleOrderRemoval',
@@ -215,7 +230,10 @@ describe('OrderRemoveHandler', () => {

const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove);
const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

const ticker: string = testConstants.defaultPerpetualMarket.ticker;
expect(logger.error).toHaveBeenCalledWith({
@@ -320,7 +338,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

// orderbook level reduced by defaultQuantums
const remainingOrderbookLevel: string = Big(
@@ -470,7 +491,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

await Promise.all([
expectOrderStatus(expectedOrderUuid, OrderStatus.BEST_EFFORT_CANCELED),
@@ -604,7 +628,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

await Promise.all([
expectOrderStatus(expectedOrderUuid, OrderStatus.CANCELED),
@@ -742,7 +769,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

await Promise.all([
expectOrderStatus(expectedOrderUuid, OrderStatus.CANCELED),
@@ -895,7 +925,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

await Promise.all([
expectOrderStatus(expectedOrderUuid, removedOrder.status),
@@ -978,7 +1011,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

await Promise.all([
expectOrderStatus(expectedOrderUuid, OrderStatus.FILLED),
@@ -1014,7 +1050,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

expect(producerSendSpy).not.toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({
@@ -1062,7 +1101,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

// Subaccounts message is sent first followed by orderbooks message
const subaccountContents: SubaccountMessageContents = {
@@ -1162,7 +1204,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

await Promise.all([
// orderbook should not be affected, so it will be set to defaultQuantums
@@ -1279,7 +1324,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

// orderbook level reduced by defaultQuantums
const remainingOrderbookLevel: string = Big(
@@ -1423,7 +1471,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

await Promise.all([
expectOrderbookLevelCache(
@@ -1545,7 +1596,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

// orderbook level reduced by defaultQuantums
const remainingOrderbookLevel: string = Big(
@@ -1676,7 +1730,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

// orderbook level should not be reduced
const remainingOrderbookLevel: string = Big(
@@ -1738,7 +1795,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

expect(producerSendSpy).not.toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({
@@ -1805,7 +1865,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

expect(producerSendSpy).not.toHaveBeenCalled();
expect(stats.increment).toHaveBeenCalledWith('vulcan.indexer_expired_order_not_found', 1);
@@ -1878,7 +1941,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

expect(producerSendSpy).not.toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({
@@ -1940,7 +2006,10 @@ describe('OrderRemoveHandler', () => {
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();

const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler();
await orderRemoveHandler.handleUpdate(offChainUpdate);
await orderRemoveHandler.handleUpdate(
offChainUpdate,
defaultKafkaHeaders,
);

expect(producerSendSpy).not.toHaveBeenCalled();
expect(
@@ -2044,7 +2113,11 @@ describe('OrderRemoveHandler', () => {

if (expectedSubaccountMessage !== undefined) {
const subaccountProducerRecord: ProducerRecord = producerSendSpy.mock.calls[0][0];
expectWebsocketSubaccountMessage(subaccountProducerRecord, expectedSubaccountMessage);
expectWebsocketSubaccountMessage(
subaccountProducerRecord,
expectedSubaccountMessage,
defaultKafkaHeaders,
);
}

if (expectedOrderbookMessage !== undefined) {
19 changes: 19 additions & 0 deletions indexer/services/vulcan/__tests__/helpers/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import {
MILLIS_IN_NANOS,
SECONDS_IN_MILLIS,
Timestamp,
protoTimestampToDate,
} from '@dydxprotocol-indexer/v4-protos';
import { IHeaders } from 'kafkajs';
import Long from 'long';
import { DateTime } from 'luxon';

const defaultDateTime: DateTime = DateTime.utc(2022, 6, 1, 12, 1, 1, 2);
export const defaultTime: Timestamp = {
seconds: Long.fromValue(Math.floor(defaultDateTime.toSeconds()), true),
nanos: (defaultDateTime.toMillis() % SECONDS_IN_MILLIS) * MILLIS_IN_NANOS,
};

export const defaultKafkaHeaders: IHeaders = {
message_received_timestamp: String(protoTimestampToDate(defaultTime)),
};
3 changes: 3 additions & 0 deletions indexer/services/vulcan/__tests__/helpers/helpers.ts
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import { KafkaMessage } from 'kafkajs';
import { redisClient } from '../../src/helpers/redis/redis-controller';
import { onMessage } from '../../src/lib/on-message';
import { DydxRecordHeaderKeys } from '../../src/lib/types';
import { defaultKafkaHeaders } from './constants';

export async function handleInitialOrderPlace(
orderPlace: redisTestConstants.OffChainUpdateOrderPlaceUpdateMessage,
@@ -23,6 +24,7 @@ export async function handleInitialOrderPlace(
const message: KafkaMessage = createKafkaMessage(
Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(update).finish())),
);
message.headers = defaultKafkaHeaders;

await onMessage(message);
}
@@ -36,6 +38,7 @@ export async function handleOrderUpdate(
const message: KafkaMessage = createKafkaMessage(
Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(update).finish())),
);
message.headers = defaultKafkaHeaders;

await onMessage(message);
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import { KafkaTopics } from '@dydxprotocol-indexer/kafka';
import { OffChainUpdateV1, OrderbookMessage, SubaccountMessage } from '@dydxprotocol-indexer/v4-protos';
import { ProducerRecord } from 'kafkajs';
import { IHeaders, ProducerRecord } from 'kafkajs';

export function expectWebsocketSubaccountMessage(
subaccountProducerRecord: ProducerRecord,
expectedSubaccountMessage: SubaccountMessage,
expectedHeaders: IHeaders,
): void {
expect(subaccountProducerRecord.topic).toEqual(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
const subaccountMessageValueBinary: Uint8Array = new Uint8Array(
subaccountProducerRecord.messages[0].value as Buffer,
);
const headers: IHeaders | undefined = subaccountProducerRecord.messages[0].headers;
const subaccountMessage: SubaccountMessage = SubaccountMessage.decode(
subaccountMessageValueBinary,
);
expect(headers).toEqual(expectedHeaders);
expect(subaccountMessage).toEqual(expectedSubaccountMessage);
}

2 changes: 1 addition & 1 deletion indexer/services/vulcan/__tests__/lib/on-message.test.ts
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ describe('onMessage', () => {
await onMessage(message);

expect(handler).toHaveBeenCalledTimes(1);
expect(handleUpdateMock).toHaveBeenCalledWith(update);
expect(handleUpdateMock).toHaveBeenCalledWith(update, message.headers ?? {});
expect(handleUpdateMock).toHaveBeenCalledTimes(1);

expect(stats.increment).toHaveBeenCalledWith('vulcan.received_kafka_message', 1);
7 changes: 4 additions & 3 deletions indexer/services/vulcan/src/handlers/handler.ts
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import {
} from '@dydxprotocol-indexer/kafka';
import { OrderbookMessageContents, PerpetualMarketFromDatabase, protocolTranslations } from '@dydxprotocol-indexer/postgres';
import { OffChainUpdateV1, OrderbookMessage, RedisOrder } from '@dydxprotocol-indexer/v4-protos';
import { IHeaders } from 'kafkajs';
import { OrderbookSide } from 'src/lib/types';

import { orderSideToOrderbookSide } from './helpers';
@@ -15,11 +16,11 @@ export abstract class Handler {
this.txHash = txHash;
}

protected abstract handle(update: OffChainUpdateV1): Promise<void>;
protected abstract handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void>;

// TODO(DEC-1251): Add stats for message handling.
public async handleUpdate(update: OffChainUpdateV1): Promise<void> {
return this.handle(update);
public async handleUpdate(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
return this.handle(update, headers);
}

protected logAndThrowParseMessageError(
17 changes: 14 additions & 3 deletions indexer/services/vulcan/src/handlers/order-place-handler.ts
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ import {
RedisOrder,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { Message } from 'kafkajs';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
@@ -52,7 +52,7 @@ import { Handler } from './handler';
* being greater than or equal to the expiry of the order in the OrderPlace message, return
*/
export class OrderPlaceHandler extends Handler {
protected async handle(update: OffChainUpdateV1): Promise<void> {
protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
logger.info({
at: 'OrderPlaceHandler#handle',
message: 'Received OffChainUpdate with OrderPlace.',
@@ -142,7 +142,7 @@ export class OrderPlaceHandler extends Handler {
});
throw new Error(`Stateful order not found in database: ${orderUuid}`);
}
await this.sendCachedOrderUpdate(orderUuid);
await this.sendCachedOrderUpdate(orderUuid, headers);
}
const subaccountMessage: Message = {
value: createSubaccountWebsocketMessage(
@@ -151,6 +151,9 @@ export class OrderPlaceHandler extends Handler {
perpetualMarket,
placementStatus,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
}
@@ -164,6 +167,9 @@ export class OrderPlaceHandler extends Handler {
perpetualMarket,
updatedQuantums,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
@@ -334,6 +340,7 @@ export class OrderPlaceHandler extends Handler {
*/
protected async sendCachedOrderUpdate(
orderId: string,
headers: IHeaders,
): Promise<void> {
const cachedOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache
.removeStatefulOrderUpdate(
@@ -351,6 +358,10 @@ export class OrderPlaceHandler extends Handler {
value: Buffer.from(
Uint8Array.from(OffChainUpdateV1.encode({ orderUpdate: cachedOrderUpdate }).finish()),
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
event_type: String(headers.event_type),
},
};
sendMessageWrapper(orderUpdateMessage, KafkaTopics.TO_VULCAN);
}
24 changes: 18 additions & 6 deletions indexer/services/vulcan/src/handlers/order-remove-handler.ts
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ import {
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import { Big } from 'big.js';
import { Message } from 'kafkajs';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
@@ -68,7 +68,7 @@ import { getStateRemainingQuantums } from './helpers';
*/
export class OrderRemoveHandler extends Handler {
// eslint-disable-next-line @typescript-eslint/require-await
protected async handle(update: OffChainUpdateV1): Promise<void> {
protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
logger.info({
at: 'OrderRemoveHandler#handle',
message: 'Received OffChainUpdate with OrderRemove.',
@@ -131,11 +131,11 @@ export class OrderRemoveHandler extends Handler {
}

if (this.isStatefulOrderCancelation(orderRemove)) {
await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult);
await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult, headers);
return;
}

await this.handleOrderRemoval(orderRemove, removeOrderResult);
await this.handleOrderRemoval(orderRemove, removeOrderResult, headers);
}

protected validateOrderRemove(orderRemove: OrderRemoveV1): void {
@@ -193,6 +193,7 @@ export class OrderRemoveHandler extends Handler {
protected async handleStatefulOrderCancelation(
orderRemove: OrderRemoveV1,
removeOrderResult: RemoveOrderResult,
headers: IHeaders,
): Promise<void> {
const order: OrderFromDatabase | undefined = await runFuncWithTimingStat(
OrderTable.findById(
@@ -230,6 +231,9 @@ export class OrderRemoveHandler extends Handler {
orderRemove,
perpetualMarket.ticker,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);

@@ -241,7 +245,7 @@ export class OrderRemoveHandler extends Handler {
removeOrderResult.removed &&
removeOrderResult.restingOnBook === true &&
!requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) {
await this.updateOrderbook(removeOrderResult, perpetualMarket);
await this.updateOrderbook(removeOrderResult, perpetualMarket, headers);
}

}
@@ -259,6 +263,7 @@ export class OrderRemoveHandler extends Handler {
protected async handleOrderRemoval(
orderRemove: OrderRemoveV1,
removeOrderResult: RemoveOrderResult,
headers: IHeaders,
): Promise<void> {
if (!removeOrderResult.removed) {
logger.info({
@@ -306,6 +311,9 @@ export class OrderRemoveHandler extends Handler {
orderRemove,
perpetualMarket,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};

if (this.shouldSendSubaccountMessage(orderRemove, removeOrderResult, stateRemainingQuantums)) {
@@ -322,7 +330,7 @@ export class OrderRemoveHandler extends Handler {
!remainingQuantums.eq('0') &&
removeOrderResult.restingOnBook !== false &&
!requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) {
await this.updateOrderbook(removeOrderResult, perpetualMarket);
await this.updateOrderbook(removeOrderResult, perpetualMarket, headers);
}
// TODO: consolidate remove handler logic into a single lua script.
await this.addOrderToCanceledOrdersCache(
@@ -339,6 +347,7 @@ export class OrderRemoveHandler extends Handler {
protected async updateOrderbook(
removeOrderResult: RemoveOrderResult,
perpetualMarket: PerpetualMarketFromDatabase,
headers: IHeaders,
): Promise<void> {
const updatedQuantums: number = await runFuncWithTimingStat(
this.updatePriceLevelsCache(
@@ -352,6 +361,9 @@ export class OrderRemoveHandler extends Handler {
perpetualMarket,
updatedQuantums,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
7 changes: 5 additions & 2 deletions indexer/services/vulcan/src/handlers/order-update-handler.ts
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ import {
RedisOrder,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { Message } from 'kafkajs';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
@@ -51,7 +51,7 @@ import { Handler } from './handler';
* price level is capped to the size of the order in quantums
*/
export class OrderUpdateHandler extends Handler {
protected async handle(update: OffChainUpdateV1): Promise<void> {
protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
logger.info({
at: 'OrderUpdateHandler#handle',
message: 'Received OffChainUpdate with OrderUpdate.',
@@ -171,6 +171,9 @@ export class OrderUpdateHandler extends Handler {
perpetualMarket,
updatedQuantums,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
28 changes: 27 additions & 1 deletion indexer/services/vulcan/src/lib/on-message.ts
Original file line number Diff line number Diff line change
@@ -62,6 +62,19 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
},
);

const originalMessageTimestamp = message.headers?.message_received_timestamp;
if (originalMessageTimestamp !== undefined) {
stats.timing(
`${config.SERVICE_NAME}.message_time_since_received`,
start - Number(originalMessageTimestamp),
STATS_NO_SAMPLING,
{
topic: KafkaTopics.TO_VULCAN,
event_type: String(message.headers?.event_type),
},
);
}

const messageValue: Buffer = message.value;
const offset: string = message.offset;
let update: OffChainUpdateV1;
@@ -86,7 +99,20 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
const handler: Handler = new (getHandler(update))!(
getTransactionHashFromHeaders(message.headers),
);
await handler.handleUpdate(update);
await handler.handleUpdate(update, message.headers ?? {});

const postProcessingTime: number = Date.now();
if (originalMessageTimestamp !== undefined) {
stats.timing(
`${config.SERVICE_NAME}.message_time_since_received_post_processing`,
postProcessingTime - Number(originalMessageTimestamp),
STATS_NO_SAMPLING,
{
topic: KafkaTopics.TO_VULCAN,
event_type: String(message.headers?.event_type),
},
);
}

success = true;
} catch (error) {

0 comments on commit f46c966

Please sign in to comment.