-
Notifications
You must be signed in to change notification settings - Fork 126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CT-708] Indexer track e2e latency #1237
Changes from 6 commits
cbc7a99
8329a45
dc837e0
57c2f4b
b404164
42b236f
ee03fee
01cfc7e
2622bc8
a755100
d590723
3bd29ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,6 +62,19 @@ export async function onMessage(message: KafkaMessage): Promise<void> { | |
}, | ||
); | ||
|
||
const originalMessageTimestamp = message.headers?.message_received_timestamp; | ||
if (originalMessageTimestamp !== undefined) { | ||
stats.timing( | ||
`${config.SERVICE_NAME}.message_time_since_received`, | ||
jonfung-dydx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
start - Number(originalMessageTimestamp), | ||
STATS_NO_SAMPLING, | ||
{ | ||
topic: KafkaTopics.TO_VULCAN, | ||
event_type: String(message.headers?.event_type), | ||
}, | ||
); | ||
} | ||
|
||
const messageValue: Buffer = message.value; | ||
const offset: string = message.offset; | ||
let update: OffChainUpdateV1; | ||
|
@@ -86,7 +99,20 @@ export async function onMessage(message: KafkaMessage): Promise<void> { | |
const handler: Handler = new (getHandler(update))!( | ||
getTransactionHashFromHeaders(message.headers), | ||
); | ||
await handler.handleUpdate(update); | ||
await handler.handleUpdate(update, message.headers ?? {}); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we also emit a stat here comparing to message_received_timestamp ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we already do that here in the same code block right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's before handling the message though |
||
const postProcessingTime: number = Date.now(); | ||
if (originalMessageTimestamp !== undefined) { | ||
stats.timing( | ||
`${config.SERVICE_NAME}.message_time_since_received_post_processing`, | ||
postProcessingTime - Number(originalMessageTimestamp), | ||
STATS_NO_SAMPLING, | ||
{ | ||
topic: KafkaTopics.TO_VULCAN, | ||
event_type: String(message.headers?.event_type), | ||
}, | ||
); | ||
} | ||
|
||
success = true; | ||
} catch (error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function seems to send out websocket messages in batches for certain websocket topics. Can we pipe the initial timestamp all the way to the
sendMessage
function?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean we should add a message_received_timestamp field to
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nvm, the batches happen at periodic intervals so not necessary to pipe it through.
Does it make more sense to have the stat be
end - Number(message.headers?.message_received_timestamp)
(after callingforwardMessage
) rather thanstart - Number(message.headers?.message_received_timestamp)
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done