From 457e4a4a4805e8976f9598f9bd2e91367aa09c77 Mon Sep 17 00:00:00 2001 From: Yongseok Date: Thu, 26 Sep 2024 11:44:03 +0900 Subject: [PATCH] [#223] commonStreamResponse's sequenceid per requestid of Active Thread Count * DisableTrace negative transactionId sequence generator It's Java Agent use transactionId's sequence use active request map key --- lib/client/grpc-data-sender.js | 30 +++++++++------------ lib/context/{id-generator.js => span-id.js} | 7 ++--- lib/context/trace-context.js | 4 +-- lib/instrumentation/http-shared.js | 4 +-- lib/instrumentation/request-header-utils.js | 1 + lib/metric/active-thread-count.js | 1 - test/client/grpc-data-sender.test.js | 25 +++++++++-------- test/client/grpc-fixture.js | 4 +-- test/context/trace-id.test.js | 4 +-- test/fixture.js | 4 +-- 10 files changed, 41 insertions(+), 43 deletions(-) rename lib/context/{id-generator.js => span-id.js} (76%) diff --git a/lib/client/grpc-data-sender.js b/lib/client/grpc-data-sender.js index abaab1a6..2156a7d8 100644 --- a/lib/client/grpc-data-sender.js +++ b/lib/client/grpc-data-sender.js @@ -39,14 +39,6 @@ class GrpcDataSender { this.initializeProfilerClients(collectorIp, collectorTcpPort, config) } - setCommandEchoCallArguments(callArguments) { - this.commandEchoCallArguments = callArguments - } - - setCommandStreamActiveThreadCountCallArguments(callArguments) { - this.commandStreamActiveThreadCountCallArguments = callArguments - } - close() { this.closeScheduler() if (this.spanStream) { @@ -145,10 +137,11 @@ class GrpcDataSender { this.profilerClient = new services.ProfilerCommandServiceClient(collectorIp + ":" + collectorTcpPort, grpc.credentials.createInsecure(), profilerBuilder.build()) } - makeCommandStream() { + makeCommandStream(callArguments) { this.commandStream = new GrpcReadableStream(() => { const writable = this.profilerClient.handleCommandV2() + let activeThreadCountSequenceId = 0 writable.on('data', (cmdRequest) => { const requestId = cmdRequest.getRequestid() const command = cmdRequest.getCommandCase() @@ -166,12 +159,13 @@ class GrpcDataSender { const message = cmdRequest.getCommandecho().getMessage() cmdEchoResponse.setMessage(message) - this.sendCommandEcho(cmdEchoResponse, this.commandEchoCallArguments) + this.sendCommandEcho(cmdEchoResponse, callArguments) }, + // ActiveThreadCountStreamSocket.java 'ACTIVE_THREAD_COUNT': () => { const commonStreamResponse = new cmdMessages.PCmdStreamResponse() commonStreamResponse.setResponseid(requestId) - commonStreamResponse.setSequenceid(1) + commonStreamResponse.setSequenceid(++activeThreadCountSequenceId) const stringValue = new wrappers.StringValue() stringValue.setValue('') commonStreamResponse.setMessage(stringValue) @@ -182,7 +176,7 @@ class GrpcDataSender { commandActiveThreadCountResponse.addActivethreadcount(1) commandActiveThreadCountResponse.setTimestamp(Date.now()) - this.sendActiveThreadCount(commandActiveThreadCountResponse) + this.sendActiveThreadCount(commandActiveThreadCountResponse, callArguments) } }) }) @@ -190,9 +184,9 @@ class GrpcDataSender { }) } - makeActiveThreadCountStream() { + makeActiveThreadCountStream(callArguments) { this.activeThreadCountStream = new GrpcReadableStream(() => { - const callArguments = guardCallArguments(this.commandStreamActiveThreadCountCallArguments) + callArguments = guardCallArguments(callArguments) const metadata = callArguments.getMetadata() let options = callArguments.getOptions() const callback = callArguments.getCallback() @@ -361,9 +355,9 @@ class GrpcDataSender { } } - sendSupportedServicesCommand() { + sendSupportedServicesCommand(callArguments) { if (!this.commandStream) { - this.makeCommandStream() + this.makeCommandStream(callArguments) } const pCmdMessage = CommandType.supportedServicesCommandMessage() @@ -388,9 +382,9 @@ class GrpcDataSender { }) } - sendActiveThreadCount(commandActiveThreadCountResponse) { + sendActiveThreadCount(commandActiveThreadCountResponse, callArguments) { if (!this.activeThreadCountStream) { - this.makeActiveThreadCountStream() + this.makeActiveThreadCountStream(callArguments) } this.activeThreadCountStream.push(commandActiveThreadCountResponse) } diff --git a/lib/context/id-generator.js b/lib/context/span-id.js similarity index 76% rename from lib/context/id-generator.js rename to lib/context/span-id.js index c3620af4..ae104742 100644 --- a/lib/context/id-generator.js +++ b/lib/context/span-id.js @@ -6,7 +6,8 @@ 'use strict' -class IdGenerator { +// SpanId.java in Java agent +class SpanId { constructor () { this.MAX_NUM = Number.MAX_SAFE_INTEGER } @@ -15,9 +16,9 @@ class IdGenerator { return Math.floor(Math.random() * this.MAX_NUM) } - stringValueOfNext() { + newSpanId() { return this.next.toString() } } -module.exports = new IdGenerator() +module.exports = new SpanId() diff --git a/lib/context/trace-context.js b/lib/context/trace-context.js index 4f353b12..7132435a 100644 --- a/lib/context/trace-context.js +++ b/lib/context/trace-context.js @@ -9,7 +9,7 @@ const Trace = require('./trace') const TransactionId = require('./transaction-id') const TraceId = require('./trace-id') -const IdGenerator = require('./id-generator') +const SpanId = require('./span-id') const log = require('../utils/logger') const sampler = require('../sampler/sampler') const DisableTrace = require('./disable-trace') @@ -65,7 +65,7 @@ class TraceContext { return new DisableTrace() } const transactionId = new TransactionId(this.agentInfo.agentId, this.agentInfo.agentStartTime.toString()) - const spanId = IdGenerator.stringValueOfNext() + const spanId = SpanId.newSpanId() const traceId = new TraceId(transactionId, spanId) return this.createTraceObject(traceId) } diff --git a/lib/instrumentation/http-shared.js b/lib/instrumentation/http-shared.js index 6d829e38..36245711 100644 --- a/lib/instrumentation/http-shared.js +++ b/lib/instrumentation/http-shared.js @@ -9,7 +9,7 @@ const endOfStream = require('end-of-stream') const url = require('url') const log = require('../utils/logger') -const IdGenerator = require('../context/id-generator') +const SpanId = require('../context/span-id') const annotationKey = require('../constant/annotation-key') const RequestHeaderUtils = require('../instrumentation/request-header-utils') const AntPathMatcher = require('../utils/ant-path-matcher') @@ -129,7 +129,7 @@ exports.traceOutgoingRequest = function (agent, moduleName) { asyncEventRecorder.recordAttribute(annotationKey.HTTP_URL, httpURL) const destinationId = host - const nextSpanId = IdGenerator.stringValueOfNext() + const nextSpanId = SpanId.newSpanId() RequestHeaderUtils.write(req, agent, nextSpanId, destinationId) asyncEventRecorder.recordNextSpanId(nextSpanId) asyncEventRecorder.recordDestinationId(destinationId) diff --git a/lib/instrumentation/request-header-utils.js b/lib/instrumentation/request-header-utils.js index b8d4836a..2305fe18 100644 --- a/lib/instrumentation/request-header-utils.js +++ b/lib/instrumentation/request-header-utils.js @@ -50,6 +50,7 @@ class RequestHeaderUtils { return } + // RequestTraceReader.newTraceId(TraceHeader traceHeader) in Java agent const transactionId = TransactionId.toTransactionId(this.getHeader(request, PinpointHeader.HTTP_TRACE_ID)) if (!transactionId) { return diff --git a/lib/metric/active-thread-count.js b/lib/metric/active-thread-count.js index 61f5e157..6b3c2883 100644 --- a/lib/metric/active-thread-count.js +++ b/lib/metric/active-thread-count.js @@ -8,7 +8,6 @@ const Scheduler = require('../utils/scheduler') const activeTrace = require('./active-trace') -const log = require('../utils/logger') class ActiveThreadCount { constructor(dataSender, streamChannelManager, enabled) { diff --git a/test/client/grpc-data-sender.test.js b/test/client/grpc-data-sender.test.js index 093bd534..cdde06f2 100644 --- a/test/client/grpc-data-sender.test.js +++ b/test/client/grpc-data-sender.test.js @@ -47,7 +47,7 @@ class DataSource extends DataSourceCallCountable { initializeProfilerClients() { } } -test.skip('Should send span ', function (t) { +test('Should send span ', function (t) { const expectedSpan = { 'traceId': { 'transactionId': { @@ -301,7 +301,7 @@ test.skip('sendSpanChunk redis.SET.end', function (t) { }) }) -test.skip('sendSpanChunk redis.GET.end', (t) => { +test('sendSpanChunk redis.GET.end', (t) => { let expectedSpanChunk = { 'agentId': 'express-node-sample-id', 'applicationName': 'express-node-sample-name', @@ -420,7 +420,7 @@ test.skip('sendSpanChunk redis.GET.end', (t) => { }) }) -test.skip('sendSpan', (t) => { +test('sendSpan', (t) => { let expectedSpanChunk = { 'traceId': { 'transactionId': { @@ -805,7 +805,6 @@ const emptyResponseService = (call, callback) => { } }) - const succeedOnRetryAttempt = call.metadata.get('succeed-on-retry-attempt') const previousAttempts = call.metadata.get('grpc-previous-rpc-attempts') const callRequests = getCallRequests() @@ -859,8 +858,7 @@ test('sendSupportedServicesCommand and commandEcho', (t) => { t.equal(cmdEchoResponse.getMessage(), 'echo', 'echo message') afterOne(t) }).build() - dataSender.setCommandEchoCallArguments(callArguments) - dataSender.sendSupportedServicesCommand() + dataSender.sendSupportedServicesCommand(callArguments) }) t.teardown(() => { @@ -880,25 +878,30 @@ test('CommandStreamActiveThreadCount', (t) => { server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => { dataSender = beforeSpecificOne(port, ProfilerDataSource) + let callCount = 0 dataCallbackOnServerCall = (data) => { + ++callCount const commonStreamResponse = data.getCommonstreamresponse() t.equal(commonStreamResponse.getResponseid(), requestId, 'response id matches request id') - t.equal(commonStreamResponse.getSequenceid(), 1, 'sequenceid is 1') + t.equal(commonStreamResponse.getSequenceid(), callCount, `sequenceid is ${callCount}`) t.equal(commonStreamResponse.getMessage().getValue(), '', 'message is empty') t.equal(data.getHistogramschematype(), 2, 'histogram schema type') t.equal(data.getActivethreadcountList()[0], 1, 'active thread count') - afterOne(t) + + if (callCount == 2) { + afterOne(t) + } } const callArguments = new CallArgumentsBuilder(function (error, response) { serverCallWriter(CommandType.activeThreadCount) + serverCallWriter(CommandType.activeThreadCount) }).build() - dataSender.setCommandEchoCallArguments(callArguments) - dataSender.sendSupportedServicesCommand() + dataSender.sendSupportedServicesCommand(callArguments) }) t.teardown(() => { dataSender.close() server.forceShutdown() }) -}) \ No newline at end of file +}) diff --git a/test/client/grpc-fixture.js b/test/client/grpc-fixture.js index da9066d4..62d1e407 100644 --- a/test/client/grpc-fixture.js +++ b/test/client/grpc-fixture.js @@ -102,9 +102,9 @@ class DataSourceCallCountable extends GrpcDataSender { super.sendSpan(span) } - sendSupportedServicesCommand() { + sendSupportedServicesCommand(callArguments) { increaseCallCount() - super.sendSupportedServicesCommand() + super.sendSupportedServicesCommand(callArguments) } } diff --git a/test/context/trace-id.test.js b/test/context/trace-id.test.js index 18e12296..d18abe5e 100644 --- a/test/context/trace-id.test.js +++ b/test/context/trace-id.test.js @@ -7,7 +7,7 @@ const test = require('tape') const TransactionId = require('../../lib/context/transaction-id') const TraceId = require('../../lib/context/trace-id') -const IdGenerator = require('../../lib/context/id-generator') +const SpanId = require('../../lib/context/span-id') test('Should create', function (t) { t.plan(2) @@ -15,7 +15,7 @@ test('Should create', function (t) { const agentId = 'agent-for-dev' const agentStartTime = Date.now() const transactionId = new TransactionId(agentId, agentStartTime.toString()) - const spanId = IdGenerator.stringValueOfNext() + const spanId = SpanId.newSpanId() const traceId = new TraceId(transactionId, spanId) t.ok(traceId) diff --git a/test/fixture.js b/test/fixture.js index 4809c967..28624f55 100644 --- a/test/fixture.js +++ b/test/fixture.js @@ -6,7 +6,7 @@ const TransactionId = require('../lib/context/transaction-id') const TraceId = require('../lib/context/trace-id') -const IdGenerator = require('../lib/context/id-generator') +const SpanId = require('../lib/context/span-id') const shimmer = require('@pinpoint-apm/shimmer') const testConfig= require('./pinpoint-config-test') require('../lib/config').clear() @@ -22,7 +22,7 @@ const getTransactionId = () => { } const getTraceId = (transactionId) => { - const spanId = IdGenerator.stringValueOfNext() + const spanId = SpanId.newSpanId() return new TraceId(transactionId || getTransactionId(), spanId) }