Skip to content

Commit

Permalink
[#6609] Implement data send side
Browse files Browse the repository at this point in the history
- Implement data send side
- Add proto
  • Loading branch information
koo-taejin committed Dec 1, 2020
1 parent af0a3f2 commit 9a6fa78
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 4 deletions.
21 changes: 21 additions & 0 deletions grpc/src/main/proto/Stat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ message PStatMessage {
oneof field {
PAgentStat agentStat = 1;
PAgentStatBatch agentStatBatch = 2;
PAgentUriStat agentUriStat = 3;
}
}

Expand Down Expand Up @@ -168,4 +169,24 @@ message PTotalThread {
message PLoadedClass {
int64 loadedClassCount = 1;
int64 unloadedClassCount = 2;
}


message PAgentUriStat {
int64 timestamp = 1;
int32 interval = 2;
repeated PEachUriStat eachUriStat = 3;
}

message PEachUriStat {
string uri = 1;
PUriHistogram totalHistogram = 2;
PUriHistogram failedHistogram = 3;
}

message PUriHistogram {
int32 count = 1;
double avg = 2;
int64 max = 3;
repeated int32 histogram = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testAgentStatMonitor() throws InterruptedException {

// When
AgentStatMonitor monitor = new DefaultAgentStatMonitor(this.dataSender, "agentId", System.currentTimeMillis(),
agentStatCollector, null, mockProfilerConfig);
agentStatCollector, null, null, mockProfilerConfig);
monitor.start();
Thread.sleep(totalTestDurationMs);
monitor.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.navercorp.pinpoint.grpc.trace.PActiveTraceHistogram;
import com.navercorp.pinpoint.grpc.trace.PAgentStat;
import com.navercorp.pinpoint.grpc.trace.PAgentStatBatch;
import com.navercorp.pinpoint.grpc.trace.PAgentUriStat;
import com.navercorp.pinpoint.grpc.trace.PCpuLoad;
import com.navercorp.pinpoint.grpc.trace.PCustomMetricMessage;
import com.navercorp.pinpoint.grpc.trace.PDataSource;
Expand Down Expand Up @@ -54,8 +55,10 @@
import com.navercorp.pinpoint.profiler.monitor.metric.response.ResponseTimeValue;
import com.navercorp.pinpoint.profiler.monitor.metric.totalthread.TotalThreadMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.transaction.TransactionMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.AgentUriStatData;

import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.Message;

import java.util.List;

Expand All @@ -66,6 +69,8 @@ public class GrpcStatMessageConverter implements MessageConverter<GeneratedMessa
private GrpcThreadDumpMessageConverter threadDumpMessageConverter = new GrpcThreadDumpMessageConverter();
private GrpcJvmGcTypeMessageConverter jvmGcTypeConverter = new GrpcJvmGcTypeMessageConverter();
private final GrpcCustomMetricMessageConverter customMetricMessageConverter = new GrpcCustomMetricMessageConverter();
private final GrpcUriStatMessageConverter uriStatMessageConverter = new GrpcUriStatMessageConverter();


@Override
public GeneratedMessageV3 toMessage(Object message) {
Expand All @@ -86,6 +91,10 @@ public GeneratedMessageV3 toMessage(Object message) {
final AgentCustomMetricSnapshotBatch agentCustomMetricSnapshotBatch = (AgentCustomMetricSnapshotBatch) message;
final PCustomMetricMessage pCustomMetricMessage = customMetricMessageConverter.toMessage(agentCustomMetricSnapshotBatch);
return pCustomMetricMessage;
} else if (message instanceof AgentUriStatData) {
final AgentUriStatData agentUriStatData = (AgentUriStatData) message;
final PAgentUriStat agentUriStat = uriStatMessageConverter.toMessage(agentUriStatData);
return agentUriStat;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2020 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.profiler.context.grpc;

import com.navercorp.pinpoint.grpc.trace.PAgentUriStat;
import com.navercorp.pinpoint.grpc.trace.PEachUriStat;
import com.navercorp.pinpoint.grpc.trace.PUriHistogram;
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.AgentUriStatData;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.EachUriStatData;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.UriStatHistogram;

import java.util.Collection;

/**
* @author Taejin Koo
*/
public class GrpcUriStatMessageConverter implements MessageConverter<PAgentUriStat> {

private final static PUriHistogram EMPTY_DETAILED_DATA_INSTANCE = PUriHistogram.getDefaultInstance();

@Override
public PAgentUriStat toMessage(Object message) {
if (!(message instanceof AgentUriStatData)) {
return null;
}

return createPAgentUriStat((AgentUriStatData) message);
}

private PAgentUriStat createPAgentUriStat(AgentUriStatData agentUriStatData) {
long baseTimestamp = agentUriStatData.getBaseTimestamp();
int interval = agentUriStatData.getInterval();

PAgentUriStat.Builder builder = PAgentUriStat.newBuilder();
builder.setTimestamp(baseTimestamp);
builder.setInterval(interval);

Collection<EachUriStatData> allUriStatData = agentUriStatData.getAllUriStatData();
for (EachUriStatData eachUriStatData : allUriStatData) {
PEachUriStat pEachUriStat = createPEachUriStat(eachUriStatData);
builder.addEachUriStat(pEachUriStat);
}

return builder.build();
}

private PEachUriStat createPEachUriStat(EachUriStatData eachUriStatData) {
String uri = eachUriStatData.getUri();

PEachUriStat.Builder builder = PEachUriStat.newBuilder();
builder.setUri(uri);

UriStatHistogram totalHistogram = eachUriStatData.getTotalHistogram();
PUriHistogram totalPUriHistogram = createPUriHistogram(totalHistogram);
builder.setTotalHistogram(totalPUriHistogram);

UriStatHistogram failedHistogram = eachUriStatData.getFailedHistogram();
PUriHistogram failedPUriHistogram = createPUriHistogram(failedHistogram);
builder.setFailedHistogram(failedPUriHistogram);

return builder.build();
}


private PUriHistogram createPUriHistogram(UriStatHistogram uriStatHistogram) {
int count = uriStatHistogram.getCount();
if (uriStatHistogram.getCount() == 0) {
return EMPTY_DETAILED_DATA_INSTANCE;
}

PUriHistogram.Builder builder = PUriHistogram.newBuilder();

long total = uriStatHistogram.getTotal();
long max = uriStatHistogram.getMax();

builder.setAvg(total / count);
builder.setMax(max);

int[] timestampHistograms = uriStatHistogram.getTimestampHistogram();
for (int eachTimestampHistogram : timestampHistograms) {
builder.addHistogram(eachTimestampHistogram);
}

return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import com.navercorp.pinpoint.profiler.sender.AsyncQueueingExecutor;
import com.navercorp.pinpoint.profiler.sender.AsyncQueueingExecutorListener;

import io.netty.util.internal.shaded.org.jctools.queues.atomic.SpscLinkedAtomicQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Queue;

/**
* @author Taejin Koo
Expand All @@ -53,13 +55,26 @@ public void store(String uri, boolean status, long elapsedTime) {
execute(uriStatInfo);
}

@Override
public AgentUriStatData poll() {
Queue<AgentUriStatData> completedDataQueue = executorListener.getCompletedDataQueue();
return completedDataQueue.poll();
}

@Override
public void close() {
stop();
}

@Override
protected void pollTimeout(long timeout) {
executorListener.executePollTimeout();
}

private static class ExecutorListener implements AsyncQueueingExecutorListener<UriStatInfo> {

private final SpscLinkedAtomicQueue<AgentUriStatData> completedAgentUriStatDataQueue = new SpscLinkedAtomicQueue<>();

private AgentUriStatData agentUriStatData;
private int collectInterval;

Expand All @@ -73,7 +88,7 @@ public void execute(Collection<UriStatInfo> messageList) {
final long currentBaseTimestamp = getBaseTimestamp();
checkAndFlushOldData(currentBaseTimestamp);

AgentUriStatData agentUriStatData = getUriStatBatchBuilder(currentBaseTimestamp);
AgentUriStatData agentUriStatData = getAgentUriStatData(currentBaseTimestamp);

Object[] dataList = messageList.toArray();
for (int i = 0; i < CollectionUtils.nullSafeSize(messageList); i++) {
Expand All @@ -90,7 +105,7 @@ public void execute(UriStatInfo message) {
long currentBaseTimestamp = getBaseTimestamp();
checkAndFlushOldData(currentBaseTimestamp);

AgentUriStatData agentUriStatData = getUriStatBatchBuilder(currentBaseTimestamp);
AgentUriStatData agentUriStatData = getAgentUriStatData(currentBaseTimestamp);
agentUriStatData.add(message);
}

Expand All @@ -111,20 +126,28 @@ private boolean checkAndFlushOldData(long currentBaseTimestamp) {
}

if (currentBaseTimestamp > agentUriStatData.getBaseTimestamp()) {
if (completedAgentUriStatDataQueue.size() > 10) {
completedAgentUriStatDataQueue.remove();
}

completedAgentUriStatDataQueue.offer(agentUriStatData);
// TODO FLUSH
agentUriStatData = null;
return true;
}
return false;
}

private AgentUriStatData getUriStatBatchBuilder(long currentBaseTimestamp) {
private AgentUriStatData getAgentUriStatData(long currentBaseTimestamp) {
if (agentUriStatData == null) {
agentUriStatData = new AgentUriStatData(currentBaseTimestamp, collectInterval);
}
return agentUriStatData;
}

public Queue<AgentUriStatData> getCompletedDataQueue() {
return completedAgentUriStatDataQueue;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.profiler.context.storage;

import com.navercorp.pinpoint.profiler.monitor.metric.uri.AgentUriStatData;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.UriStatInfo;

/**
Expand All @@ -28,4 +29,13 @@ public void store(String uri, boolean status, long elapsedTime) {
// Do nothing
}

@Override
public AgentUriStatData poll() {
return null;
}

@Override
public void close() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.profiler.context.storage;

import com.navercorp.pinpoint.profiler.monitor.metric.uri.AgentUriStatData;
import com.navercorp.pinpoint.profiler.monitor.metric.uri.UriStatInfo;

/**
Expand All @@ -25,4 +26,8 @@ public interface UriStatStorage {

void store(String uri, boolean status, long elapsedTime);

AgentUriStatData poll();

void close();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.navercorp.pinpoint.profiler.context.module.AgentStartTime;
import com.navercorp.pinpoint.profiler.context.module.StatDataSender;
import com.navercorp.pinpoint.profiler.context.monitor.metric.CustomMetricRegistryService;
import com.navercorp.pinpoint.profiler.context.storage.UriStatStorage;
import com.navercorp.pinpoint.profiler.monitor.collector.AgentCustomMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.collector.AgentStatMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.metric.AgentStatMetricSnapshot;
Expand Down Expand Up @@ -65,6 +66,7 @@ public DefaultAgentStatMonitor(@StatDataSender DataSender dataSender,
@AgentId String agentId, @AgentStartTime long agentStartTimestamp,
@Named("AgentStatCollector") AgentStatMetricCollector<AgentStatMetricSnapshot> agentStatCollector,
CustomMetricRegistryService customMetricRegistryService,
UriStatStorage uriStatStorage,
ProfilerConfig profilerConfig) {
if (dataSender == null) {
throw new NullPointerException("dataSender");
Expand Down Expand Up @@ -100,6 +102,11 @@ public DefaultAgentStatMonitor(@StatDataSender DataSender dataSender,
runnableList.add(customMetricCollectionJob);
}

if (profilerConfig.isUriStatEnable() && uriStatStorage != null) {
Runnable uriStatCollectingJob = new UriStatCollectingJob(dataSender, uriStatStorage);
runnableList.add(uriStatCollectingJob);
}

this.statMonitorJob = new StatMonitorJob(runnableList);

preLoadClass(agentId, agentStartTimestamp, agentStatCollector);
Expand Down Expand Up @@ -128,6 +135,9 @@ public void start() {

@Override
public void stop() {

statMonitorJob.close();

executor.shutdown();
try {
executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.List;

/**
Expand Down Expand Up @@ -49,4 +50,15 @@ public void run() {
}
}

public void close() {
for (Runnable runnable : runnableList) {
if (runnable instanceof Closeable) {
try {
((Closeable) runnable).close();
} catch (Exception e) {
}
}
}
}

}
Loading

0 comments on commit 9a6fa78

Please sign in to comment.