Skip to content

Commit

Permalink
[pinpoint-apm#223] commonStreamResponse's sequenceid per requestid of…
Browse files Browse the repository at this point in the history
… Active Thread Count

* DisableTrace negative transactionId sequence generator
It's Java Agent use transactionId's sequence use active request map key
  • Loading branch information
feelform committed Sep 26, 2024
1 parent 0faf843 commit 457e4a4
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 43 deletions.
30 changes: 12 additions & 18 deletions lib/client/grpc-data-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ class GrpcDataSender {
this.initializeProfilerClients(collectorIp, collectorTcpPort, config)
}

setCommandEchoCallArguments(callArguments) {
this.commandEchoCallArguments = callArguments
}

setCommandStreamActiveThreadCountCallArguments(callArguments) {
this.commandStreamActiveThreadCountCallArguments = callArguments
}

close() {
this.closeScheduler()
if (this.spanStream) {
Expand Down Expand Up @@ -145,10 +137,11 @@ class GrpcDataSender {
this.profilerClient = new services.ProfilerCommandServiceClient(collectorIp + ":" + collectorTcpPort, grpc.credentials.createInsecure(), profilerBuilder.build())
}

makeCommandStream() {
makeCommandStream(callArguments) {
this.commandStream = new GrpcReadableStream(() => {
const writable = this.profilerClient.handleCommandV2()

let activeThreadCountSequenceId = 0
writable.on('data', (cmdRequest) => {
const requestId = cmdRequest.getRequestid()
const command = cmdRequest.getCommandCase()
Expand All @@ -166,12 +159,13 @@ class GrpcDataSender {
const message = cmdRequest.getCommandecho().getMessage()
cmdEchoResponse.setMessage(message)

this.sendCommandEcho(cmdEchoResponse, this.commandEchoCallArguments)
this.sendCommandEcho(cmdEchoResponse, callArguments)
},
// ActiveThreadCountStreamSocket.java
'ACTIVE_THREAD_COUNT': () => {
const commonStreamResponse = new cmdMessages.PCmdStreamResponse()
commonStreamResponse.setResponseid(requestId)
commonStreamResponse.setSequenceid(1)
commonStreamResponse.setSequenceid(++activeThreadCountSequenceId)
const stringValue = new wrappers.StringValue()
stringValue.setValue('')
commonStreamResponse.setMessage(stringValue)
Expand All @@ -182,17 +176,17 @@ class GrpcDataSender {
commandActiveThreadCountResponse.addActivethreadcount(1)
commandActiveThreadCountResponse.setTimestamp(Date.now())

this.sendActiveThreadCount(commandActiveThreadCountResponse)
this.sendActiveThreadCount(commandActiveThreadCountResponse, callArguments)
}
})
})
return writable
})
}

makeActiveThreadCountStream() {
makeActiveThreadCountStream(callArguments) {
this.activeThreadCountStream = new GrpcReadableStream(() => {
const callArguments = guardCallArguments(this.commandStreamActiveThreadCountCallArguments)
callArguments = guardCallArguments(callArguments)
const metadata = callArguments.getMetadata()
let options = callArguments.getOptions()
const callback = callArguments.getCallback()
Expand Down Expand Up @@ -361,9 +355,9 @@ class GrpcDataSender {
}
}

sendSupportedServicesCommand() {
sendSupportedServicesCommand(callArguments) {
if (!this.commandStream) {
this.makeCommandStream()
this.makeCommandStream(callArguments)
}

const pCmdMessage = CommandType.supportedServicesCommandMessage()
Expand All @@ -388,9 +382,9 @@ class GrpcDataSender {
})
}

sendActiveThreadCount(commandActiveThreadCountResponse) {
sendActiveThreadCount(commandActiveThreadCountResponse, callArguments) {
if (!this.activeThreadCountStream) {
this.makeActiveThreadCountStream()
this.makeActiveThreadCountStream(callArguments)
}
this.activeThreadCountStream.push(commandActiveThreadCountResponse)
}
Expand Down
7 changes: 4 additions & 3 deletions lib/context/id-generator.js → lib/context/span-id.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

'use strict'

class IdGenerator {
// SpanId.java in Java agent
class SpanId {
constructor () {
this.MAX_NUM = Number.MAX_SAFE_INTEGER
}
Expand All @@ -15,9 +16,9 @@ class IdGenerator {
return Math.floor(Math.random() * this.MAX_NUM)
}

stringValueOfNext() {
newSpanId() {
return this.next.toString()
}
}

module.exports = new IdGenerator()
module.exports = new SpanId()
4 changes: 2 additions & 2 deletions lib/context/trace-context.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
const Trace = require('./trace')
const TransactionId = require('./transaction-id')
const TraceId = require('./trace-id')
const IdGenerator = require('./id-generator')
const SpanId = require('./span-id')
const log = require('../utils/logger')
const sampler = require('../sampler/sampler')
const DisableTrace = require('./disable-trace')
Expand Down Expand Up @@ -65,7 +65,7 @@ class TraceContext {
return new DisableTrace()
}
const transactionId = new TransactionId(this.agentInfo.agentId, this.agentInfo.agentStartTime.toString())
const spanId = IdGenerator.stringValueOfNext()
const spanId = SpanId.newSpanId()
const traceId = new TraceId(transactionId, spanId)
return this.createTraceObject(traceId)
}
Expand Down
4 changes: 2 additions & 2 deletions lib/instrumentation/http-shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
const endOfStream = require('end-of-stream')
const url = require('url')
const log = require('../utils/logger')
const IdGenerator = require('../context/id-generator')
const SpanId = require('../context/span-id')
const annotationKey = require('../constant/annotation-key')
const RequestHeaderUtils = require('../instrumentation/request-header-utils')
const AntPathMatcher = require('../utils/ant-path-matcher')
Expand Down Expand Up @@ -129,7 +129,7 @@ exports.traceOutgoingRequest = function (agent, moduleName) {
asyncEventRecorder.recordAttribute(annotationKey.HTTP_URL, httpURL)

const destinationId = host
const nextSpanId = IdGenerator.stringValueOfNext()
const nextSpanId = SpanId.newSpanId()
RequestHeaderUtils.write(req, agent, nextSpanId, destinationId)
asyncEventRecorder.recordNextSpanId(nextSpanId)
asyncEventRecorder.recordDestinationId(destinationId)
Expand Down
1 change: 1 addition & 0 deletions lib/instrumentation/request-header-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class RequestHeaderUtils {
return
}

// RequestTraceReader.newTraceId(TraceHeader traceHeader) in Java agent
const transactionId = TransactionId.toTransactionId(this.getHeader(request, PinpointHeader.HTTP_TRACE_ID))
if (!transactionId) {
return
Expand Down
1 change: 0 additions & 1 deletion lib/metric/active-thread-count.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

const Scheduler = require('../utils/scheduler')
const activeTrace = require('./active-trace')
const log = require('../utils/logger')

class ActiveThreadCount {
constructor(dataSender, streamChannelManager, enabled) {
Expand Down
25 changes: 14 additions & 11 deletions test/client/grpc-data-sender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class DataSource extends DataSourceCallCountable {
initializeProfilerClients() { }
}

test.skip('Should send span ', function (t) {
test('Should send span ', function (t) {
const expectedSpan = {
'traceId': {
'transactionId': {
Expand Down Expand Up @@ -301,7 +301,7 @@ test.skip('sendSpanChunk redis.SET.end', function (t) {
})
})

test.skip('sendSpanChunk redis.GET.end', (t) => {
test('sendSpanChunk redis.GET.end', (t) => {
let expectedSpanChunk = {
'agentId': 'express-node-sample-id',
'applicationName': 'express-node-sample-name',
Expand Down Expand Up @@ -420,7 +420,7 @@ test.skip('sendSpanChunk redis.GET.end', (t) => {
})
})

test.skip('sendSpan', (t) => {
test('sendSpan', (t) => {
let expectedSpanChunk = {
'traceId': {
'transactionId': {
Expand Down Expand Up @@ -805,7 +805,6 @@ const emptyResponseService = (call, callback) => {
}
})


const succeedOnRetryAttempt = call.metadata.get('succeed-on-retry-attempt')
const previousAttempts = call.metadata.get('grpc-previous-rpc-attempts')
const callRequests = getCallRequests()
Expand Down Expand Up @@ -859,8 +858,7 @@ test('sendSupportedServicesCommand and commandEcho', (t) => {
t.equal(cmdEchoResponse.getMessage(), 'echo', 'echo message')
afterOne(t)
}).build()
dataSender.setCommandEchoCallArguments(callArguments)
dataSender.sendSupportedServicesCommand()
dataSender.sendSupportedServicesCommand(callArguments)
})

t.teardown(() => {
Expand All @@ -880,25 +878,30 @@ test('CommandStreamActiveThreadCount', (t) => {
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
dataSender = beforeSpecificOne(port, ProfilerDataSource)

let callCount = 0
dataCallbackOnServerCall = (data) => {
++callCount
const commonStreamResponse = data.getCommonstreamresponse()
t.equal(commonStreamResponse.getResponseid(), requestId, 'response id matches request id')
t.equal(commonStreamResponse.getSequenceid(), 1, 'sequenceid is 1')
t.equal(commonStreamResponse.getSequenceid(), callCount, `sequenceid is ${callCount}`)
t.equal(commonStreamResponse.getMessage().getValue(), '', 'message is empty')

t.equal(data.getHistogramschematype(), 2, 'histogram schema type')
t.equal(data.getActivethreadcountList()[0], 1, 'active thread count')
afterOne(t)

if (callCount == 2) {
afterOne(t)
}
}

const callArguments = new CallArgumentsBuilder(function (error, response) {
serverCallWriter(CommandType.activeThreadCount)
serverCallWriter(CommandType.activeThreadCount)
}).build()
dataSender.setCommandEchoCallArguments(callArguments)
dataSender.sendSupportedServicesCommand()
dataSender.sendSupportedServicesCommand(callArguments)
})
t.teardown(() => {
dataSender.close()
server.forceShutdown()
})
})
})
4 changes: 2 additions & 2 deletions test/client/grpc-fixture.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ class DataSourceCallCountable extends GrpcDataSender {
super.sendSpan(span)
}

sendSupportedServicesCommand() {
sendSupportedServicesCommand(callArguments) {
increaseCallCount()
super.sendSupportedServicesCommand()
super.sendSupportedServicesCommand(callArguments)
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/context/trace-id.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
const test = require('tape')
const TransactionId = require('../../lib/context/transaction-id')
const TraceId = require('../../lib/context/trace-id')
const IdGenerator = require('../../lib/context/id-generator')
const SpanId = require('../../lib/context/span-id')

test('Should create', function (t) {
t.plan(2)

const agentId = 'agent-for-dev'
const agentStartTime = Date.now()
const transactionId = new TransactionId(agentId, agentStartTime.toString())
const spanId = IdGenerator.stringValueOfNext()
const spanId = SpanId.newSpanId()
const traceId = new TraceId(transactionId, spanId)

t.ok(traceId)
Expand Down
4 changes: 2 additions & 2 deletions test/fixture.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

const TransactionId = require('../lib/context/transaction-id')
const TraceId = require('../lib/context/trace-id')
const IdGenerator = require('../lib/context/id-generator')
const SpanId = require('../lib/context/span-id')
const shimmer = require('@pinpoint-apm/shimmer')
const testConfig= require('./pinpoint-config-test')
require('../lib/config').clear()
Expand All @@ -22,7 +22,7 @@ const getTransactionId = () => {
}

const getTraceId = (transactionId) => {
const spanId = IdGenerator.stringValueOfNext()
const spanId = SpanId.newSpanId()
return new TraceId(transactionId || getTransactionId(), spanId)
}

Expand Down

0 comments on commit 457e4a4

Please sign in to comment.