From 6be2a63b7b095763e68042c42ab7b1954fccc9e7 Mon Sep 17 00:00:00 2001 From: dinstone Date: Tue, 16 Jan 2024 22:06:48 +0800 Subject: [PATCH] bugfix for TelemetryInterceptor --- .../server/invoke/LocalInvokeHandler.java | 4 + focus-telemetry/pom.xml | 112 +++++++++--------- .../focus/telemetry/TelemetryInterceptor.java | 34 ++++-- .../src/test/java/focus/TelemetryTest.java | 27 ++++- .../focus-transport-photon/pom.xml | 2 +- .../photon/PhotonMessageProcessor.java | 15 +-- 6 files changed, 118 insertions(+), 76 deletions(-) diff --git a/focus-server/focus-server-core/src/main/java/com/dinstone/focus/server/invoke/LocalInvokeHandler.java b/focus-server/focus-server-core/src/main/java/com/dinstone/focus/server/invoke/LocalInvokeHandler.java index 53ae197..31db549 100644 --- a/focus-server/focus-server-core/src/main/java/com/dinstone/focus/server/invoke/LocalInvokeHandler.java +++ b/focus-server/focus-server-core/src/main/java/com/dinstone/focus/server/invoke/LocalInvokeHandler.java @@ -20,11 +20,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.dinstone.focus.config.MethodConfig; import com.dinstone.focus.config.ServiceConfig; import com.dinstone.focus.exception.BusinessException; import com.dinstone.focus.exception.ErrorCode; +import com.dinstone.focus.exception.InvokeException; import com.dinstone.focus.exception.ServiceException; import com.dinstone.focus.invoke.Handler; import com.dinstone.focus.protocol.Call; @@ -68,6 +70,8 @@ public CompletableFuture handle(Call call) throws Exception { throw new ServiceException(ErrorCode.PARAM_ERROR, e); } catch (IllegalAccessException e) { throw new ServiceException(ErrorCode.ACCESS_ERROR, e); + } catch (TimeoutException e) { + throw new InvokeException(ErrorCode.TIMEOUT_ERROR, e); } } diff --git a/focus-telemetry/pom.xml b/focus-telemetry/pom.xml index 9c2cd69..aec8249 100644 --- a/focus-telemetry/pom.xml +++ b/focus-telemetry/pom.xml @@ -1,55 +1,61 @@ - 4.0.0 - - com.dinstone.focus - focus-parent - 1.2.0 - - focus-telemetry - - - - io.opentelemetry - opentelemetry-bom - 1.30.0 - pom - import - - - - - - com.dinstone.focus - focus-core - ${project.version} - - - io.opentelemetry - opentelemetry-api - - - io.opentelemetry - opentelemetry-sdk - - - io.opentelemetry - opentelemetry-exporter-logging - test - - - io.opentelemetry - opentelemetry-exporter-zipkin - - - - - - net.revelc.code.formatter - formatter-maven-plugin - - ../guide/formatter-java.xml - - - - + 4.0.0 + + com.dinstone.focus + focus-parent + 1.2.0 + + focus-telemetry + + + + io.opentelemetry + opentelemetry-bom + 1.33.0 + pom + import + + + + + + com.dinstone.focus + focus-core + ${project.version} + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-exporter-logging + test + + + io.opentelemetry + opentelemetry-exporter-zipkin + test + + + io.opentelemetry + opentelemetry-exporter-jaeger + test + + + + + + net.revelc.code.formatter + formatter-maven-plugin + + ../guide/formatter-java.xml + + + + \ No newline at end of file diff --git a/focus-telemetry/src/main/java/com/dinstone/focus/telemetry/TelemetryInterceptor.java b/focus-telemetry/src/main/java/com/dinstone/focus/telemetry/TelemetryInterceptor.java index 86ac2bc..7184e71 100644 --- a/focus-telemetry/src/main/java/com/dinstone/focus/telemetry/TelemetryInterceptor.java +++ b/focus-telemetry/src/main/java/com/dinstone/focus/telemetry/TelemetryInterceptor.java @@ -17,6 +17,7 @@ import java.util.concurrent.CompletableFuture; +import com.dinstone.focus.exception.InvokeException; import com.dinstone.focus.invoke.Handler; import com.dinstone.focus.invoke.Interceptor; import com.dinstone.focus.protocol.Call; @@ -75,15 +76,18 @@ public void set(Call carrier, String key, String value) { public CompletableFuture intercept(Call call, Handler chain) throws Exception { Tracer tracer = telemetry.getTracer(call.getService()); if (kind == Kind.SERVER) { - // Extract the SpanContext and other elements from the request. - Context ec = telemetry.getPropagators().getTextMapPropagator().extract(Context.current(), call, getter); - try (Scope ss = ec.makeCurrent()) { - return chain.handle(call); + Span span = getServerSpan(call, tracer); + try (Scope ignored = span.makeCurrent()) { + return chain.handle(call).whenComplete((reply, error) -> { + finishSpan(reply, error, span); + }); + } catch (Throwable error) { + finishSpan(null, error, span); + throw error; } } else { - Span span = tracer.spanBuilder(call.getMethod()).setSpanKind(SpanKind.CLIENT).startSpan(); - try (Scope ss = span.makeCurrent()) { - span.setAttribute(RPC_SERVICE, call.getService()).setAttribute(RPC_METHOD, call.getMethod()); + Span span = getClientSpan(call, tracer); + try (Scope ignored = span.makeCurrent()) { // Inject the request with the *current* Context, which contains our current // Span. telemetry.getPropagators().getTextMapPropagator().inject(Context.current(), call, setter); @@ -98,10 +102,26 @@ public CompletableFuture intercept(Call call, Handler chain) throws Excep } + private Span getClientSpan(Call call, Tracer tracer) { + Span span = tracer.spanBuilder(call.getEndpoint()).setSpanKind(SpanKind.CLIENT).startSpan(); + return span.setAttribute(RPC_SERVICE, call.getService()).setAttribute(RPC_METHOD, call.getMethod()); + } + + private Span getServerSpan(Call call, Tracer tracer) { + // Extract the SpanContext and other elements from the request. + Context pc = telemetry.getPropagators().getTextMapPropagator().extract(Context.current(), call, getter); + Span span = tracer.spanBuilder(call.getEndpoint()).setSpanKind(SpanKind.SERVER).setParent(pc).startSpan(); + return span.setAttribute(RPC_SERVICE, call.getService()).setAttribute(RPC_METHOD, call.getMethod()); + } + private void finishSpan(Reply reply, Throwable error, Span span) { if (error != null) { span.setStatus(StatusCode.ERROR, error.getMessage()); span.recordException(error); + } else if (reply != null && reply.isError()) { + final InvokeException data = (InvokeException) reply.getData(); + span.setStatus(StatusCode.ERROR, data.getMessage()); + span.recordException(data); } span.end(); } diff --git a/focus-telemetry/src/test/java/focus/TelemetryTest.java b/focus-telemetry/src/test/java/focus/TelemetryTest.java index 4b1672f..94efc8b 100644 --- a/focus-telemetry/src/test/java/focus/TelemetryTest.java +++ b/focus-telemetry/src/test/java/focus/TelemetryTest.java @@ -15,16 +15,24 @@ */ package focus; +import java.io.IOException; +import java.time.Duration; + import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.exporter.logging.LoggingMetricExporter; import io.opentelemetry.exporter.logging.LoggingSpanExporter; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; @@ -33,10 +41,16 @@ public class TelemetryTest { private static Tracer tracer; - public static void main(String[] args) { + public static void main(String[] args) throws IOException { OpenTelemetry openTelemetry = getOpenTelemetry(); - tracer = openTelemetry.getTracer("telemetry-test-service", "1.0.0"); + String serviceName = "telemetry-test-service"; + Meter meter = openTelemetry.getMeter(serviceName); + tracer = openTelemetry.getTracer(serviceName, "1.0.0"); + + LongHistogram h = meter.histogramBuilder("main").ofLongs().build(); + + h.record(2000); Span span = tracer.spanBuilder("main").startSpan(); // Make the span the current span @@ -49,6 +63,8 @@ public static void main(String[] args) { span.end(); } + System.in.read(); + System.out.println("over"); } @@ -81,10 +97,13 @@ private static OpenTelemetry getOpenTelemetry() { .addSpanProcessor(BatchSpanProcessor.builder(LoggingSpanExporter.create()).build()) .setResource(resource).build(); - OpenTelemetry openTelemetry = OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider) + SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder().registerMetricReader(PeriodicMetricReader + .builder(LoggingMetricExporter.create()).setInterval(Duration.ofMillis(3000000)).build()) + .setResource(resource).build(); + + return OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).setMeterProvider(sdkMeterProvider) .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) .buildAndRegisterGlobal(); - return openTelemetry; } } diff --git a/focus-transport/focus-transport-photon/pom.xml b/focus-transport/focus-transport-photon/pom.xml index a7be569..eceda99 100644 --- a/focus-transport/focus-transport-photon/pom.xml +++ b/focus-transport/focus-transport-photon/pom.xml @@ -15,7 +15,7 @@ com.dinstone.photon photon - 1.1.4 + 1.1.5 diff --git a/focus-transport/focus-transport-photon/src/main/java/com/dinstone/focus/transport/photon/PhotonMessageProcessor.java b/focus-transport/focus-transport-photon/src/main/java/com/dinstone/focus/transport/photon/PhotonMessageProcessor.java index a994715..8211f6c 100644 --- a/focus-transport/focus-transport-photon/src/main/java/com/dinstone/focus/transport/photon/PhotonMessageProcessor.java +++ b/focus-transport/focus-transport-photon/src/main/java/com/dinstone/focus/transport/photon/PhotonMessageProcessor.java @@ -15,7 +15,6 @@ */ package com.dinstone.focus.transport.photon; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.concurrent.Executor; import java.util.function.Function; @@ -36,7 +35,6 @@ import com.dinstone.photon.message.Request; import com.dinstone.photon.message.Response; import com.dinstone.photon.message.Response.Status; -import com.dinstone.photon.utils.ByteStreamUtil; import io.netty.util.CharsetUtil; @@ -125,15 +123,10 @@ private Response encode(Reply reply, ServiceConfig serviceConfig, MethodConfig m Response response = new Response(); if (reply.isError()) { response.setStatus(Status.FAILURE); - try { - InvokeException exception = (InvokeException) reply.getData(); - ByteArrayOutputStream bao = new ByteArrayOutputStream(); - ByteStreamUtil.writeInt(bao, exception.getCode().value()); - ByteStreamUtil.writeString(bao, exception.getMessage()); - response.setContent(bao.toByteArray()); - } catch (IOException e) { - throw new ServiceException(ErrorCode.CODEC_ERROR, - "serialize encode error: " + methodConfig.getMethodName(), e); + InvokeException exception = (InvokeException) reply.getData(); + response.headers().setInt(InvokeException.CODE_KEY, exception.getCode().value()); + if (exception.getMessage() != null) { + response.setContent(exception.getMessage().getBytes(CharsetUtil.UTF_8)); } } else { response.setStatus(Status.SUCCESS);