Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-708] Indexer track e2e latency #1237

Merged
merged 12 commits into from
Mar 28, 2024
21 changes: 18 additions & 3 deletions indexer/services/socks/src/lib/message-forwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
logger,
InfoObject,
safeJsonStringify,
STATS_NO_SAMPLING,
} from '@dydxprotocol-indexer/base';
import { updateOnMessageFunction } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
Expand Down Expand Up @@ -85,9 +86,10 @@ export class MessageForwarder {
}

public onMessage(topic: string, message: KafkaMessage): void {
const start: number = Date.now();
stats.timing(
`${config.SERVICE_NAME}.message_time_in_queue`,
Date.now() - Number(message.timestamp),
start - Number(message.timestamp),
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
{
topic,
Expand Down Expand Up @@ -125,18 +127,31 @@ export class MessageForwarder {
return;
}

const start: number = Date.now();
const startForwardMessage: number = Date.now();
this.forwardMessage(messageToForward);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems to send out websocket messages in batches for certain websocket topics. Can we pipe the initial timestamp all the way to the sendMessage function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean we should add a message_received_timestamp field to

export interface OutgoingMessage {
  type: OutgoingMessageType;
  connection_id: string;
  message_id: number;
}
```?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvm, the batches happen at periodic intervals so not necessary to pipe it through.

Does it make more sense to have the stat be end - Number(message.headers?.message_received_timestamp) (after calling forwardMessage) rather than start - Number(message.headers?.message_received_timestamp)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

const end: number = Date.now();
stats.timing(
`${config.SERVICE_NAME}.forward_message`,
end - start,
end - startForwardMessage,
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
{
topic,
channel: String(channel),
},
);

const originalMessageTimestamp = message.headers?.message_received_timestamp;
if (originalMessageTimestamp !== undefined) {
stats.timing(
`${config.SERVICE_NAME}.message_time_since_received`,
startForwardMessage - Number(originalMessageTimestamp),
STATS_NO_SAMPLING,
{
topic,
event_type: String(message.headers?.event_type),
},
);
}
}

public forwardMessage(message: MessageToForward): void {
Expand Down
7 changes: 4 additions & 3 deletions indexer/services/vulcan/src/handlers/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
} from '@dydxprotocol-indexer/kafka';
import { OrderbookMessageContents, PerpetualMarketFromDatabase, protocolTranslations } from '@dydxprotocol-indexer/postgres';
import { OffChainUpdateV1, OrderbookMessage, RedisOrder } from '@dydxprotocol-indexer/v4-protos';
import { IHeaders } from 'kafkajs';
import { OrderbookSide } from 'src/lib/types';

import { orderSideToOrderbookSide } from './helpers';
Expand All @@ -15,11 +16,11 @@ export abstract class Handler {
this.txHash = txHash;
}

protected abstract handle(update: OffChainUpdateV1): Promise<void>;
protected abstract handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void>;

// TODO(DEC-1251): Add stats for message handling.
public async handleUpdate(update: OffChainUpdateV1): Promise<void> {
return this.handle(update);
public async handleUpdate(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
return this.handle(update, headers);
}

protected logAndThrowParseMessageError(
Expand Down
16 changes: 13 additions & 3 deletions indexer/services/vulcan/src/handlers/order-place-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
RedisOrder,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { Message } from 'kafkajs';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
Expand All @@ -51,7 +51,7 @@ import { convertToRedisOrder } from './helpers';
* being greater than or equal to the expiry of the order in the OrderPlace message, return
*/
export class OrderPlaceHandler extends Handler {
protected async handle(update: OffChainUpdateV1): Promise<void> {
protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
logger.info({
at: 'OrderPlaceHandler#handle',
message: 'Received OffChainUpdate with OrderPlace.',
Expand Down Expand Up @@ -136,7 +136,7 @@ export class OrderPlaceHandler extends Handler {
});
throw new Error(`Stateful order not found in database: ${orderUuid}`);
}
await this.sendCachedOrderUpdate(orderUuid);
await this.sendCachedOrderUpdate(orderUuid, headers);
}
const subaccountMessage: Message = {
value: createSubaccountWebsocketMessage(
Expand All @@ -145,6 +145,9 @@ export class OrderPlaceHandler extends Handler {
perpetualMarket,
placementStatus,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
jonfung-dydx marked this conversation as resolved.
Show resolved Hide resolved
},
};
sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
}
Expand All @@ -158,6 +161,9 @@ export class OrderPlaceHandler extends Handler {
perpetualMarket,
updatedQuantums,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
Expand Down Expand Up @@ -320,6 +326,7 @@ export class OrderPlaceHandler extends Handler {
*/
protected async sendCachedOrderUpdate(
orderId: string,
headers: IHeaders,
): Promise<void> {
const cachedOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache
.removeStatefulOrderUpdate(
Expand All @@ -337,6 +344,9 @@ export class OrderPlaceHandler extends Handler {
value: Buffer.from(
Uint8Array.from(OffChainUpdateV1.encode({ orderUpdate: cachedOrderUpdate }).finish()),
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderUpdateMessage, KafkaTopics.TO_VULCAN);
}
Expand Down
24 changes: 18 additions & 6 deletions indexer/services/vulcan/src/handlers/order-remove-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import {
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import { Big } from 'big.js';
import { Message } from 'kafkajs';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
Expand Down Expand Up @@ -68,7 +68,7 @@ import { getStateRemainingQuantums } from './helpers';
*/
export class OrderRemoveHandler extends Handler {
// eslint-disable-next-line @typescript-eslint/require-await
protected async handle(update: OffChainUpdateV1): Promise<void> {
protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
logger.info({
at: 'OrderRemoveHandler#handle',
message: 'Received OffChainUpdate with OrderRemove.',
Expand Down Expand Up @@ -131,11 +131,11 @@ export class OrderRemoveHandler extends Handler {
}

if (this.isStatefulOrderCancelation(orderRemove)) {
await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult);
await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult, headers);
return;
}

await this.handleOrderRemoval(orderRemove, removeOrderResult);
await this.handleOrderRemoval(orderRemove, removeOrderResult, headers);
}

protected validateOrderRemove(orderRemove: OrderRemoveV1): void {
Expand Down Expand Up @@ -193,6 +193,7 @@ export class OrderRemoveHandler extends Handler {
protected async handleStatefulOrderCancelation(
orderRemove: OrderRemoveV1,
removeOrderResult: RemoveOrderResult,
headers: IHeaders,
): Promise<void> {
const order: OrderFromDatabase | undefined = await runFuncWithTimingStat(
OrderTable.findById(
Expand Down Expand Up @@ -230,6 +231,9 @@ export class OrderRemoveHandler extends Handler {
orderRemove,
perpetualMarket.ticker,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);

Expand All @@ -241,7 +245,7 @@ export class OrderRemoveHandler extends Handler {
removeOrderResult.removed &&
removeOrderResult.restingOnBook === true &&
!requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) {
await this.updateOrderbook(removeOrderResult, perpetualMarket);
await this.updateOrderbook(removeOrderResult, perpetualMarket, headers);
}

}
Expand All @@ -259,6 +263,7 @@ export class OrderRemoveHandler extends Handler {
protected async handleOrderRemoval(
orderRemove: OrderRemoveV1,
removeOrderResult: RemoveOrderResult,
headers: IHeaders,
): Promise<void> {
if (!removeOrderResult.removed) {
logger.info({
Expand Down Expand Up @@ -306,6 +311,9 @@ export class OrderRemoveHandler extends Handler {
orderRemove,
perpetualMarket,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};

if (this.shouldSendSubaccountMessage(orderRemove, removeOrderResult, stateRemainingQuantums)) {
Expand All @@ -322,7 +330,7 @@ export class OrderRemoveHandler extends Handler {
!remainingQuantums.eq('0') &&
removeOrderResult.restingOnBook !== false &&
!requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) {
await this.updateOrderbook(removeOrderResult, perpetualMarket);
await this.updateOrderbook(removeOrderResult, perpetualMarket, headers);
}
// TODO: consolidate remove handler logic into a single lua script.
await this.addOrderToCanceledOrdersCache(
Expand All @@ -339,6 +347,7 @@ export class OrderRemoveHandler extends Handler {
protected async updateOrderbook(
removeOrderResult: RemoveOrderResult,
perpetualMarket: PerpetualMarketFromDatabase,
headers: IHeaders,
): Promise<void> {
const updatedQuantums: number = await runFuncWithTimingStat(
this.updatePriceLevelsCache(
Expand All @@ -352,6 +361,9 @@ export class OrderRemoveHandler extends Handler {
perpetualMarket,
updatedQuantums,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
RedisOrder,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { Message } from 'kafkajs';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
Expand All @@ -51,7 +51,7 @@ import { Handler } from './handler';
* price level is capped to the size of the order in quantums
*/
export class OrderUpdateHandler extends Handler {
protected async handle(update: OffChainUpdateV1): Promise<void> {
protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
logger.info({
at: 'OrderUpdateHandler#handle',
message: 'Received OffChainUpdate with OrderUpdate.',
Expand Down Expand Up @@ -171,6 +171,9 @@ export class OrderUpdateHandler extends Handler {
perpetualMarket,
updatedQuantums,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
Expand Down
26 changes: 25 additions & 1 deletion indexer/services/vulcan/src/lib/on-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
},
);

const originalMessageTimestamp = message.headers?.message_received_timestamp;
if (originalMessageTimestamp !== undefined) {
stats.timing(
`${config.SERVICE_NAME}.message_time_since_received`,
jonfung-dydx marked this conversation as resolved.
Show resolved Hide resolved
start - Number(originalMessageTimestamp),
STATS_NO_SAMPLING,
{
topic: KafkaTopics.TO_VULCAN,
},
);
}

const messageValue: Buffer = message.value;
const offset: string = message.offset;
let update: OffChainUpdateV1;
Expand All @@ -86,7 +98,19 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
const handler: Handler = new (getHandler(update))!(
getTransactionHashFromHeaders(message.headers),
);
await handler.handleUpdate(update);
await handler.handleUpdate(update, message.headers ?? {});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also emit a stat here comparing to message_received_timestamp ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already do that here in the same code block right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's before handling the message though

const postProcessingTime: number = Date.now();
if (originalMessageTimestamp !== undefined) {
stats.timing(
`${config.SERVICE_NAME}.message_time_since_received_post_processing`,
postProcessingTime - Number(originalMessageTimestamp),
STATS_NO_SAMPLING,
{
topic: KafkaTopics.TO_VULCAN,
},
);
}

success = true;
} catch (error) {
Expand Down
Loading