Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-708] Indexer track e2e latency #1237

Merged
merged 12 commits into from
Mar 28, 2024
Next Next commit
fwd through message times
jonfung-dydx committed Mar 25, 2024
commit cbc7a99f25d39ca1f13eeb60821e6a572dbc9c57
20 changes: 17 additions & 3 deletions indexer/services/socks/src/lib/message-forwarder.ts
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ import {
logger,
InfoObject,
safeJsonStringify,
STATS_NO_SAMPLING,
} from '@dydxprotocol-indexer/base';
import { updateOnMessageFunction } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
@@ -85,15 +86,28 @@ export class MessageForwarder {
}

public onMessage(topic: string, message: KafkaMessage): void {
const start: number = Date.now();
stats.timing(
`${config.SERVICE_NAME}.message_time_in_queue`,
Date.now() - Number(message.timestamp),
start - Number(message.timestamp),
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
{
topic,
},
);

const originalMessageTimestamp = message.headers?.message_received_timestamp;
if (originalMessageTimestamp !== undefined) {
stats.timing(
`${config.SERVICE_NAME}.message_time_since_received`,
start - Number(message.headers?.message_received_timestamp),
STATS_NO_SAMPLING,
{
topic,
},
);
}

const loggerAt: string = 'MessageForwarder#onMessage';
const errProps: Partial<InfoObject> = {
topic,
@@ -125,12 +139,12 @@ export class MessageForwarder {
return;
}

const start: number = Date.now();
const startForwardMessage: number = Date.now();
this.forwardMessage(messageToForward);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems to send out websocket messages in batches for certain websocket topics. Can we pipe the initial timestamp all the way to the sendMessage function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean we should add a message_received_timestamp field to

export interface OutgoingMessage {
  type: OutgoingMessageType;
  connection_id: string;
  message_id: number;
}
```?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvm, the batches happen at periodic intervals so not necessary to pipe it through.

Does it make more sense to have the stat be end - Number(message.headers?.message_received_timestamp) (after calling forwardMessage) rather than start - Number(message.headers?.message_received_timestamp)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

const end: number = Date.now();
stats.timing(
`${config.SERVICE_NAME}.forward_message`,
end - start,
end - startForwardMessage,
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
{
topic,
7 changes: 4 additions & 3 deletions indexer/services/vulcan/src/handlers/handler.ts
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import {
} from '@dydxprotocol-indexer/kafka';
import { OrderbookMessageContents, PerpetualMarketFromDatabase, protocolTranslations } from '@dydxprotocol-indexer/postgres';
import { OffChainUpdateV1, OrderbookMessage, RedisOrder } from '@dydxprotocol-indexer/v4-protos';
import { IHeaders } from 'kafkajs';
import { OrderbookSide } from 'src/lib/types';

import { orderSideToOrderbookSide } from './helpers';
@@ -15,11 +16,11 @@ export abstract class Handler {
this.txHash = txHash;
}

protected abstract handle(update: OffChainUpdateV1): Promise<void>;
protected abstract handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void>;

// TODO(DEC-1251): Add stats for message handling.
public async handleUpdate(update: OffChainUpdateV1): Promise<void> {
return this.handle(update);
public async handleUpdate(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
return this.handle(update, headers);
}

protected logAndThrowParseMessageError(
16 changes: 13 additions & 3 deletions indexer/services/vulcan/src/handlers/order-place-handler.ts
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ import {
RedisOrder,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { Message } from 'kafkajs';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
@@ -51,7 +51,7 @@ import { convertToRedisOrder } from './helpers';
* being greater than or equal to the expiry of the order in the OrderPlace message, return
*/
export class OrderPlaceHandler extends Handler {
protected async handle(update: OffChainUpdateV1): Promise<void> {
protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
logger.info({
at: 'OrderPlaceHandler#handle',
message: 'Received OffChainUpdate with OrderPlace.',
@@ -136,7 +136,7 @@ export class OrderPlaceHandler extends Handler {
});
throw new Error(`Stateful order not found in database: ${orderUuid}`);
}
await this.sendCachedOrderUpdate(orderUuid);
await this.sendCachedOrderUpdate(orderUuid, headers);
}
const subaccountMessage: Message = {
value: createSubaccountWebsocketMessage(
@@ -145,6 +145,9 @@ export class OrderPlaceHandler extends Handler {
perpetualMarket,
placementStatus,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
}
@@ -158,6 +161,9 @@ export class OrderPlaceHandler extends Handler {
perpetualMarket,
updatedQuantums,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
@@ -320,6 +326,7 @@ export class OrderPlaceHandler extends Handler {
*/
protected async sendCachedOrderUpdate(
orderId: string,
headers: IHeaders,
): Promise<void> {
const cachedOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache
.removeStatefulOrderUpdate(
@@ -337,6 +344,9 @@ export class OrderPlaceHandler extends Handler {
value: Buffer.from(
Uint8Array.from(OffChainUpdateV1.encode({ orderUpdate: cachedOrderUpdate }).finish()),
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderUpdateMessage, KafkaTopics.TO_VULCAN);
}
24 changes: 18 additions & 6 deletions indexer/services/vulcan/src/handlers/order-remove-handler.ts
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ import {
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import { Big } from 'big.js';
import { Message } from 'kafkajs';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
@@ -68,7 +68,7 @@ import { getStateRemainingQuantums } from './helpers';
*/
export class OrderRemoveHandler extends Handler {
// eslint-disable-next-line @typescript-eslint/require-await
protected async handle(update: OffChainUpdateV1): Promise<void> {
protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
logger.info({
at: 'OrderRemoveHandler#handle',
message: 'Received OffChainUpdate with OrderRemove.',
@@ -131,11 +131,11 @@ export class OrderRemoveHandler extends Handler {
}

if (this.isStatefulOrderCancelation(orderRemove)) {
await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult);
await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult, headers);
return;
}

await this.handleOrderRemoval(orderRemove, removeOrderResult);
await this.handleOrderRemoval(orderRemove, removeOrderResult, headers);
}

protected validateOrderRemove(orderRemove: OrderRemoveV1): void {
@@ -193,6 +193,7 @@ export class OrderRemoveHandler extends Handler {
protected async handleStatefulOrderCancelation(
orderRemove: OrderRemoveV1,
removeOrderResult: RemoveOrderResult,
headers: IHeaders,
): Promise<void> {
const order: OrderFromDatabase | undefined = await runFuncWithTimingStat(
OrderTable.findById(
@@ -230,6 +231,9 @@ export class OrderRemoveHandler extends Handler {
orderRemove,
perpetualMarket.ticker,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);

@@ -241,7 +245,7 @@ export class OrderRemoveHandler extends Handler {
removeOrderResult.removed &&
removeOrderResult.restingOnBook === true &&
!requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) {
await this.updateOrderbook(removeOrderResult, perpetualMarket);
await this.updateOrderbook(removeOrderResult, perpetualMarket, headers);
}

}
@@ -259,6 +263,7 @@ export class OrderRemoveHandler extends Handler {
protected async handleOrderRemoval(
orderRemove: OrderRemoveV1,
removeOrderResult: RemoveOrderResult,
headers: IHeaders,
): Promise<void> {
if (!removeOrderResult.removed) {
logger.info({
@@ -306,6 +311,9 @@ export class OrderRemoveHandler extends Handler {
orderRemove,
perpetualMarket,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};

if (this.shouldSendSubaccountMessage(orderRemove, removeOrderResult, stateRemainingQuantums)) {
@@ -322,7 +330,7 @@ export class OrderRemoveHandler extends Handler {
!remainingQuantums.eq('0') &&
removeOrderResult.restingOnBook !== false &&
!requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) {
await this.updateOrderbook(removeOrderResult, perpetualMarket);
await this.updateOrderbook(removeOrderResult, perpetualMarket, headers);
}
// TODO: consolidate remove handler logic into a single lua script.
await this.addOrderToCanceledOrdersCache(
@@ -339,6 +347,7 @@ export class OrderRemoveHandler extends Handler {
protected async updateOrderbook(
removeOrderResult: RemoveOrderResult,
perpetualMarket: PerpetualMarketFromDatabase,
headers: IHeaders,
): Promise<void> {
const updatedQuantums: number = await runFuncWithTimingStat(
this.updatePriceLevelsCache(
@@ -352,6 +361,9 @@ export class OrderRemoveHandler extends Handler {
perpetualMarket,
updatedQuantums,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ import {
RedisOrder,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { Message } from 'kafkajs';
import { IHeaders, Message } from 'kafkajs';

import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
@@ -51,7 +51,7 @@ import { Handler } from './handler';
* price level is capped to the size of the order in quantums
*/
export class OrderUpdateHandler extends Handler {
protected async handle(update: OffChainUpdateV1): Promise<void> {
protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise<void> {
logger.info({
at: 'OrderUpdateHandler#handle',
message: 'Received OffChainUpdate with OrderUpdate.',
@@ -171,6 +171,9 @@ export class OrderUpdateHandler extends Handler {
perpetualMarket,
updatedQuantums,
),
headers: {
message_received_timestamp: headers.message_received_timestamp,
},
};
sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS);
}
14 changes: 13 additions & 1 deletion indexer/services/vulcan/src/lib/on-message.ts
Original file line number Diff line number Diff line change
@@ -62,6 +62,18 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
},
);

const originalMessageTimestamp = message.headers?.message_received_timestamp;
if (originalMessageTimestamp !== undefined) {
stats.timing(
`${config.SERVICE_NAME}.message_time_since_received`,
start - Number(message.headers?.message_received_timestamp),
STATS_NO_SAMPLING,
{
topic: KafkaTopics.TO_VULCAN,
},
);
}

const messageValue: Buffer = message.value;
const offset: string = message.offset;
let update: OffChainUpdateV1;
@@ -86,7 +98,7 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
const handler: Handler = new (getHandler(update))!(
getTransactionHashFromHeaders(message.headers),
);
await handler.handleUpdate(update);
await handler.handleUpdate(update, message.headers ?? {});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also emit a stat here comparing to message_received_timestamp ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already do that here in the same code block right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's before handling the message though

success = true;
} catch (error) {