diff --git a/indexer/packages/v4-protos/src/index.ts b/indexer/packages/v4-protos/src/index.ts index 8b081abbf9..6d05ae5fc4 100644 --- a/indexer/packages/v4-protos/src/index.ts +++ b/indexer/packages/v4-protos/src/index.ts @@ -16,4 +16,3 @@ export * from './codegen/google/protobuf/timestamp'; export * from './codegen/dydxprotocol/indexer/protocol/v1/clob'; export * from './codegen/dydxprotocol/indexer/protocol/v1/subaccount'; export * from './codegen/dydxprotocol/indexer/shared/removal_reason'; -export * from './utils'; diff --git a/indexer/packages/v4-protos/src/utils.ts b/indexer/packages/v4-protos/src/utils.ts deleted file mode 100644 index dfbd71f77d..0000000000 --- a/indexer/packages/v4-protos/src/utils.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { Timestamp } from './codegen/google/protobuf/timestamp'; - -export const MILLIS_IN_NANOS: number = 1_000_000; -export const SECONDS_IN_MILLIS: number = 1_000; -export function protoTimestampToDate( - protoTime: Timestamp, -): Date { - const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS + - Math.floor(protoTime.nanos / MILLIS_IN_NANOS); - - return new Date(timeInMillis); -} diff --git a/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts b/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts index 9eef8f5bca..7b87fc196b 100644 --- a/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts +++ b/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts @@ -50,7 +50,6 @@ import { IndexerOrderId, PerpetualMarketCreateEventV1, DeleveragingEventV1, - protoTimestampToDate, } from '@dydxprotocol-indexer/v4-protos'; import { PerpetualMarketType, @@ -64,6 +63,7 @@ import { generatePerpetualMarketMessage, generatePerpetualPositionsContents, } from '../../src/helpers/kafka-helper'; +import { protoTimestampToDate } from '../../src/lib/helper'; import { DydxIndexerSubtypes, VulcanMessage } from '../../src/lib/types'; // TX Hash is SHA256, so is of length 64 hexadecimal without the '0x'. diff --git a/indexer/services/ender/src/lib/helper.ts b/indexer/services/ender/src/lib/helper.ts index 4478eb0079..a0b639198e 100644 --- a/indexer/services/ender/src/lib/helper.ts +++ b/indexer/services/ender/src/lib/helper.ts @@ -8,6 +8,7 @@ import { import { IndexerTendermintEvent, IndexerTendermintEvent_BlockEvent, + Timestamp, OrderFillEventV1, MarketEventV1, SubaccountUpdateEventV1, @@ -28,6 +29,10 @@ import Big from 'big.js'; import _ from 'lodash'; import { DateTime } from 'luxon'; +import { + MILLIS_IN_NANOS, + SECONDS_IN_MILLIS, +} from '../constants'; import { AnnotatedSubaccountMessage, DydxIndexerSubtypes, @@ -65,6 +70,15 @@ export function convertToSubaccountMessage( return subaccountMessage; } +export function protoTimestampToDate( + protoTime: Timestamp, +): Date { + const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS + + Math.floor(protoTime.nanos / MILLIS_IN_NANOS); + + return new Date(timeInMillis); +} + export function dateToDateTime( protoTime: Date, ): DateTime { diff --git a/indexer/services/socks/src/lib/message-forwarder.ts b/indexer/services/socks/src/lib/message-forwarder.ts index abe08d8c57..aa15ba7730 100644 --- a/indexer/services/socks/src/lib/message-forwarder.ts +++ b/indexer/services/socks/src/lib/message-forwarder.ts @@ -3,7 +3,6 @@ import { logger, InfoObject, safeJsonStringify, - STATS_NO_SAMPLING, } from '@dydxprotocol-indexer/base'; import { updateOnMessageFunction } from '@dydxprotocol-indexer/kafka'; import { KafkaMessage } from 'kafkajs'; @@ -86,10 +85,9 @@ export class MessageForwarder { } public onMessage(topic: string, message: KafkaMessage): void { - const start: number = Date.now(); stats.timing( `${config.SERVICE_NAME}.message_time_in_queue`, - start - Number(message.timestamp), + Date.now() - Number(message.timestamp), config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, { topic, @@ -127,31 +125,18 @@ export class MessageForwarder { return; } - const startForwardMessage: number = Date.now(); + const start: number = Date.now(); this.forwardMessage(messageToForward); const end: number = Date.now(); stats.timing( `${config.SERVICE_NAME}.forward_message`, - end - startForwardMessage, + end - start, config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, { topic, channel: String(channel), }, ); - - const originalMessageTimestamp = message.headers?.message_received_timestamp; - if (originalMessageTimestamp !== undefined) { - stats.timing( - `${config.SERVICE_NAME}.message_time_since_received`, - startForwardMessage - Number(originalMessageTimestamp), - STATS_NO_SAMPLING, - { - topic, - event_type: String(message.headers?.event_type), - }, - ); - } } public forwardMessage(message: MessageToForward): void { diff --git a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts index ceab04794c..d26015e30e 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts @@ -62,7 +62,6 @@ import { expectCanceledOrderStatus, expectOpenOrderIds, handleInitialOrderPlace import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers'; import { OrderbookSide } from '../../src/lib/types'; import { getOrderIdHash, isLongTermOrder, isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser'; -import { defaultKafkaHeaders } from '../helpers/constants'; import config from '../../src/config'; jest.mock('@dydxprotocol-indexer/base', () => ({ @@ -197,12 +196,6 @@ describe('order-place-handler', () => { const replacementMessageIoc: KafkaMessage = createKafkaMessage( Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(replacementUpdateIoc).finish())), ); - [replacementMessage, replacementMessageGoodTilBlockTime, replacementMessageConditional, - replacementMessageFok, replacementMessageIoc].forEach((message) => { - // eslint-disable-next-line no-param-reassign - message.headers = defaultKafkaHeaders; - }); - const dbDefaultOrder: OrderFromDatabase = { ...testConstants.defaultOrder, id: testConstants.defaultOrderId, @@ -1232,11 +1225,7 @@ function expectWebsocketMessagesSent( version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, }); - expectWebsocketSubaccountMessage( - producerSendSpy.mock.calls[callIndex][0], - subaccountMessage, - defaultKafkaHeaders, - ); + expectWebsocketSubaccountMessage(producerSendSpy.mock.calls[callIndex][0], subaccountMessage); callIndex += 1; } diff --git a/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts index e5c242f510..b0b00075f2 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts @@ -5,9 +5,6 @@ import { STATS_FUNCTION_NAME, wrapBackgroundTask, } from '@dydxprotocol-indexer/base'; -import { - defaultTime, -} from '../helpers/constants'; import { synchronizeWrapBackgroundTask } from '@dydxprotocol-indexer/dev'; import { ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION, @@ -55,16 +52,14 @@ import { OrderRemoveV1_OrderRemovalStatus, RedisOrder, SubaccountMessage, - protoTimestampToDate, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; -import { IHeaders, ProducerRecord } from 'kafkajs'; +import { ProducerRecord } from 'kafkajs'; import { DateTime } from 'luxon'; import { OrderRemoveHandler } from '../../src/handlers/order-remove-handler'; import { OrderbookSide } from '../../src/lib/types'; import { redisClient } from '../../src/helpers/redis/redis-controller'; - import { expectCanceledOrderStatus, expectOpenOrderIds, @@ -138,10 +133,6 @@ describe('OrderRemoveHandler', () => { timeInForce: TimeInForce.IOC, }; - const defaultKafkaHeaders: IHeaders = { - message_received_timestamp: String(protoTimestampToDate(defaultTime)), - }; - it.each([ [ { @@ -188,10 +179,7 @@ describe('OrderRemoveHandler', () => { const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(orderRemoveJson); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await expect(orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - )).rejects.toThrow( + await expect(orderRemoveHandler.handleUpdate(offChainUpdate)).rejects.toThrow( new ParseMessageError(errorMessage), ); expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ @@ -206,10 +194,7 @@ describe('OrderRemoveHandler', () => { const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({ at: 'orderRemoveHandler#handleOrderRemoval', @@ -230,10 +215,7 @@ describe('OrderRemoveHandler', () => { const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); const ticker: string = testConstants.defaultPerpetualMarket.ticker; expect(logger.error).toHaveBeenCalledWith({ @@ -338,10 +320,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); // orderbook level reduced by defaultQuantums const remainingOrderbookLevel: string = Big( @@ -491,10 +470,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); await Promise.all([ expectOrderStatus(expectedOrderUuid, OrderStatus.BEST_EFFORT_CANCELED), @@ -628,10 +604,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); await Promise.all([ expectOrderStatus(expectedOrderUuid, OrderStatus.CANCELED), @@ -769,10 +742,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); await Promise.all([ expectOrderStatus(expectedOrderUuid, OrderStatus.CANCELED), @@ -925,10 +895,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); await Promise.all([ expectOrderStatus(expectedOrderUuid, removedOrder.status), @@ -1011,10 +978,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); await Promise.all([ expectOrderStatus(expectedOrderUuid, OrderStatus.FILLED), @@ -1050,10 +1014,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); expect(producerSendSpy).not.toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ @@ -1101,10 +1062,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); // Subaccounts message is sent first followed by orderbooks message const subaccountContents: SubaccountMessageContents = { @@ -1204,10 +1162,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); await Promise.all([ // orderbook should not be affected, so it will be set to defaultQuantums @@ -1324,10 +1279,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); // orderbook level reduced by defaultQuantums const remainingOrderbookLevel: string = Big( @@ -1471,10 +1423,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); await Promise.all([ expectOrderbookLevelCache( @@ -1596,10 +1545,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); // orderbook level reduced by defaultQuantums const remainingOrderbookLevel: string = Big( @@ -1730,10 +1676,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); // orderbook level should not be reduced const remainingOrderbookLevel: string = Big( @@ -1795,10 +1738,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); expect(producerSendSpy).not.toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ @@ -1865,10 +1805,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); expect(producerSendSpy).not.toHaveBeenCalled(); expect(stats.increment).toHaveBeenCalledWith('vulcan.indexer_expired_order_not_found', 1); @@ -1941,10 +1878,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); expect(producerSendSpy).not.toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ @@ -2006,10 +1940,7 @@ describe('OrderRemoveHandler', () => { const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); - await orderRemoveHandler.handleUpdate( - offChainUpdate, - defaultKafkaHeaders, - ); + await orderRemoveHandler.handleUpdate(offChainUpdate); expect(producerSendSpy).not.toHaveBeenCalled(); expect( @@ -2113,11 +2044,7 @@ describe('OrderRemoveHandler', () => { if (expectedSubaccountMessage !== undefined) { const subaccountProducerRecord: ProducerRecord = producerSendSpy.mock.calls[0][0]; - expectWebsocketSubaccountMessage( - subaccountProducerRecord, - expectedSubaccountMessage, - defaultKafkaHeaders, - ); + expectWebsocketSubaccountMessage(subaccountProducerRecord, expectedSubaccountMessage); } if (expectedOrderbookMessage !== undefined) { diff --git a/indexer/services/vulcan/__tests__/helpers/constants.ts b/indexer/services/vulcan/__tests__/helpers/constants.ts deleted file mode 100644 index 5b31462e29..0000000000 --- a/indexer/services/vulcan/__tests__/helpers/constants.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { - MILLIS_IN_NANOS, - SECONDS_IN_MILLIS, - Timestamp, - protoTimestampToDate, -} from '@dydxprotocol-indexer/v4-protos'; -import { IHeaders } from 'kafkajs'; -import Long from 'long'; -import { DateTime } from 'luxon'; - -const defaultDateTime: DateTime = DateTime.utc(2022, 6, 1, 12, 1, 1, 2); -export const defaultTime: Timestamp = { - seconds: Long.fromValue(Math.floor(defaultDateTime.toSeconds()), true), - nanos: (defaultDateTime.toMillis() % SECONDS_IN_MILLIS) * MILLIS_IN_NANOS, -}; - -export const defaultKafkaHeaders: IHeaders = { - message_received_timestamp: String(protoTimestampToDate(defaultTime)), -}; diff --git a/indexer/services/vulcan/__tests__/helpers/helpers.ts b/indexer/services/vulcan/__tests__/helpers/helpers.ts index f648a0989d..370949bdf0 100644 --- a/indexer/services/vulcan/__tests__/helpers/helpers.ts +++ b/indexer/services/vulcan/__tests__/helpers/helpers.ts @@ -13,7 +13,6 @@ import { KafkaMessage } from 'kafkajs'; import { redisClient } from '../../src/helpers/redis/redis-controller'; import { onMessage } from '../../src/lib/on-message'; import { DydxRecordHeaderKeys } from '../../src/lib/types'; -import { defaultKafkaHeaders } from './constants'; export async function handleInitialOrderPlace( orderPlace: redisTestConstants.OffChainUpdateOrderPlaceUpdateMessage, @@ -24,7 +23,6 @@ export async function handleInitialOrderPlace( const message: KafkaMessage = createKafkaMessage( Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(update).finish())), ); - message.headers = defaultKafkaHeaders; await onMessage(message); } @@ -38,7 +36,6 @@ export async function handleOrderUpdate( const message: KafkaMessage = createKafkaMessage( Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(update).finish())), ); - message.headers = defaultKafkaHeaders; await onMessage(message); } diff --git a/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts b/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts index f97ff27ace..79a2eee145 100644 --- a/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts +++ b/indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts @@ -1,21 +1,18 @@ import { KafkaTopics } from '@dydxprotocol-indexer/kafka'; import { OffChainUpdateV1, OrderbookMessage, SubaccountMessage } from '@dydxprotocol-indexer/v4-protos'; -import { IHeaders, ProducerRecord } from 'kafkajs'; +import { ProducerRecord } from 'kafkajs'; export function expectWebsocketSubaccountMessage( subaccountProducerRecord: ProducerRecord, expectedSubaccountMessage: SubaccountMessage, - expectedHeaders: IHeaders, ): void { expect(subaccountProducerRecord.topic).toEqual(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); const subaccountMessageValueBinary: Uint8Array = new Uint8Array( subaccountProducerRecord.messages[0].value as Buffer, ); - const headers: IHeaders | undefined = subaccountProducerRecord.messages[0].headers; const subaccountMessage: SubaccountMessage = SubaccountMessage.decode( subaccountMessageValueBinary, ); - expect(headers).toEqual(expectedHeaders); expect(subaccountMessage).toEqual(expectedSubaccountMessage); } diff --git a/indexer/services/vulcan/__tests__/lib/on-message.test.ts b/indexer/services/vulcan/__tests__/lib/on-message.test.ts index 254d781dc2..27167a7143 100644 --- a/indexer/services/vulcan/__tests__/lib/on-message.test.ts +++ b/indexer/services/vulcan/__tests__/lib/on-message.test.ts @@ -64,7 +64,7 @@ describe('onMessage', () => { await onMessage(message); expect(handler).toHaveBeenCalledTimes(1); - expect(handleUpdateMock).toHaveBeenCalledWith(update, message.headers ?? {}); + expect(handleUpdateMock).toHaveBeenCalledWith(update); expect(handleUpdateMock).toHaveBeenCalledTimes(1); expect(stats.increment).toHaveBeenCalledWith('vulcan.received_kafka_message', 1); diff --git a/indexer/services/vulcan/src/handlers/handler.ts b/indexer/services/vulcan/src/handlers/handler.ts index 5c1a859b4b..7c409d00a2 100644 --- a/indexer/services/vulcan/src/handlers/handler.ts +++ b/indexer/services/vulcan/src/handlers/handler.ts @@ -4,7 +4,6 @@ import { } from '@dydxprotocol-indexer/kafka'; import { OrderbookMessageContents, PerpetualMarketFromDatabase, protocolTranslations } from '@dydxprotocol-indexer/postgres'; import { OffChainUpdateV1, OrderbookMessage, RedisOrder } from '@dydxprotocol-indexer/v4-protos'; -import { IHeaders } from 'kafkajs'; import { OrderbookSide } from 'src/lib/types'; import { orderSideToOrderbookSide } from './helpers'; @@ -16,11 +15,11 @@ export abstract class Handler { this.txHash = txHash; } - protected abstract handle(update: OffChainUpdateV1, headers: IHeaders): Promise; + protected abstract handle(update: OffChainUpdateV1): Promise; // TODO(DEC-1251): Add stats for message handling. - public async handleUpdate(update: OffChainUpdateV1, headers: IHeaders): Promise { - return this.handle(update, headers); + public async handleUpdate(update: OffChainUpdateV1): Promise { + return this.handle(update); } protected logAndThrowParseMessageError( diff --git a/indexer/services/vulcan/src/handlers/order-place-handler.ts b/indexer/services/vulcan/src/handlers/order-place-handler.ts index ee86d95cba..41449dc2fd 100644 --- a/indexer/services/vulcan/src/handlers/order-place-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-place-handler.ts @@ -32,7 +32,7 @@ import { RedisOrder, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; -import { IHeaders, Message } from 'kafkajs'; +import { Message } from 'kafkajs'; import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; @@ -52,7 +52,7 @@ import { Handler } from './handler'; * being greater than or equal to the expiry of the order in the OrderPlace message, return */ export class OrderPlaceHandler extends Handler { - protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise { + protected async handle(update: OffChainUpdateV1): Promise { logger.info({ at: 'OrderPlaceHandler#handle', message: 'Received OffChainUpdate with OrderPlace.', @@ -142,7 +142,7 @@ export class OrderPlaceHandler extends Handler { }); throw new Error(`Stateful order not found in database: ${orderUuid}`); } - await this.sendCachedOrderUpdate(orderUuid, headers); + await this.sendCachedOrderUpdate(orderUuid); } const subaccountMessage: Message = { value: createSubaccountWebsocketMessage( @@ -151,9 +151,6 @@ export class OrderPlaceHandler extends Handler { perpetualMarket, placementStatus, ), - headers: { - message_received_timestamp: headers.message_received_timestamp, - }, }; sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); } @@ -167,9 +164,6 @@ export class OrderPlaceHandler extends Handler { perpetualMarket, updatedQuantums, ), - headers: { - message_received_timestamp: headers.message_received_timestamp, - }, }; sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); } @@ -340,7 +334,6 @@ export class OrderPlaceHandler extends Handler { */ protected async sendCachedOrderUpdate( orderId: string, - headers: IHeaders, ): Promise { const cachedOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache .removeStatefulOrderUpdate( @@ -358,10 +351,6 @@ export class OrderPlaceHandler extends Handler { value: Buffer.from( Uint8Array.from(OffChainUpdateV1.encode({ orderUpdate: cachedOrderUpdate }).finish()), ), - headers: { - message_received_timestamp: headers.message_received_timestamp, - event_type: String(headers.event_type), - }, }; sendMessageWrapper(orderUpdateMessage, KafkaTopics.TO_VULCAN); } diff --git a/indexer/services/vulcan/src/handlers/order-remove-handler.ts b/indexer/services/vulcan/src/handlers/order-remove-handler.ts index 6ee710a605..428c572780 100644 --- a/indexer/services/vulcan/src/handlers/order-remove-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-remove-handler.ts @@ -38,7 +38,7 @@ import { SubaccountMessage, } from '@dydxprotocol-indexer/v4-protos'; import { Big } from 'big.js'; -import { IHeaders, Message } from 'kafkajs'; +import { Message } from 'kafkajs'; import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; @@ -68,7 +68,7 @@ import { getStateRemainingQuantums } from './helpers'; */ export class OrderRemoveHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await - protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise { + protected async handle(update: OffChainUpdateV1): Promise { logger.info({ at: 'OrderRemoveHandler#handle', message: 'Received OffChainUpdate with OrderRemove.', @@ -131,11 +131,11 @@ export class OrderRemoveHandler extends Handler { } if (this.isStatefulOrderCancelation(orderRemove)) { - await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult, headers); + await this.handleStatefulOrderCancelation(orderRemove, removeOrderResult); return; } - await this.handleOrderRemoval(orderRemove, removeOrderResult, headers); + await this.handleOrderRemoval(orderRemove, removeOrderResult); } protected validateOrderRemove(orderRemove: OrderRemoveV1): void { @@ -193,7 +193,6 @@ export class OrderRemoveHandler extends Handler { protected async handleStatefulOrderCancelation( orderRemove: OrderRemoveV1, removeOrderResult: RemoveOrderResult, - headers: IHeaders, ): Promise { const order: OrderFromDatabase | undefined = await runFuncWithTimingStat( OrderTable.findById( @@ -231,9 +230,6 @@ export class OrderRemoveHandler extends Handler { orderRemove, perpetualMarket.ticker, ), - headers: { - message_received_timestamp: headers.message_received_timestamp, - }, }; sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); @@ -245,7 +241,7 @@ export class OrderRemoveHandler extends Handler { removeOrderResult.removed && removeOrderResult.restingOnBook === true && !requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) { - await this.updateOrderbook(removeOrderResult, perpetualMarket, headers); + await this.updateOrderbook(removeOrderResult, perpetualMarket); } } @@ -263,7 +259,6 @@ export class OrderRemoveHandler extends Handler { protected async handleOrderRemoval( orderRemove: OrderRemoveV1, removeOrderResult: RemoveOrderResult, - headers: IHeaders, ): Promise { if (!removeOrderResult.removed) { logger.info({ @@ -311,9 +306,6 @@ export class OrderRemoveHandler extends Handler { orderRemove, perpetualMarket, ), - headers: { - message_received_timestamp: headers.message_received_timestamp, - }, }; if (this.shouldSendSubaccountMessage(orderRemove, removeOrderResult, stateRemainingQuantums)) { @@ -330,7 +322,7 @@ export class OrderRemoveHandler extends Handler { !remainingQuantums.eq('0') && removeOrderResult.restingOnBook !== false && !requiresImmediateExecution(removeOrderResult.removedOrder!.order!.timeInForce)) { - await this.updateOrderbook(removeOrderResult, perpetualMarket, headers); + await this.updateOrderbook(removeOrderResult, perpetualMarket); } // TODO: consolidate remove handler logic into a single lua script. await this.addOrderToCanceledOrdersCache( @@ -347,7 +339,6 @@ export class OrderRemoveHandler extends Handler { protected async updateOrderbook( removeOrderResult: RemoveOrderResult, perpetualMarket: PerpetualMarketFromDatabase, - headers: IHeaders, ): Promise { const updatedQuantums: number = await runFuncWithTimingStat( this.updatePriceLevelsCache( @@ -361,9 +352,6 @@ export class OrderRemoveHandler extends Handler { perpetualMarket, updatedQuantums, ), - headers: { - message_received_timestamp: headers.message_received_timestamp, - }, }; sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); } diff --git a/indexer/services/vulcan/src/handlers/order-update-handler.ts b/indexer/services/vulcan/src/handlers/order-update-handler.ts index 5c0da95f9a..950b7ee73f 100644 --- a/indexer/services/vulcan/src/handlers/order-update-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-update-handler.ts @@ -24,7 +24,7 @@ import { RedisOrder, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; -import { IHeaders, Message } from 'kafkajs'; +import { Message } from 'kafkajs'; import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; @@ -51,7 +51,7 @@ import { Handler } from './handler'; * price level is capped to the size of the order in quantums */ export class OrderUpdateHandler extends Handler { - protected async handle(update: OffChainUpdateV1, headers: IHeaders): Promise { + protected async handle(update: OffChainUpdateV1): Promise { logger.info({ at: 'OrderUpdateHandler#handle', message: 'Received OffChainUpdate with OrderUpdate.', @@ -171,9 +171,6 @@ export class OrderUpdateHandler extends Handler { perpetualMarket, updatedQuantums, ), - headers: { - message_received_timestamp: headers.message_received_timestamp, - }, }; sendMessageWrapper(orderbookMessage, KafkaTopics.TO_WEBSOCKETS_ORDERBOOKS); } diff --git a/indexer/services/vulcan/src/lib/on-message.ts b/indexer/services/vulcan/src/lib/on-message.ts index 59c7b42b7b..10c49ecf39 100644 --- a/indexer/services/vulcan/src/lib/on-message.ts +++ b/indexer/services/vulcan/src/lib/on-message.ts @@ -62,19 +62,6 @@ export async function onMessage(message: KafkaMessage): Promise { }, ); - const originalMessageTimestamp = message.headers?.message_received_timestamp; - if (originalMessageTimestamp !== undefined) { - stats.timing( - `${config.SERVICE_NAME}.message_time_since_received`, - start - Number(originalMessageTimestamp), - STATS_NO_SAMPLING, - { - topic: KafkaTopics.TO_VULCAN, - event_type: String(message.headers?.event_type), - }, - ); - } - const messageValue: Buffer = message.value; const offset: string = message.offset; let update: OffChainUpdateV1; @@ -99,20 +86,7 @@ export async function onMessage(message: KafkaMessage): Promise { const handler: Handler = new (getHandler(update))!( getTransactionHashFromHeaders(message.headers), ); - await handler.handleUpdate(update, message.headers ?? {}); - - const postProcessingTime: number = Date.now(); - if (originalMessageTimestamp !== undefined) { - stats.timing( - `${config.SERVICE_NAME}.message_time_since_received_post_processing`, - postProcessingTime - Number(originalMessageTimestamp), - STATS_NO_SAMPLING, - { - topic: KafkaTopics.TO_VULCAN, - event_type: String(message.headers?.event_type), - }, - ); - } + await handler.handleUpdate(update); success = true; } catch (error) {