diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ConsumerSenderMetrics.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ConsumerSenderMetrics.java index 76bba6dde7..6aa1893d83 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ConsumerSenderMetrics.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ConsumerSenderMetrics.java @@ -1,6 +1,7 @@ package pl.allegro.tech.hermes.common.metric; import io.micrometer.core.instrument.MeterRegistry; +import pl.allegro.tech.hermes.metrics.HermesTimer; import java.util.function.ToDoubleFunction; @@ -66,4 +67,32 @@ public void registerHttp2SerialClientConnectionsGauge(T obj, ToDoubleFunctio public void registerHttp2SerialClientPendingConnectionsGauge(T obj, ToDoubleFunction f) { gaugeRegistrar.registerGauge(CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_PENDING_CONNECTIONS, obj, f); } + + public HermesTimer http1SerialClientRequestQueueWaitingTimer() { + return HermesTimer.from( + meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME), + hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME) + ); + } + + public HermesTimer http2SerialClientRequestQueueWaitingTimer() { + return HermesTimer.from( + meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME), + hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME) + ); + } + + public HermesTimer http1SerialClientRequestProcessingTimer() { + return HermesTimer.from( + meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_PROCESSING_TIME), + hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_PROCESSING_TIME) + ); + } + + public HermesTimer http2SerialClientRequestProcessingTimer() { + return HermesTimer.from( + meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_PROCESSING_TIME), + hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_PROCESSING_TIME) + ); + } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Timers.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Timers.java index 17d3572570..8857eb7bb1 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Timers.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Timers.java @@ -29,4 +29,9 @@ public class Timers { public static final String CONSUMER_IDLE_TIME = "idle-time." + GROUP + "." + TOPIC + "." + SUBSCRIPTION; public static final String OAUTH_PROVIDER_TOKEN_REQUEST_LATENCY = "oauth.provider." + OAUTH_PROVIDER_NAME + ".token-request-latency"; + + public static final String CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME = "http-clients.serial.http1.request-queue-waiting-time"; + public static final String CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_PROCESSING_TIME = "http-clients.serial.http1.request-processing-time"; + public static final String CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME = "http-clients.serial.http2.request-queue-waiting-time"; + public static final String CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_PROCESSING_TIME = "http-clients.serial.http2.request-processing-time"; } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java index 1958e4bff0..9142f1bc42 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java @@ -197,8 +197,9 @@ public HttpHeadersProvidersFactory emptyHttpHeadersProvidersFactory() { @Bean public HttpClientsFactory httpClientsFactory(InstrumentedExecutorServiceFactory executorFactory, - SslContextFactoryProvider sslContextFactoryProvider) { - return new HttpClientsFactory(executorFactory, sslContextFactoryProvider); + SslContextFactoryProvider sslContextFactoryProvider, + MetricsFacade metricsFacade) { + return new HttpClientsFactory(executorFactory, sslContextFactoryProvider, metricsFacade.consumerSender()); } @Bean diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/Http1ClientProperties.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/Http1ClientProperties.java index 21f2119552..0a7e16a551 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/Http1ClientProperties.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/Http1ClientProperties.java @@ -20,6 +20,7 @@ public class Http1ClientProperties implements Http1ClientParameters { private Duration connectionTimeout = Duration.ofSeconds(15); + private boolean requestProcessingMonitoringEnabled = false; @Override public int getThreadPoolSize() { @@ -83,4 +84,13 @@ public Duration getConnectionTimeout() { public void setConnectionTimeout(Duration connectionTimeout) { this.connectionTimeout = connectionTimeout; } + + @Override + public boolean isRequestProcessingMonitoringEnabled() { + return requestProcessingMonitoringEnabled; + } + + public void setRequestProcessingMonitoringEnabled(boolean requestProcessingMonitoringEnabled) { + this.requestProcessingMonitoringEnabled = requestProcessingMonitoringEnabled; + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/Http2ClientProperties.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/Http2ClientProperties.java index b4ab115178..a70f833b4f 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/Http2ClientProperties.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/Http2ClientProperties.java @@ -20,6 +20,8 @@ public class Http2ClientProperties implements Http2ClientParameters { private Duration connectionTimeout = Duration.ofSeconds(15); + private boolean requestProcessingMonitoringEnabled = false; + public boolean isEnabled() { return enabled; } @@ -81,4 +83,13 @@ public Duration getConnectionTimeout() { public void setConnectionTimeout(Duration connectionTimeout) { this.connectionTimeout = connectionTimeout; } + + @Override + public boolean isRequestProcessingMonitoringEnabled() { + return this.requestProcessingMonitoringEnabled; + } + + public void setRequestProcessingMonitoringEnabled(boolean requestProcessingMonitoringEnabled) { + this.requestProcessingMonitoringEnabled = requestProcessingMonitoringEnabled; + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientParameters.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientParameters.java index 63da1e530f..663f9bfe9d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientParameters.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientParameters.java @@ -15,4 +15,6 @@ public interface HttpClientParameters { int getMaxRequestsQueuedPerDestination(); Duration getConnectionTimeout(); + + boolean isRequestProcessingMonitoringEnabled(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsFactory.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsFactory.java index 8784afb9bc..9b9998891d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsFactory.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsFactory.java @@ -6,6 +6,7 @@ import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.util.HttpCookieStore; +import pl.allegro.tech.hermes.common.metric.ConsumerSenderMetrics; import pl.allegro.tech.hermes.common.metric.executor.InstrumentedExecutorServiceFactory; import java.util.concurrent.ExecutorService; @@ -15,11 +16,17 @@ public class HttpClientsFactory { private final InstrumentedExecutorServiceFactory executorFactory; private final SslContextFactoryProvider sslContextFactoryProvider; + private final ConsumerSenderMetrics consumerSenderMetrics; + + public HttpClientsFactory( InstrumentedExecutorServiceFactory executorFactory, - SslContextFactoryProvider sslContextFactoryProvider) { + SslContextFactoryProvider sslContextFactoryProvider, + ConsumerSenderMetrics consumerSenderMetrics + ) { this.executorFactory = executorFactory; this.sslContextFactoryProvider = sslContextFactoryProvider; + this.consumerSenderMetrics = consumerSenderMetrics; } public HttpClient createClientForHttp1(String name, Http1ClientParameters http1ClientParameters) { @@ -40,6 +47,14 @@ public HttpClient createClientForHttp1(String name, Http1ClientParameters http1C client.setIdleTimeout(http1ClientParameters.getIdleTimeout().toMillis()); client.setFollowRedirects(http1ClientParameters.isFollowRedirectsEnabled()); client.setConnectTimeout(http1ClientParameters.getConnectionTimeout().toMillis()); + if (http1ClientParameters.isRequestProcessingMonitoringEnabled()) { + client.getRequestListeners().add( + new JettyHttpClientMetrics( + consumerSenderMetrics.http1SerialClientRequestQueueWaitingTimer(), + consumerSenderMetrics.http1SerialClientRequestProcessingTimer() + ) + ); + } return client; } @@ -48,7 +63,7 @@ public HttpClient createClientForHttp2(String name, Http2ClientParameters http2C sslContextFactoryProvider.provideSslContextFactory() .ifPresentOrElse(clientConnector::setSslContextFactory, () -> { - throw new IllegalStateException("Cannot create http/2 client due to lack of ssl context factory"); + throw new IllegalStateException("Cannot create http/2 client due to lack of ssl context factory"); }); HTTP2Client http2Client = new HTTP2Client(clientConnector); @@ -66,6 +81,14 @@ public HttpClient createClientForHttp2(String name, Http2ClientParameters http2C client.setIdleTimeout(http2ClientParameters.getIdleTimeout().toMillis()); client.setFollowRedirects(http2ClientParameters.isFollowRedirectsEnabled()); client.setConnectTimeout(http2ClientParameters.getConnectionTimeout().toMillis()); + if (http2ClientParameters.isRequestProcessingMonitoringEnabled()) { + client.getRequestListeners().add( + new JettyHttpClientMetrics( + consumerSenderMetrics.http2SerialClientRequestQueueWaitingTimer(), + consumerSenderMetrics.http2SerialClientRequestProcessingTimer() + ) + ); + } return client; } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpClientMetrics.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpClientMetrics.java new file mode 100644 index 0000000000..a530576cc5 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpClientMetrics.java @@ -0,0 +1,29 @@ +package pl.allegro.tech.hermes.consumers.consumer.sender.http; + +import org.eclipse.jetty.client.api.Request; +import pl.allegro.tech.hermes.metrics.HermesTimer; + +public class JettyHttpClientMetrics implements Request.Listener { + + private final HermesTimer requestQueueWaitingTimer; + private final HermesTimer requestProcessingTimer; + + public JettyHttpClientMetrics(HermesTimer requestQueueWaitingTimer, HermesTimer requestProcessingTimer) { + this.requestQueueWaitingTimer = requestQueueWaitingTimer; + this.requestProcessingTimer = requestProcessingTimer; + } + + @Override + public void onQueued(Request request) { + var timer = requestQueueWaitingTimer.time(); + + request.onRequestBegin(onBeginRequest -> timer.close()); + } + + @Override + public void onBegin(Request request) { + var timer = requestProcessingTimer.time(); + + request.onComplete(result -> timer.close()); + } +} diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientConnectionMonitoringTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientConnectionMonitoringTest.groovy index e6ad62185c..3303f957d2 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientConnectionMonitoringTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientConnectionMonitoringTest.groovy @@ -46,7 +46,7 @@ class HttpClientConnectionMonitoringTest extends Specification { ConsumerSenderConfiguration consumerConfiguration = new ConsumerSenderConfiguration(); client = consumerConfiguration.http1SerialClient(new HttpClientsFactory( new InstrumentedExecutorServiceFactory(threadPoolMetrics), - sslContextFactoryProvider), new Http1ClientProperties() + sslContextFactoryProvider, metrics.consumerSender()), new Http1ClientProperties() ) batchClient = Mock(HttpClient) client.start() diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSenderTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSenderTest.java index c473a4ffb0..c4869fc1f3 100644 --- a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSenderTest.java +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSenderTest.java @@ -8,6 +8,7 @@ import org.junit.Test; import pl.allegro.tech.hermes.api.EndpointAddress; import pl.allegro.tech.hermes.api.EndpointAddressResolverMetadata; +import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.common.metric.executor.InstrumentedExecutorServiceFactory; import pl.allegro.tech.hermes.common.metric.executor.ThreadPoolMetrics; import pl.allegro.tech.hermes.consumers.config.ConsumerSenderConfiguration; @@ -40,6 +41,7 @@ import static pl.allegro.tech.hermes.consumers.test.MessageBuilder.TEST_MESSAGE_CONTENT; import static pl.allegro.tech.hermes.consumers.test.MessageBuilder.testMessage; +// TODO tests? public class JettyMessageSenderTest { private static final int ENDPOINT_PORT = Ports.nextAvailable(); @@ -55,6 +57,8 @@ public class JettyMessageSenderTest { private final HttpHeadersProvider headersProvider = new HermesHeadersProvider(Collections.singleton(new Http1HeadersProvider())); + private static final MetricsFacade metricsFacade = TestMetricsFacadeFactory.create(); + @BeforeClass public static void setupEnvironment() throws Exception { wireMockServer = new WireMockServer(ENDPOINT_PORT); @@ -67,7 +71,7 @@ public static void setupEnvironment() throws Exception { new InstrumentedExecutorServiceFactory( new ThreadPoolMetrics(TestMetricsFacadeFactory.create()) ), - sslContextFactoryProvider), + sslContextFactoryProvider, metricsFacade.consumerSender()), new Http1ClientProperties() ); client.start(); @@ -256,4 +260,4 @@ public void shouldUseSuppliedSocketTimeout() throws ExecutionException, Interrup // then assertThat(messageSendingResult.isTimeout()).isTrue(); } -} \ No newline at end of file +}