From 033b82ab134c4eb069f7897f0631c8456dcfa42f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=80=C9=B4=E1=B4=9B=E1=B4=8F=C9=B4?= Date: Fri, 14 Jul 2023 23:07:28 +0300 Subject: [PATCH] Logging improvements (#141) --- .../kora/application/graph/GraphImpl.java | 28 ++++---- .../application/graph/KoraApplication.java | 8 +-- .../kora/cache/caffeine/CaffeineCache.java | 12 ++-- .../tinkoff/kora/cache/redis/RedisCache.java | 42 ++++++----- .../redis/client/DefaultLettuceCommander.java | 8 +-- .../database/cassandra/CassandraDatabase.java | 1 + .../common/telemetry/DataBaseLogger.java | 2 +- .../telemetry/DataBaseLoggerFactory.java | 1 + .../telemetry/DataBaseTelemetryFactory.java | 2 +- .../telemetry/DefaultDataBaseLogger.java | 50 +++++++------ .../telemetry/DefaultDataBaseTelemetry.java | 18 +++-- .../DefaultDataBaseTelemetryFactory.java | 2 +- .../flyway/FlywayJdbcDatabaseInterceptor.java | 4 +- .../kora/database/jdbc/JdbcDatabase.java | 23 ++++-- .../kora/database/r2dbc/R2dbcDatabase.java | 5 +- .../kora/database/vertx/VertxDatabase.java | 2 +- .../java/ru/tinkoff/kora/grpc/GrpcServer.java | 9 ++- .../telemetry/DefaultGrpcServerTelemetry.java | 4 +- .../grpc/telemetry/Slf4jGrpcServerLogger.java | 63 ++++++++-------- .../telemetry/DefaultHttpClientTelemetry.java | 22 +++--- .../telemetry/Sl4fjHttpClientLogger.java | 66 +++++++++-------- .../tinkoff/kora/http/common/HttpHeaders.java | 5 +- .../http/server/common/HttpServerModule.java | 2 +- .../common/router/PublicApiHandler.java | 15 ++-- .../telemetry/DefaultHttpServerTelemetry.java | 21 +++--- .../common/telemetry/HttpServerLogger.java | 64 +++-------------- .../common/telemetry/HttpServerTelemetry.java | 4 +- .../common/telemetry/HttpServerTracer.java | 2 +- .../telemetry/Slf4jHttpServerLogger.java | 72 +++++++++++++++++++ .../http/server/common/HttpServerTestKit.java | 8 +-- .../server/undertow/UndertowHttpServer.java | 10 +-- .../undertow/UndertowPrivateHttpServer.java | 8 +-- 
.../KafkaAssignConsumerContainer.java | 62 +++++++++++++--- .../containers/KafkaProducerContainer.java | 1 + .../KafkaSubscribeConsumerContainer.java | 53 ++++++++++---- .../telemetry/DefaultKafkaProducerLogger.java | 40 ++++++++--- .../kora/scheduling/jdk/AbstractJob.java | 12 ++-- .../quartz/KoraQuartzJobRegistrar.java | 4 +- .../validation/common/SimpleViolation.java | 5 ++ .../validation/common/ValidationContext.java | 1 + 40 files changed, 475 insertions(+), 286 deletions(-) create mode 100644 http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/Slf4jHttpServerLogger.java diff --git a/application-graph/src/main/java/ru/tinkoff/kora/application/graph/GraphImpl.java b/application-graph/src/main/java/ru/tinkoff/kora/application/graph/GraphImpl.java index f89ef6c11..cf4f05631 100644 --- a/application-graph/src/main/java/ru/tinkoff/kora/application/graph/GraphImpl.java +++ b/application-graph/src/main/java/ru/tinkoff/kora/application/graph/GraphImpl.java @@ -95,7 +95,7 @@ public Mono refresh(Node fromNode) { } case ON_COMPLETE -> { this.semaphore.release(); - log.debug("Refreshing Graph completed took {}", Duration.ofNanos(System.nanoTime() - started)); + log.debug("Refreshing Graph completed in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); } default -> {} } @@ -110,22 +110,22 @@ public Mono init() { root.set(0, this.objects.length()); this.semaphore.acquireUninterruptibly(); - log.debug("Initializing Graph..."); + log.debug("Graph Initializing..."); final long started = System.nanoTime(); return this.initializeSubgraph(root) .doOnEach(s -> { switch (s.getType()) { case CANCEL -> { this.semaphore.release(); - log.debug("Initializing Graph cancelled"); + log.debug("Graph Initializing cancelled"); } case ON_ERROR -> { this.semaphore.release(); - log.debug("Initializing Graph error", s.getThrowable()); + log.debug("Graph Initializing error", s.getThrowable()); } case ON_COMPLETE -> { 
this.semaphore.release(); - log.debug("Initializing Graph completed took {}", Duration.ofNanos(System.nanoTime() - started)); + log.debug("Graph Initializing completed in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); } default -> {} } @@ -139,22 +139,22 @@ public Mono release() { var root = new BitSet(this.objects.length()); root.set(0, this.objects.length()); this.semaphore.acquireUninterruptibly(); - log.debug("Releasing Graph..."); + log.debug("Graph Releasing..."); final long started = System.nanoTime(); return this.releaseNodes(this.objects, root) .doOnEach(s -> { switch (s.getType()) { case CANCEL -> { this.semaphore.release(); - log.debug("Releasing graph cancelled"); + log.debug("Graph Releasing cancelled"); } case ON_ERROR -> { this.semaphore.release(); - log.debug("Releasing graph error", s.getThrowable()); + log.debug("Graph Releasing error", s.getThrowable()); } case ON_COMPLETE -> { this.semaphore.release(); - log.debug("Releasing graph completed took {}", Duration.ofNanos(System.nanoTime() - started)); + log.debug("Graph Releasing completed in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); } default -> {} } @@ -175,7 +175,7 @@ private Mono initializeSubgraph(BitSet root) { for (var newPromise : tmpGraph.newPromises) { newPromise.graph = GraphImpl.this; } - log.trace("Graph refreshed, calling interceptors"); + log.trace("Graph refreshed, calling interceptors..."); for (var refreshListenerNode : this.refreshListenerNodes) { if (this.objects.get(refreshListenerNode) instanceof RefreshListener refreshListener) { try { @@ -386,14 +386,14 @@ private Mono initializeNode(Node node, Lifecycle lifecycle) { return lifecycle.init().then() .doOnEach(s -> { switch (s.getType()) { - case CANCEL -> this.rootGraph.log.trace("Initializing node {} of class {} cancelled", index, lifecycle.getClass()); - case ON_SUBSCRIBE -> this.rootGraph.log.trace("Initializing node {} of class {}", 
index, lifecycle.getClass()); - case ON_ERROR -> this.rootGraph.log.trace("Initializing node {} of class {} error", index, lifecycle.getClass(), s.getThrowable()); + case CANCEL -> this.rootGraph.log.trace("Node Initializing {} of class {} cancelled", index, lifecycle.getClass()); + case ON_SUBSCRIBE -> this.rootGraph.log.trace("Node Initializing {} of class {}", index, lifecycle.getClass()); + case ON_ERROR -> this.rootGraph.log.trace("Node Initializing {} of class {} error", index, lifecycle.getClass(), s.getThrowable()); case ON_COMPLETE -> { synchronized (TmpGraph.this) { this.initialized.set(node.index); } - this.rootGraph.log.trace("Initializing node {} of class {} complete", index, lifecycle.getClass()); + this.rootGraph.log.trace("Node Initializing {} of class {} complete", index, lifecycle.getClass()); } default -> {} } diff --git a/application-graph/src/main/java/ru/tinkoff/kora/application/graph/KoraApplication.java b/application-graph/src/main/java/ru/tinkoff/kora/application/graph/KoraApplication.java index 8f685a00e..7910ff46d 100644 --- a/application-graph/src/main/java/ru/tinkoff/kora/application/graph/KoraApplication.java +++ b/application-graph/src/main/java/ru/tinkoff/kora/application/graph/KoraApplication.java @@ -11,22 +11,22 @@ public static RefreshableGraph run(Supplier supplier) { var start = System.currentTimeMillis(); var graphDraw = supplier.get(); var log = LoggerFactory.getLogger(graphDraw.getRoot()); - log.debug("Initializing Application..."); + log.debug("Application initializing..."); try { var graph = graphDraw.init().block(); var end = System.currentTimeMillis(); try { var uptime = ManagementFactory.getRuntimeMXBean().getUptime() / 1000.0; - log.info("Initialized Application in {} ms (JVM running for {} s)", end - start, uptime); + log.info("Application initialized in {} ms (JVM running for {} s)", end - start, uptime); } catch (Throwable ex) { - log.info("Initialized Application in {}ms", end - start); + log.info("Application 
initialized in {}ms", end - start); } var thread = new Thread(() -> graph.release().block()); thread.setName("kora-shutdown"); Runtime.getRuntime().addShutdownHook(thread); return graph; } catch (Exception e) { - log.error("Initializing Application failed with error", e); + log.error("Application initializing failed with error", e); e.printStackTrace(); try { Thread.sleep(100);// so async logger is able to write exception to log diff --git a/cache/cache-caffeine/src/main/java/ru/tinkoff/kora/cache/caffeine/CaffeineCache.java b/cache/cache-caffeine/src/main/java/ru/tinkoff/kora/cache/caffeine/CaffeineCache.java index 0b8cb8522..d7992bd22 100644 --- a/cache/cache-caffeine/src/main/java/ru/tinkoff/kora/cache/caffeine/CaffeineCache.java +++ b/cache/cache-caffeine/src/main/java/ru/tinkoff/kora/cache/caffeine/CaffeineCache.java @@ -29,15 +29,15 @@ String origin() { @Override public V get(@Nonnull K key) { - logger.trace("Looking for value in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' looking for value for key: {}", name, key); final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.GET, name, origin()); telemetryContext.startRecording(); final V v = caffeine.getIfPresent(key); if (v == null) { - logger.trace("Value NOT found in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' no value found for key: {}", name, key); } else { - logger.debug("Value found in cache '{}' for key: {}", name, key); + logger.debug("Cache '{}' found value for key: {}", name, key); } telemetryContext.recordSuccess(v); @@ -46,7 +46,7 @@ public V get(@Nonnull K key) { @Nonnull public V put(@Nonnull K key, @Nonnull V value) { - logger.trace("Putting value in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' storing for key: {}", name, key); final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.PUT, name, origin()); telemetryContext.startRecording(); @@ 
-58,7 +58,7 @@ public V put(@Nonnull K key, @Nonnull V value) { @Override public void invalidate(@Nonnull K key) { - logger.trace("Invalidating value in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' invalidating for key: {}", name, key); final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.INVALIDATE, name, origin()); telemetryContext.startRecording(); @@ -69,7 +69,7 @@ public void invalidate(@Nonnull K key) { @Override public void invalidateAll() { - logger.trace("Invalidating all values in cache '{}'", name); + logger.trace("Cache '{}' invalidating all values", name); final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.INVALIDATE_ALL, name, origin()); telemetryContext.startRecording(); diff --git a/cache/cache-redis/src/main/java/ru/tinkoff/kora/cache/redis/RedisCache.java b/cache/cache-redis/src/main/java/ru/tinkoff/kora/cache/redis/RedisCache.java index 2c212dbaa..47a60d88f 100644 --- a/cache/cache-redis/src/main/java/ru/tinkoff/kora/cache/redis/RedisCache.java +++ b/cache/cache-redis/src/main/java/ru/tinkoff/kora/cache/redis/RedisCache.java @@ -53,7 +53,7 @@ String origin() { @Override public V get(@Nonnull K key) { - logger.trace("Looking for value in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' looking for value for key: {}", name, key); final byte[] keyAsBytes = keyMapper.apply(key); final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.GET, name, origin()); @@ -64,9 +64,9 @@ public V get(@Nonnull K key) { : syncClient.getExpire(keyAsBytes, expireAfterAccessMillis); if (jsonAsBytes != null) { - logger.trace("Value NOT found in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' no value found for key: {}", name, key); } else { - logger.debug("Value found in cache '{}' for key: {}", name, key); + logger.debug("Cache '{}' found value for key: {}", name, key); } 
final V v = valueMapper.read(jsonAsBytes); @@ -82,7 +82,7 @@ public V get(@Nonnull K key) { @Nonnull @Override public V put(@Nonnull K key, @Nonnull V value) { - logger.trace("Putting value in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' storing for key: {}", name, key); final byte[] keyAsBytes = keyMapper.apply(key); final byte[] valueAsBytes = valueMapper.write(value); final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.PUT, name, origin()); @@ -95,7 +95,7 @@ public V put(@Nonnull K key, @Nonnull V value) { syncClient.setExpire(keyAsBytes, valueAsBytes, expireAfterWriteMillis); } telemetryContext.recordSuccess(); - logger.trace("Putted value in cache '{}' for key: {}", name, key); + logger.debug("Cache '{}' stored for key: {}", name, key); return value; } catch (Exception e) { telemetryContext.recordFailure(e); @@ -106,7 +106,7 @@ public V put(@Nonnull K key, @Nonnull V value) { @Override public void invalidate(@Nonnull K key) { - logger.trace("Invalidating value in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' invalidating for key: {}", name, key); final byte[] keyAsBytes = keyMapper.apply(key); final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.INVALIDATE, name, origin()); @@ -114,7 +114,7 @@ public void invalidate(@Nonnull K key) { telemetryContext.startRecording(); syncClient.del(keyAsBytes); telemetryContext.recordSuccess(); - logger.trace("Invalidated value in cache '{}' for key: {}", name, key); + logger.debug("Cache '{}' invalidated for key: {}", name, key); } catch (Exception e) { telemetryContext.recordFailure(e); logger.warn(e.getMessage(), e); @@ -123,14 +123,14 @@ public void invalidate(@Nonnull K key) { @Override public void invalidateAll() { - logger.trace("Invalidating all values in cache '{}'", name); + logger.trace("Cache '{}' invalidating all", name); final CacheTelemetry.TelemetryContext telemetryContext 
= telemetry.create(CacheTelemetry.Operation.Type.INVALIDATE_ALL, name, origin()); try { telemetryContext.startRecording(); syncClient.flushAll(); telemetryContext.recordSuccess(); - logger.trace("Invalidated all values in cache '{}'", name); + logger.debug("Cache '{}' invalidated all", name); } catch (Exception e) { telemetryContext.recordFailure(e); logger.warn(e.getMessage(), e); @@ -143,7 +143,7 @@ public Mono getAsync(@Nonnull K key) { final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.GET, name, origin()); return Mono.fromCallable(() -> keyMapper.apply(key)) .flatMap(keyAsBytes -> { - logger.trace("Looking for value in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' looking for value for key: {}", name, key); telemetryContext.startRecording(); return (expireAfterAccessMillis == null) ? reactiveClient.get(keyAsBytes) @@ -151,9 +151,9 @@ public Mono getAsync(@Nonnull K key) { }) .map(jsonAsBytes -> { if (jsonAsBytes != null) { - logger.trace("Value NOT found in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' no value found for key: {}", name, key); } else { - logger.debug("Value found in cache '{}' for key: {}", name, key); + logger.debug("Cache '{}' found value for key: {}", name, key); } final V v = valueMapper.read(jsonAsBytes); telemetryContext.recordSuccess(jsonAsBytes); @@ -172,7 +172,7 @@ public Mono putAsync(@Nonnull K key, @Nonnull V value) { final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.PUT, name, origin()); return Mono.fromCallable(() -> keyMapper.apply(key)) .flatMap(keyAsBytes -> { - logger.trace("Putting value in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' storing for key: {}", name, key); final byte[] valueAsBytes = valueMapper.write(value); telemetryContext.startRecording(); return (expireAfterWriteMillis == null) @@ -182,7 +182,7 @@ public Mono putAsync(@Nonnull K key, @Nonnull V 
value) { .map(r -> value) .switchIfEmpty(Mono.fromCallable(() -> { telemetryContext.recordSuccess(); - logger.trace("Putted value in cache '{}' for key: {}", name, key); + logger.debug("Cache '{}' stored for key: {}", name, key); return value; })) .onErrorResume(e -> { @@ -197,11 +197,14 @@ public Mono putAsync(@Nonnull K key, @Nonnull V value) { public Mono invalidateAsync(@Nonnull K key) { final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.INVALIDATE, name, origin()); return Mono.fromCallable(() -> { - logger.trace("Invalidating value in cache '{}' for key: {}", name, key); + logger.trace("Cache '{}' invalidating for key: {}", name, key); return keyMapper.apply(key); }) .flatMap(reactiveClient::del) - .doOnSuccess(r -> telemetryContext.recordSuccess()) + .doOnSuccess(r -> { + telemetryContext.recordSuccess(); + logger.debug("Cache '{}' invalidated for key: {}", name, key); + }) .onErrorResume(e -> { telemetryContext.recordFailure(e); logger.warn(e.getMessage(), e); @@ -213,9 +216,12 @@ public Mono invalidateAsync(@Nonnull K key) { @Override public Mono invalidateAllAsync() { final CacheTelemetry.TelemetryContext telemetryContext = telemetry.create(CacheTelemetry.Operation.Type.INVALIDATE_ALL, name, origin()); - logger.trace("Invalidating all values in cache '{}'", name); + logger.trace("Cache '{}' invalidating all", name); return reactiveClient.flushAll() - .doOnSuccess(r -> telemetryContext.recordSuccess()) + .doOnSuccess(r -> { + telemetryContext.recordSuccess(); + logger.debug("Cache '{}' invalidated all", name); + }) .onErrorResume(e -> { telemetryContext.recordFailure(e); logger.warn(e.getMessage(), e); diff --git a/cache/cache-redis/src/main/java/ru/tinkoff/kora/cache/redis/client/DefaultLettuceCommander.java b/cache/cache-redis/src/main/java/ru/tinkoff/kora/cache/redis/client/DefaultLettuceCommander.java index b9094862c..304eb30db 100644 --- 
a/cache/cache-redis/src/main/java/ru/tinkoff/kora/cache/redis/client/DefaultLettuceCommander.java +++ b/cache/cache-redis/src/main/java/ru/tinkoff/kora/cache/redis/client/DefaultLettuceCommander.java @@ -31,7 +31,7 @@ public DefaultLettuceCommander(AbstractRedisClient redisClient) { @Override public Mono init() { return ReactorUtils.ioMono(() -> { - logger.debug("Starting Redis Client (Lettuce)..."); + logger.debug("Redis Client (Lettuce) starting..."); final long started = System.nanoTime(); try { @@ -56,7 +56,7 @@ public Mono init() { throw Exceptions.propagate(e); } - logger.info("Started Redis Client (Lettuce) took {}", Duration.ofNanos(System.nanoTime() - started)); + logger.info("Redis Client (Lettuce) started in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); }); } @@ -64,10 +64,10 @@ public Mono init() { public Mono release() { return ReactorUtils.ioMono(() -> { try { - logger.debug("Stopping Redis Client (Lettuce)..."); + logger.debug("Redis Client (Lettuce) stopping..."); final long started = System.nanoTime(); connection.close(); - logger.info("Stopping Redis Client (Lettuce) took {}", Duration.ofNanos(System.nanoTime() - started)); + logger.info("Redis Client (Lettuce) stopped in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); } catch (Exception e) { throw Exceptions.propagate(e); } diff --git a/database/database-cassandra/src/main/java/ru/tinkoff/kora/database/cassandra/CassandraDatabase.java b/database/database-cassandra/src/main/java/ru/tinkoff/kora/database/cassandra/CassandraDatabase.java index fd75c1385..8bcfca3c3 100644 --- a/database/database-cassandra/src/main/java/ru/tinkoff/kora/database/cassandra/CassandraDatabase.java +++ b/database/database-cassandra/src/main/java/ru/tinkoff/kora/database/cassandra/CassandraDatabase.java @@ -21,6 +21,7 @@ public CassandraDatabase(CassandraConfig config, DataBaseTelemetryFactory teleme this.telemetry = 
telemetryFactory.get( Objects.requireNonNullElse(config.basic().sessionName(), "cassandra"), "cassandra", + "cassandra", Optional.ofNullable(config.auth()).map(CassandraConfig.CassandraCredentials::login).orElse("anonymous") ); } diff --git a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseLogger.java b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseLogger.java index 1459878f0..56cabdeda 100644 --- a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseLogger.java +++ b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseLogger.java @@ -9,5 +9,5 @@ public interface DataBaseLogger { void logQueryBegin(QueryContext queryContext); - void logQueryEnd(long duration, QueryContext queryContext, @Nullable Throwable ex); + void logQueryEnd(long processingTime, QueryContext queryContext, @Nullable Throwable ex); } diff --git a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseLoggerFactory.java b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseLoggerFactory.java index b94e2f769..a98db215b 100644 --- a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseLoggerFactory.java +++ b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseLoggerFactory.java @@ -1,6 +1,7 @@ package ru.tinkoff.kora.database.common.telemetry; public interface DataBaseLoggerFactory { + DataBaseLogger get(String poolName); final class DefaultDataBaseLoggerFactory implements DataBaseLoggerFactory { diff --git a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseTelemetryFactory.java b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseTelemetryFactory.java index e0fa02ff0..81fa2ebf5 100644 --- 
a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseTelemetryFactory.java +++ b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DataBaseTelemetryFactory.java @@ -1,5 +1,5 @@ package ru.tinkoff.kora.database.common.telemetry; public interface DataBaseTelemetryFactory { - DataBaseTelemetry get(String name, String dbType, String username); + DataBaseTelemetry get(String name, String driverType, String dbType, String username); } diff --git a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseLogger.java b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseLogger.java index 1bb526137..88b9409d3 100644 --- a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseLogger.java +++ b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseLogger.java @@ -3,47 +3,55 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import ru.tinkoff.kora.database.common.QueryContext; +import ru.tinkoff.kora.logging.common.arg.StructuredArgument; import javax.annotation.Nullable; -import static ru.tinkoff.kora.logging.common.arg.StructuredArgument.marker; - public class DefaultDataBaseLogger implements DataBaseLogger { - private final Logger queryLog; + + private final Logger log; private final String poolName; public DefaultDataBaseLogger(String poolName) { this.poolName = poolName; - this.queryLog = LoggerFactory.getLogger("ru.tinkoff.kora.database.jdbc." + poolName + ".query"); + this.log = LoggerFactory.getLogger("ru.tinkoff.kora.database." 
+ poolName + ".query"); } @Override public boolean isEnabled() { - return this.queryLog.isDebugEnabled(); + return this.log.isInfoEnabled(); } @Override public void logQueryBegin(QueryContext queryContext) { - if (this.queryLog.isDebugEnabled()) { - this.queryLog.debug(marker("sqlQuery", gen -> { - gen.writeStartObject(); - gen.writeStringField("pool", this.poolName); - gen.writeStringField("queryId", queryContext.queryId()); - gen.writeEndObject(); - }), "Sql query begin"); + var marker = StructuredArgument.marker("sqlQuery", gen -> { + gen.writeStartObject(); + gen.writeStringField("pool", this.poolName); + gen.writeStringField("queryId", queryContext.queryId()); + gen.writeEndObject(); + }); + + if (log.isDebugEnabled()) { + log.debug(marker, "SQL executing for pool '{}':\n{}",this.poolName, queryContext.sql()); + } else if (log.isInfoEnabled()) { + log.info(marker, "SQL executing for pool '{}'", this.poolName); } } @Override - public void logQueryEnd(long duration, QueryContext queryContext, @Nullable Throwable ex) { - if (this.queryLog.isDebugEnabled()) { - this.queryLog.debug(marker("sqlQuery", gen -> { - gen.writeStartObject(); - gen.writeStringField("pool", this.poolName); - gen.writeStringField("queryId", queryContext.queryId()); - gen.writeNumberField("duration", duration / 1_000_000); - gen.writeEndObject(); - }), "Sql query end"); + public void logQueryEnd(long processingTime, QueryContext queryContext, @Nullable Throwable ex) { + var marker = StructuredArgument.marker("sqlQuery", gen -> { + gen.writeStartObject(); + gen.writeStringField("pool", this.poolName); + gen.writeStringField("queryId", queryContext.queryId()); + gen.writeNumberField("processingTime", processingTime / 1_000_000); + gen.writeEndObject(); + }); + + if (log.isDebugEnabled()) { + log.debug(marker, "SQL executed for pool '{}':\n{}", this.poolName, queryContext.sql()); + } else if(log.isInfoEnabled()) { + log.info(marker, "SQL executed for pool '{}'", this.poolName); } } } diff --git 
a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseTelemetry.java b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseTelemetry.java index 82eed2aa1..fba141d29 100644 --- a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseTelemetry.java +++ b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseTelemetry.java @@ -41,13 +41,21 @@ public DataBaseTelemetryContext createContext(Context ctx, QueryContext query) { var span = tracing == null ? null : tracing.createQuerySpan(ctx, query); var start = System.nanoTime(); - if (logger != null) logger.logQueryBegin(query); + if (logger != null) { + logger.logQueryBegin(query); + } return exception -> { - var duration = System.nanoTime() - start; - if (metricWriter != null) metricWriter.recordQuery(start, query, exception); - if (logger != null) logger.logQueryEnd(duration, query, exception); - if (span != null) span.close(exception); + var processingTime = System.nanoTime() - start; + if (metricWriter != null) { + metricWriter.recordQuery(start, query, exception); + } + if (logger != null) { + logger.logQueryEnd(processingTime, query, exception); + } + if (span != null) { + span.close(exception); + } }; } } diff --git a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseTelemetryFactory.java b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseTelemetryFactory.java index 9db338827..0f5edcac0 100644 --- a/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseTelemetryFactory.java +++ b/database/database-common/src/main/java/ru/tinkoff/kora/database/common/telemetry/DefaultDataBaseTelemetryFactory.java @@ -18,7 +18,7 @@ public DefaultDataBaseTelemetryFactory( } @Override - public DataBaseTelemetry get(String name, String 
dbType, String username) { + public DataBaseTelemetry get(String name, String driverType, String dbType, String username) { var logger = this.loggerFactory == null ? null : this.loggerFactory.get(name); var metricWriter = this.metricWriterFactory == null ? null : this.metricWriterFactory.get(name); var tracingFactory = this.tracingFactory == null ? null : this.tracingFactory.get(dbType, null, username); diff --git a/database/database-flyway/src/main/java/ru/tinkoff/kora/database/flyway/FlywayJdbcDatabaseInterceptor.java b/database/database-flyway/src/main/java/ru/tinkoff/kora/database/flyway/FlywayJdbcDatabaseInterceptor.java index 8d77f4599..95a927306 100644 --- a/database/database-flyway/src/main/java/ru/tinkoff/kora/database/flyway/FlywayJdbcDatabaseInterceptor.java +++ b/database/database-flyway/src/main/java/ru/tinkoff/kora/database/flyway/FlywayJdbcDatabaseInterceptor.java @@ -19,14 +19,14 @@ public Mono init(JdbcDatabase value) { return ReactorUtils .ioMono(() -> { final long started = System.nanoTime(); - logger.info("Starting FlyWay migration..."); + logger.info("FlyWay migration starting..."); Flyway.configure() .dataSource(value.value()) .load() .migrate(); - logger.info("Finished FlyWay migration took {}", Duration.ofNanos(System.nanoTime() - started)); + logger.info("FlyWay migration finished in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); }) .thenReturn(value); } diff --git a/database/database-jdbc/src/main/java/ru/tinkoff/kora/database/jdbc/JdbcDatabase.java b/database/database-jdbc/src/main/java/ru/tinkoff/kora/database/jdbc/JdbcDatabase.java index db488f4b7..6463822e5 100644 --- a/database/database-jdbc/src/main/java/ru/tinkoff/kora/database/jdbc/JdbcDatabase.java +++ b/database/database-jdbc/src/main/java/ru/tinkoff/kora/database/jdbc/JdbcDatabase.java @@ -14,6 +14,7 @@ import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; +import java.util.Objects; public class 
JdbcDatabase implements Lifecycle, Wrapped, JdbcConnectionFactory { private final Context.Key connectionKey = new Context.Key<>() { @@ -27,15 +28,27 @@ protected Connection copy(Connection object) { private final HikariDataSource dataSource; private final DataBaseTelemetry telemetry; - public JdbcDatabase(JdbcDatabaseConfig databaseConfig, DataBaseTelemetryFactory telemetryFactory) { - this(databaseConfig, telemetryFactory == null ? null : telemetryFactory.get(databaseConfig.poolName(), "", databaseConfig.username())); + public JdbcDatabase(JdbcDatabaseConfig config, DataBaseTelemetryFactory telemetryFactory) { + this(config, getTelemetry(config, telemetryFactory)); } public JdbcDatabase(JdbcDatabaseConfig databaseConfig, DataBaseTelemetry telemetry) { - this.databaseConfig = databaseConfig; - this.telemetry = telemetry; + this.databaseConfig = Objects.requireNonNull(databaseConfig); + this.telemetry = Objects.requireNonNull(telemetry); this.dataSource = new HikariDataSource(JdbcDatabaseConfig.toHikariConfig(this.databaseConfig)); - if (telemetry != null) this.dataSource.setMetricRegistry(telemetry.getMetricRegistry()); + if (telemetry != null) { + this.dataSource.setMetricRegistry(telemetry.getMetricRegistry()); + } + } + + private static DataBaseTelemetry getTelemetry(JdbcDatabaseConfig config, DataBaseTelemetryFactory factory) { + var jdbcUrl = config.jdbcUrl(); + return factory.get( + config.poolName(), + "jdbc", + jdbcUrl.substring(4, jdbcUrl.indexOf(":", 5)), + config.username() + ); } @Override diff --git a/database/database-r2dbc/src/main/java/ru/tinkoff/kora/database/r2dbc/R2dbcDatabase.java b/database/database-r2dbc/src/main/java/ru/tinkoff/kora/database/r2dbc/R2dbcDatabase.java index c83e8e7d9..afd081575 100644 --- a/database/database-r2dbc/src/main/java/ru/tinkoff/kora/database/r2dbc/R2dbcDatabase.java +++ b/database/database-r2dbc/src/main/java/ru/tinkoff/kora/database/r2dbc/R2dbcDatabase.java @@ -39,7 +39,10 @@ protected Connection copy(Connection 
object) { public R2dbcDatabase(R2dbcDatabaseConfig config, List> customizers, DataBaseTelemetryFactory telemetryFactory) { this.connectionFactory = r2dbcConnectionFactory(config, customizers); - this.telemetry = telemetryFactory.get(config.poolName(), config.r2dbcUrl().substring(5, config.r2dbcUrl().indexOf(":", 6)), config.username()); + this.telemetry = telemetryFactory.get(config.poolName(), + "r2dbc", + config.r2dbcUrl().substring(5, config.r2dbcUrl().indexOf(":", 6)), + config.username()); } @Override diff --git a/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabase.java b/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabase.java index 0bb7be84f..bd9c50d29 100644 --- a/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabase.java +++ b/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabase.java @@ -35,12 +35,12 @@ protected Transaction copy(Transaction object) { private final DataBaseTelemetry telemetry; public VertxDatabase(VertxDatabaseConfig vertxDatabaseConfig, EventLoopGroup eventLoopGroup, DataBaseTelemetryFactory telemetryFactory) { - this.telemetry = telemetryFactory.get(vertxDatabaseConfig.poolName(), "postgres", vertxDatabaseConfig.username()); this.pool = PgPool.pool( VertxUtil.customEventLoopVertx(eventLoopGroup), VertxDatabaseConfig.toPgConnectOptions(vertxDatabaseConfig), VertxDatabaseConfig.toPgPoolOptions(vertxDatabaseConfig) ); + this.telemetry = telemetryFactory.get(vertxDatabaseConfig.poolName(), "vertx", "postgres", vertxDatabaseConfig.username()); } @Override diff --git a/grpc/src/main/java/ru/tinkoff/kora/grpc/GrpcServer.java b/grpc/src/main/java/ru/tinkoff/kora/grpc/GrpcServer.java index fda49799c..88a0ac343 100644 --- a/grpc/src/main/java/ru/tinkoff/kora/grpc/GrpcServer.java +++ b/grpc/src/main/java/ru/tinkoff/kora/grpc/GrpcServer.java @@ -30,7 +30,7 @@ public GrpcServer(ValueOf nettyServerBuilder) { @Override public Mono 
init() { return Mono.create(sink -> { - logger.debug("Starting GRPC Server..."); + logger.debug("Starting gRPC Server..."); final long started = System.nanoTime(); var builder = nettyServerBuilder.get(); @@ -38,25 +38,24 @@ public Mono init() { try { this.server.start(); this.state.set(GrpcServerState.RUN); - logger.info("Started GRPC Server took {}", Duration.ofNanos(System.nanoTime() - started)); + logger.info("gRPC Server started in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); sink.success(); } catch (IOException e) { sink.error(e); } }); - } @Override public Mono release() { return Mono.fromRunnable(() -> { - logger.debug("Stopping GRPC Server..."); + logger.debug("gRPC Server stopping..."); final long started = System.nanoTime(); state.set(GrpcServerState.SHUTDOWN); server.shutdown(); - logger.info("Stopped GRPC Server took {}", Duration.ofNanos(System.nanoTime() - started)); + logger.info("gRPC Server stopped in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); }); } diff --git a/grpc/src/main/java/ru/tinkoff/kora/grpc/telemetry/DefaultGrpcServerTelemetry.java b/grpc/src/main/java/ru/tinkoff/kora/grpc/telemetry/DefaultGrpcServerTelemetry.java index dc749a35c..2b1bf23ec 100644 --- a/grpc/src/main/java/ru/tinkoff/kora/grpc/telemetry/DefaultGrpcServerTelemetry.java +++ b/grpc/src/main/java/ru/tinkoff/kora/grpc/telemetry/DefaultGrpcServerTelemetry.java @@ -51,7 +51,9 @@ public GrpcServerTelemetryContext createContext(ServerCall call, Metadata var methodName = method(call); var m = metrics == null ? null : metrics.get(call, headers, serviceName, methodName); var span = tracing == null ? 
null : tracing.createSpan(call, headers, serviceName, methodName); - if (logger != null) logger.logBegin(call, headers, serviceName, methodName); + if (logger != null) { + logger.logBegin(call, headers, serviceName, methodName); + } return new DefaultGrpcServerTelemetryContext(start, serviceName, methodName, m, logger, span); } diff --git a/grpc/src/main/java/ru/tinkoff/kora/grpc/telemetry/Slf4jGrpcServerLogger.java b/grpc/src/main/java/ru/tinkoff/kora/grpc/telemetry/Slf4jGrpcServerLogger.java index 4ff41ce98..fb60a895a 100644 --- a/grpc/src/main/java/ru/tinkoff/kora/grpc/telemetry/Slf4jGrpcServerLogger.java +++ b/grpc/src/main/java/ru/tinkoff/kora/grpc/telemetry/Slf4jGrpcServerLogger.java @@ -11,53 +11,54 @@ import javax.annotation.Nullable; public final class Slf4jGrpcServerLogger implements GrpcServerLogger { + private static final Logger log = LoggerFactory.getLogger(GrpcServer.class); @Override public boolean isEnabled() { - return log.isWarnEnabled(); + return log.isInfoEnabled(); } @Override public void logEnd(String serviceName, String methodName, @Nullable Status status, @Nullable Throwable exception, long processingTime) { - if (status != null && status.isOk()) { - var marker = StructuredArgument.marker("grpcResponse", gen -> { - gen.writeStartObject(); - gen.writeStringField("serviceName", serviceName); - gen.writeStringField("operation", serviceName + "/" + methodName); - gen.writeNumberField("processingTime", processingTime / 1_000_000); + var marker = StructuredArgument.marker("grpcResponse", gen -> { + gen.writeStartObject(); + gen.writeStringField("serviceName", serviceName); + gen.writeStringField("operation", serviceName + "/" + methodName); + gen.writeNumberField("processingTime", processingTime / 1_000_000); + if (status != null) { gen.writeStringField("status", status.getCode().name()); - gen.writeEndObject(); - }); - log.info(marker, "Response finished"); - return; - } - if (status == null) { - var marker = 
StructuredArgument.marker("grpcResponse", gen -> { - gen.writeStartObject(); - gen.writeStringField("serviceName", serviceName); - gen.writeStringField("operation", serviceName + "/" + methodName); - gen.writeNumberField("processingTime", processingTime / 1_000_000); - gen.writeNullField("status"); - gen.writeEndObject(); - }); - log.warn(marker, "Response finished", exception); + } + if (exception != null) { + var exceptionType = exception.getClass().getCanonicalName(); + gen.writeStringField("exceptionType", exceptionType); + } + gen.writeEndObject(); + }); + + if (status != null && status.isOk()) { + log.info(marker, "GrpcCall responded {} for {}#{}", status, serviceName, methodName); + } else if (status != null) { + log.warn(marker, "GrpcCall responded {} for {}#{}", status, serviceName, methodName, exception); } else { - var marker = StructuredArgument.marker("grpcResponse", gen -> { - gen.writeStartObject(); - gen.writeStringField("serviceName", serviceName); - gen.writeStringField("operation", serviceName + "/" + methodName); - gen.writeNumberField("processingTime", processingTime / 1_000_000); - gen.writeStringField("status", status.getCode().name()); - gen.writeEndObject(); - }); - log.warn(marker, "Response finished", exception); + log.warn(marker, "GrpcCall responded for {}#{}", serviceName, methodName, exception); } } @Override public void logBegin(ServerCall call, Metadata headers, String serviceName, String methodName) { + var marker = StructuredArgument.marker("grpcRequest", gen -> { + gen.writeStartObject(); + gen.writeStringField("serviceName", serviceName); + gen.writeStringField("operation", serviceName + "/" + methodName); + gen.writeEndObject(); + }); + if (log.isDebugEnabled()) { + log.debug(marker, "GrpcCall received for {}#{}\n{}", serviceName, methodName, headers); + } else { + log.info(marker, "GrpcCall received for {}#{}", serviceName, methodName); + } } @Override diff --git 
a/http/http-client-common/src/main/java/ru/tinkoff/kora/http/client/common/telemetry/DefaultHttpClientTelemetry.java b/http/http-client-common/src/main/java/ru/tinkoff/kora/http/client/common/telemetry/DefaultHttpClientTelemetry.java index 43b6fb5df..805729865 100644 --- a/http/http-client-common/src/main/java/ru/tinkoff/kora/http/client/common/telemetry/DefaultHttpClientTelemetry.java +++ b/http/http-client-common/src/main/java/ru/tinkoff/kora/http/client/common/telemetry/DefaultHttpClientTelemetry.java @@ -78,9 +78,9 @@ public HttpServerTelemetryContext get(Context ctx, HttpClientRequest request) { if (requestBodyCharset == null) { this.logger.logRequest(authority, request.method(), operation, resolvedUri, headers, null); } else { - var requestBodyFlux = this.wrapBody(request.body(), true, l -> { - var s = byteBufListToBodyString(l, requestBodyCharset); - this.logger.logRequest(authority, method, operation, resolvedUri, headers, s); + var requestBodyFlux = this.wrapBody(request.body(), true, buffers -> { + var bodyString = byteBufListToBodyString(buffers, requestBodyCharset); + this.logger.logRequest(authority, method, operation, resolvedUri, headers, bodyString); }, e -> {}, () -> {}); request = request.toBuilder() .body(requestBodyFlux) @@ -104,13 +104,17 @@ public HttpClientResponse close(@Nullable HttpClientResponse response, @Nullable if (metrics != null) { metrics.record(-1, processingTime, method, host, scheme, target); } - if (logger != null && logger.logResponse()) logger.logResponse(authority, operation, processingTime, null, HttpResultCode.CONNECTION_ERROR, exception, null, null); + if (logger != null && logger.logResponse()) { + logger.logResponse(authority, operation, processingTime, null, HttpResultCode.CONNECTION_ERROR, exception, null, null); + } return null; } var responseBodyCharset = logger == null || !logger.logResponseBody() ? 
null : detectCharset(response.headers()); var bodySubscribed = new AtomicBoolean(false); - var responseBodyFlux = wrapBody(response.body(), responseBodyCharset != null, l -> { - if (createSpanResult != null) createSpanResult.span().close(null); + var responseBodyFlux = wrapBody(response.body(), responseBodyCharset != null, buffers -> { + if (createSpanResult != null) { + createSpanResult.span().close(null); + } var processingTime = System.nanoTime() - startTime; if (metrics != null) { metrics.record(response.code(), processingTime, method, host, scheme, target); @@ -118,7 +122,7 @@ public HttpClientResponse close(@Nullable HttpClientResponse response, @Nullable var resultCode = HttpResultCode.fromStatusCode(response.code()); if (logger != null) { var headers = logger.logResponseHeaders() ? response.headers() : null; - var bodyString = byteBufListToBodyString(l, responseBodyCharset); + var bodyString = byteBufListToBodyString(buffers, responseBodyCharset); logger.logResponse(authority, operation, processingTime, response.code(), resultCode, null, headers, bodyString); } }, e -> { @@ -127,7 +131,9 @@ public HttpClientResponse close(@Nullable HttpClientResponse response, @Nullable if (metrics != null) { metrics.record(-1, processingTime, method, host, scheme, target); } - if (logger != null && logger.logResponse()) logger.logResponse(authority, operation, processingTime, null, HttpResultCode.CONNECTION_ERROR, e, null, null); + if (logger != null && logger.logResponse()) { + logger.logResponse(authority, operation, processingTime, null, HttpResultCode.CONNECTION_ERROR, e, null, null); + } }, () -> bodySubscribed.set(true)); return new HttpClientResponse.Default( diff --git a/http/http-client-common/src/main/java/ru/tinkoff/kora/http/client/common/telemetry/Sl4fjHttpClientLogger.java b/http/http-client-common/src/main/java/ru/tinkoff/kora/http/client/common/telemetry/Sl4fjHttpClientLogger.java index 704642fd4..f02b23eb7 100644 --- 
a/http/http-client-common/src/main/java/ru/tinkoff/kora/http/client/common/telemetry/Sl4fjHttpClientLogger.java +++ b/http/http-client-common/src/main/java/ru/tinkoff/kora/http/client/common/telemetry/Sl4fjHttpClientLogger.java @@ -1,11 +1,11 @@ package ru.tinkoff.kora.http.client.common.telemetry; -import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import ru.tinkoff.kora.http.common.HttpHeaders; import ru.tinkoff.kora.http.common.HttpResultCode; import ru.tinkoff.kora.logging.common.arg.StructuredArgument; +import javax.annotation.Nullable; import java.util.concurrent.CancellationException; public class Sl4fjHttpClientLogger implements HttpClientLogger { @@ -48,7 +48,12 @@ public boolean logResponseBody() { } @Override - public void logRequest(String authority, String method, String operation, String resolvedUri, @Nullable HttpHeaders headers, @Nullable String body) { + public void logRequest(String authority, + String method, + String operation, + String resolvedUri, + @Nullable HttpHeaders headers, + @Nullable String body) { var marker = StructuredArgument.marker("httpResponse", gen -> { gen.writeStartObject(); gen.writeStringField("authority", authority); @@ -56,22 +61,27 @@ public void logRequest(String authority, String method, String operation, String gen.writeEndObject(); }); - if (this.requestLog.isTraceEnabled() && body != null) { + if (this.requestLog.isTraceEnabled() && headers != null && headers.size() > 0 && body != null) { var headersString = this.requestHeaderString(headers); var bodyStr = this.requestBodyString(body); - this.requestLog.trace(marker, "{} {}\n{}\n{}\n", method, resolvedUri, headersString, bodyStr); - return; - } - if (this.requestLog.isDebugEnabled()) { + this.requestLog.trace(marker, "Requesting {}\n{}\n{}", operation, headersString, bodyStr); + } else if (this.requestLog.isDebugEnabled() && headers != null && headers.size() > 0) { var headersString = this.requestHeaderString(headers); - 
this.requestLog.debug(marker, "{} {}\n{}\n", method, resolvedUri, headersString); - return; + this.requestLog.debug(marker, "Requesting {}\n{}", operation, headersString); + } else { + this.requestLog.info(marker, "Requesting {}", operation); } - this.requestLog.info(marker, "{}", operation); } @Override - public void logResponse(String authority, String operation, long processingTime, @Nullable Integer statusCode, HttpResultCode resultCode, @Nullable Throwable exception, @Nullable HttpHeaders headers, @Nullable String body) { + public void logResponse(String authority, + String operation, + long processingTime, + @Nullable Integer statusCode, + HttpResultCode resultCode, + @Nullable Throwable exception, + @Nullable HttpHeaders headers, + @Nullable String body) { var exceptionTypeString = exception != null ? exception.getClass().getCanonicalName() : statusCode != null ? null : CancellationException.class.getCanonicalName(); @@ -80,33 +90,30 @@ public void logResponse(String authority, String operation, long processingTime, gen.writeStartObject(); gen.writeStringField("authority", authority); gen.writeStringField("operation", operation); - gen.writeNumberField("processingTime", processingTime); - gen.writeFieldName("statusCode"); + gen.writeStringField("resultCode", resultCode.name().toLowerCase()); + gen.writeNumberField("processingTime", processingTime / 1_000_000); if (statusCode != null) { + gen.writeFieldName("statusCode"); gen.writeNumber(statusCode); - } else { - gen.writeNull(); } - gen.writeStringField("resultCode", resultCode.name().toLowerCase()); - gen.writeStringField("exceptionType", exceptionTypeString); + if (exceptionTypeString != null) { + gen.writeStringField("exceptionType", exceptionTypeString); + } gen.writeEndObject(); }); - if (responseLog.isTraceEnabled() && body != null) { + + if (responseLog.isTraceEnabled() && headers != null && headers.size() > 0 && body != null) { var headersString = this.responseHeaderString(headers); var bodyStr = 
this.responseBodyString(body); - responseLog.trace(marker, "{}\n{}\n{}\n", statusCode, headersString, bodyStr); - return; - } - if (responseLog.isDebugEnabled() && headers != null) { + responseLog.trace(marker, "Received {} from {}\n{}\n{}", statusCode, operation, headersString, bodyStr); + } else if (responseLog.isDebugEnabled() && headers != null && headers.size() > 0) { var headersString = this.responseHeaderString(headers); - responseLog.debug(marker, "{}\n{}\n", statusCode, headersString); - return; + responseLog.debug(marker, "Received {} from {}\n{}", statusCode, operation, headersString); + } else if (statusCode != null) { + responseLog.info(marker, "Received {} from {}", statusCode, operation); + } else { + responseLog.info(marker, "Received no HttpResponse from {}", operation); } - if (statusCode != null) { - responseLog.info(marker, "{}", statusCode); - return; - } - responseLog.info(marker, "Http response was not received"); } public String responseBodyString(String body) { @@ -124,5 +131,4 @@ public String requestBodyString(String body) { public String requestHeaderString(HttpHeaders headers) { return HttpHeaders.toString(headers); } - } diff --git a/http/http-common/src/main/java/ru/tinkoff/kora/http/common/HttpHeaders.java b/http/http-common/src/main/java/ru/tinkoff/kora/http/common/HttpHeaders.java index 0f7b57108..1e727b46d 100644 --- a/http/http-common/src/main/java/ru/tinkoff/kora/http/common/HttpHeaders.java +++ b/http/http-common/src/main/java/ru/tinkoff/kora/http/common/HttpHeaders.java @@ -110,6 +110,10 @@ static HttpHeaders of(String k1, String v1, String k2, String v2, String k3, Str static String toString(HttpHeaders headers) { var sb = new StringBuilder(); for (var entry : headers) { + if(!sb.isEmpty()) { + sb.append('\n'); + } + sb.append(entry.getKey()); boolean first = true; for (var val : entry.getValue()) { @@ -121,7 +125,6 @@ static String toString(HttpHeaders headers) { } sb.append(val); } - sb.append("\n"); } return sb.toString(); 
} diff --git a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/HttpServerModule.java b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/HttpServerModule.java index 31528e83a..9ba3521b6 100644 --- a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/HttpServerModule.java +++ b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/HttpServerModule.java @@ -52,7 +52,7 @@ default PublicApiHandler publicApiHandler(All> } default HttpServerLogger httpServerLogger() { - return new HttpServerLogger.DefaultHttpServerLogger(); + return new Slf4jHttpServerLogger(); } default HttpServerResponseMapper noopResponseMapper() { diff --git a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/router/PublicApiHandler.java b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/router/PublicApiHandler.java index df628cf49..4780d72f1 100644 --- a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/router/PublicApiHandler.java +++ b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/router/PublicApiHandler.java @@ -1,6 +1,8 @@ package ru.tinkoff.kora.http.server.common.router; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import ru.tinkoff.kora.application.graph.All; @@ -30,6 +32,9 @@ */ public class PublicApiHandler implements RefreshListener { + + private static final Logger log = LoggerFactory.getLogger(HttpServer.class); + private final Function> NOT_FOUND_HANDLER = request -> Mono.just(new SimpleHttpServerResponse(404, "application/octet-stream", HttpHeaders.of(), null)); @@ -152,21 +157,21 @@ private void sendResponse(HttpServerTelemetry.HttpServerTelemetryContext ctx, Ht if (ex == null && response instanceof Throwable throwableResponse) { ex = throwableResponse; } - ctx.close(success.code(), resultCode, ex); + 
ctx.close(success.code(), resultCode, response.headers(), ex); } else if (result instanceof HttpServerResponseSender.ResponseBodyErrorBeforeCommit responseBodyError) { var newResponse = new SimpleHttpServerResponse(500, "text/plain", HttpHeaders.of(), StandardCharsets.UTF_8.encode( Objects.requireNonNullElse(responseBodyError.error().getMessage(), "Unknown error") )); responseSender.send(newResponse).subscribe(v -> { - ctx.close(500, HttpResultCode.SERVER_ERROR, responseBodyError.error()); + ctx.close(500, HttpResultCode.SERVER_ERROR, newResponse.headers(), responseBodyError.error()); }); } else if (result instanceof HttpServerResponseSender.ResponseBodyError responseBodyError) { - ctx.close(response.code(), HttpResultCode.SERVER_ERROR, responseBodyError.error()); + ctx.close(response.code(), HttpResultCode.SERVER_ERROR, response.headers(), responseBodyError.error()); } else if (result instanceof HttpServerResponseSender.ConnectionError connectionError) { - ctx.close(response.code(), HttpResultCode.CONNECTION_ERROR, connectionError.error()); + ctx.close(response.code(), HttpResultCode.CONNECTION_ERROR, response.headers(), connectionError.error()); } }, e -> { - HttpServerLogger.log.error("Error dropped: looks like a bug in HttpServerResponseSender", e); + log.error("Error dropped: looks like a bug in HttpServerResponseSender", e); }); } diff --git a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/DefaultHttpServerTelemetry.java b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/DefaultHttpServerTelemetry.java index cacf3fd67..3149ee9d7 100644 --- a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/DefaultHttpServerTelemetry.java +++ b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/DefaultHttpServerTelemetry.java @@ -6,9 +6,7 @@ public final class DefaultHttpServerTelemetry implements HttpServerTelemetry { private static final 
String UNMATCHED_ROUTE_TEMPLATE = "UNKNOWN_ROUTE"; - private static final HttpServerTelemetryContext NOOP_CTX = (statusCode, resultCode, exception) -> { - - }; + private static final HttpServerTelemetryContext NOOP_CTX = (statusCode, resultCode, httpHeaders, exception) -> { }; @Nullable private final HttpServerMetrics metrics; @@ -31,12 +29,14 @@ public HttpServerTelemetryContext get(PublicApiHandler.PublicApiRequest request, if (metrics == null && tracer == null && (logger == null || !logger.isEnabled())) { return NOOP_CTX; } + var start = System.nanoTime(); var method = request.method(); var scheme = request.scheme(); var host = request.hostName(); - - if (metrics != null) metrics.requestStarted(method, routeTemplate != null ? routeTemplate : UNMATCHED_ROUTE_TEMPLATE, host, scheme); + if (metrics != null) { + metrics.requestStarted(method, routeTemplate != null ? routeTemplate : UNMATCHED_ROUTE_TEMPLATE, host, scheme); + } final HttpServerTracer.HttpServerSpan span; final String operation; @@ -48,22 +48,23 @@ public HttpServerTelemetryContext get(PublicApiHandler.PublicApiRequest request, span = null; } if (logger != null) { - logger.logStart(operation); + logger.logStart(operation, request.headers()); } } else { span = null; operation = null; } - return (statusCode, resultCode, exception) -> { + return (statusCode, resultCode, httpHeaders, exception) -> { var end = System.nanoTime(); var processingTime = end - start; - - if (metrics != null) metrics.requestFinished(method, routeTemplate != null ? routeTemplate : UNMATCHED_ROUTE_TEMPLATE, host, scheme, statusCode, processingTime); + if (metrics != null) { + metrics.requestFinished(method, routeTemplate != null ? 
routeTemplate : UNMATCHED_ROUTE_TEMPLATE, host, scheme, statusCode, processingTime); + } if (routeTemplate != null) { if (logger != null) { - logger.logEnd(operation, statusCode, resultCode, processingTime, exception); + logger.logEnd(operation, statusCode, resultCode, processingTime, httpHeaders, exception); } if (span != null) { span.close(statusCode, resultCode, exception); diff --git a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerLogger.java b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerLogger.java index f07d70c97..62a9d2b6d 100644 --- a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerLogger.java +++ b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerLogger.java @@ -1,69 +1,21 @@ package ru.tinkoff.kora.http.server.common.telemetry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import ru.tinkoff.kora.http.common.HttpHeaders; import ru.tinkoff.kora.http.common.HttpResultCode; -import ru.tinkoff.kora.http.server.common.HttpServer; -import ru.tinkoff.kora.logging.common.arg.StructuredArgument; import javax.annotation.Nullable; public interface HttpServerLogger { - Logger log = LoggerFactory.getLogger(HttpServer.class); boolean isEnabled(); - void logStart(String operation); + void logStart(String operation, @Nullable HttpHeaders headers); - void logEnd(String operation, Integer statusCode, HttpResultCode resultCode, long processingTime, @Nullable Throwable exception); - - final class DefaultHttpServerLogger implements HttpServerLogger { - @Override - public boolean isEnabled() { - return log.isInfoEnabled(); - } - - @Override - public void logStart(String operation) { - if (!log.isInfoEnabled()) { - return; - } - log.info(StructuredArgument.marker("httpRequest", gen -> { - gen.writeStartObject(); - gen.writeStringField("operation", operation); - gen.writeEndObject(); 
- }), "Http request begin"); - } - - @Override - public void logEnd(String operation, Integer statusCode, HttpResultCode resultCode, long processingTime, @Nullable Throwable exception) { - if (!log.isInfoEnabled()) { - return; - } - if (exception == null) { - log.info(StructuredArgument.marker("httpResponse", gen -> { - gen.writeStartObject(); - gen.writeStringField("operation", operation); - gen.writeNumberField("processingTime", processingTime / 1_000_000); - gen.writeNumberField("statusCode", statusCode); - gen.writeStringField("resultCode", resultCode.string()); - gen.writeNullField("exceptionType"); - gen.writeEndObject(); - }), "Http request end"); - - } else { - var exceptionType = exception.getClass().getCanonicalName(); - log.info(StructuredArgument.marker("httpResponse", gen -> { - gen.writeStartObject(); - gen.writeStringField("operation", operation); - gen.writeNumberField("processingTime", processingTime / 1_000_000); - gen.writeNumberField("statusCode", statusCode); - gen.writeStringField("resultCode", resultCode.string()); - gen.writeStringField("exceptionType", exceptionType); - gen.writeEndObject(); - }), "Http request end", exception); - } - } - } + void logEnd(String operation, + Integer statusCode, + HttpResultCode resultCode, + long processingTime, + @Nullable HttpHeaders headers, + @Nullable Throwable exception); } diff --git a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerTelemetry.java b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerTelemetry.java index 80c6ec09c..253d033dd 100644 --- a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerTelemetry.java +++ b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerTelemetry.java @@ -1,13 +1,15 @@ package ru.tinkoff.kora.http.server.common.telemetry; +import ru.tinkoff.kora.http.common.HttpHeaders; import 
ru.tinkoff.kora.http.common.HttpResultCode; import ru.tinkoff.kora.http.server.common.router.PublicApiHandler; import javax.annotation.Nullable; public interface HttpServerTelemetry { + interface HttpServerTelemetryContext { - void close(int statusCode, HttpResultCode resultCode, @Nullable Throwable exception); + void close(int statusCode, HttpResultCode resultCode, @Nullable HttpHeaders headers, @Nullable Throwable exception); } HttpServerTelemetryContext get(PublicApiHandler.PublicApiRequest request, @Nullable String routeTemplate); diff --git a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerTracer.java b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerTracer.java index 43293906c..efdec7a0c 100644 --- a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerTracer.java +++ b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/HttpServerTracer.java @@ -7,11 +7,11 @@ import javax.annotation.Nullable; public interface HttpServerTracer { + interface HttpServerSpan { void close(int statusCode, HttpResultCode resultCode, @Nullable Throwable exception); } - interface HeadersSetter { void set(H headers, String key, String value); } diff --git a/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/Slf4jHttpServerLogger.java b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/Slf4jHttpServerLogger.java new file mode 100644 index 000000000..c40fada56 --- /dev/null +++ b/http/http-server-common/src/main/java/ru/tinkoff/kora/http/server/common/telemetry/Slf4jHttpServerLogger.java @@ -0,0 +1,72 @@ +package ru.tinkoff.kora.http.server.common.telemetry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.tinkoff.kora.http.common.HttpHeaders; +import ru.tinkoff.kora.http.common.HttpResultCode; +import 
ru.tinkoff.kora.http.server.common.HttpServer; +import ru.tinkoff.kora.logging.common.arg.StructuredArgument; + +import javax.annotation.Nullable; + +public final class Slf4jHttpServerLogger implements HttpServerLogger { + + private static final Logger log = LoggerFactory.getLogger(HttpServer.class); + + @Override + public boolean isEnabled() { + return log.isInfoEnabled(); + } + + @Override + public void logStart(String operation, @Nullable HttpHeaders headers) { + if (!log.isInfoEnabled()) { + return; + } + + var marker = StructuredArgument.marker("httpRequest", gen -> { + gen.writeStartObject(); + gen.writeStringField("operation", operation); + gen.writeEndObject(); + }); + + if (log.isDebugEnabled() && headers != null && headers.size() > 0) { + var headersString = HttpHeaders.toString(headers); + log.debug(marker, "HttpRequest received for {}\n{}", operation, headersString); + } else { + log.info(marker, "HttpRequest received for {}", operation); + } + } + + @Override + public void logEnd(String operation, + Integer statusCode, + HttpResultCode resultCode, + long processingTime, + @Nullable HttpHeaders headers, + @Nullable Throwable exception) { + if (!log.isInfoEnabled()) { + return; + } + + var marker = StructuredArgument.marker("httpResponse", gen -> { + gen.writeStartObject(); + gen.writeStringField("operation", operation); + gen.writeStringField("resultCode", resultCode.name().toLowerCase()); + gen.writeNumberField("processingTime", processingTime / 1_000_000); + gen.writeNumberField("statusCode", statusCode); + if (exception != null) { + var exceptionType = exception.getClass().getCanonicalName(); + gen.writeStringField("exceptionType", exceptionType); + } + gen.writeEndObject(); + }); + + if (log.isDebugEnabled() && headers != null && headers.size() > 0) { + var headersString = HttpHeaders.toString(headers); + log.debug(marker, "HttpRequest responded {} for {}\n{}", statusCode, operation, headersString); + } else if (statusCode != null) { + 
log.info(marker, "HttpRequest responded {} for {}", statusCode, operation); + } + } +} diff --git a/http/http-server-common/src/testFixtures/java/ru/tinkoff/kora/http/server/common/HttpServerTestKit.java b/http/http-server-common/src/testFixtures/java/ru/tinkoff/kora/http/server/common/HttpServerTestKit.java index e38e32dfa..a5b58675a 100644 --- a/http/http-server-common/src/testFixtures/java/ru/tinkoff/kora/http/server/common/HttpServerTestKit.java +++ b/http/http-server-common/src/testFixtures/java/ru/tinkoff/kora/http/server/common/HttpServerTestKit.java @@ -292,8 +292,8 @@ void testUnknownPath() throws IOException { try (var response = client.newCall(request).execute()) { assertThat(response.code()).isEqualTo(404); } - verify(logger, never()).logStart(any()); - verify(logger, never()).logEnd(any(), any(), any(), anyLong().getAsLong(), any()); + verify(logger, never()).logStart(any(), any()); + verify(logger, never()).logEnd(any(), any(), any(), anyLong().getAsLong(), any(), any()); verify(metrics, times(1)).requestStarted(eq(GET), eq("UNKNOWN_ROUTE"), eq("localhost"), eq("http")); verify(metrics, timeout(100).times(1)).requestFinished(eq(GET), eq("UNKNOWN_ROUTE"), eq("localhost"), eq("http"), eq(404), Mockito.anyLong()); } @@ -604,8 +604,8 @@ private void verifyResponse(String method, String route, int code, HttpResultCod private void verifyResponse(String method, String route, int code, HttpResultCode resultCode, String host, String scheme, Supplier throwable, LongSupplier duration, VerificationMode mode) { verify(metrics, mode).requestStarted(eq(method), eq(route), eq(host), eq(scheme)); - verify(logger, mode).logStart(method + " " + route); - verify(logger, mode).logEnd(eq(method + " " + route), eq(code), eq(resultCode), duration.getAsLong(), throwable.get()); + verify(logger, mode).logStart(eq(method + " " + route), any()); + verify(logger, mode).logEnd(eq(method + " " + route), eq(code), eq(resultCode), duration.getAsLong(), any(), throwable.get()); 
verify(metrics, mode).requestFinished(eq(method), eq(route), eq(host), eq(scheme), eq(code), Mockito.anyLong()); } diff --git a/http/http-server-undertow/src/main/java/ru/tinkoff/kora/http/server/undertow/UndertowHttpServer.java b/http/http-server-undertow/src/main/java/ru/tinkoff/kora/http/server/undertow/UndertowHttpServer.java index 68bb9b962..2c836eb29 100644 --- a/http/http-server-undertow/src/main/java/ru/tinkoff/kora/http/server/undertow/UndertowHttpServer.java +++ b/http/http-server-undertow/src/main/java/ru/tinkoff/kora/http/server/undertow/UndertowHttpServer.java @@ -42,20 +42,20 @@ public Mono release() { return Mono.fromRunnable(() -> this.state.set(HttpServerState.SHUTDOWN)) .then(Mono.delay(this.config.get().shutdownWait())) .then(ReactorUtils.ioMono(() -> { - logger.debug("Stopping Public HTTP Server (Undertow)..."); + logger.debug("Public HTTP Server (Undertow) stopping..."); final long started = System.nanoTime(); this.gracefulShutdown.shutdown(); try { + logger.debug("Public HTTP Server (Undertow) awaiting graceful shutdown..."); this.gracefulShutdown.awaitShutdown(); } catch (InterruptedException e) { e.printStackTrace(); } - logger.debug("Awaiting Public HTTP Server (Undertow) graceful shutdown..."); if (this.undertow != null) { this.undertow.stop(); this.undertow = null; } - logger.info("Stopped Public HTTP Server (Undertow) took {}", Duration.ofNanos(System.nanoTime() - started)); + logger.info("Public HTTP Server (Undertow) stopped in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); })); } @@ -64,7 +64,7 @@ public Mono init() { return Mono.create(sink -> { // dirty hack to start undertow thread as non daemon var t = new Thread(() -> { - logger.debug("Starting Public HTTP Server (Undertow)..."); + logger.debug("Public HTTP Server (Undertow) starting..."); final long started = System.nanoTime(); try { this.gracefulShutdown.start(); @@ -72,7 +72,7 @@ public Mono init() { this.undertow.start(); 
this.state.set(HttpServerState.RUN); var data = StructuredArgument.marker( "port", this.port() ); - logger.info(data, "Started Public HTTP Server (Undertow) took {}", Duration.ofNanos(System.nanoTime() - started)); + logger.info(data, "Public HTTP Server (Undertow) started in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); sink.success(); } catch (Throwable e) { sink.error(e); diff --git a/http/http-server-undertow/src/main/java/ru/tinkoff/kora/http/server/undertow/UndertowPrivateHttpServer.java b/http/http-server-undertow/src/main/java/ru/tinkoff/kora/http/server/undertow/UndertowPrivateHttpServer.java index 51060f2aa..10c425974 100644 --- a/http/http-server-undertow/src/main/java/ru/tinkoff/kora/http/server/undertow/UndertowPrivateHttpServer.java +++ b/http/http-server-undertow/src/main/java/ru/tinkoff/kora/http/server/undertow/UndertowPrivateHttpServer.java @@ -34,13 +34,13 @@ public UndertowPrivateHttpServer(ValueOf config, ValueOf release() { return Mono.delay(this.config.get().shutdownWait()) .then(ReactorUtils.ioMono(() -> { - logger.debug("Stopping Private HTTP Server (Undertow)..."); + logger.debug("Private HTTP Server (Undertow) stopping..."); final long started = System.nanoTime(); if (this.undertow != null) { this.undertow.stop(); this.undertow = null; } - logger.info("Stopped Private HTTP Server (Undertow) took {}", Duration.ofNanos(System.nanoTime() - started)); + logger.info("Private HTTP Server (Undertow) stopped in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); })); } @@ -49,13 +49,13 @@ public Mono init() { return Mono.create(sink -> { // dirty hack to start undertow thread as non daemon var t = new Thread(() -> { - logger.debug("Starting Private HTTP Server (Undertow)..."); + logger.debug("Private HTTP Server (Undertow) starting..."); final long started = System.nanoTime(); try { this.undertow = this.createServer(); this.undertow.start(); var data = 
StructuredArgument.marker( "port", this.port() ); - logger.info(data, "Started Private HTTP Server (Undertow) took {}", Duration.ofNanos(System.nanoTime() - started)); + logger.info(data, "Private HTTP Server (Undertow) started in {}", Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); sink.success(); } catch (Throwable e) { sink.error(e); diff --git a/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaAssignConsumerContainer.java b/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaAssignConsumerContainer.java index 0628f99ea..1caee181b 100644 --- a/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaAssignConsumerContainer.java +++ b/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaAssignConsumerContainer.java @@ -28,7 +28,8 @@ import java.util.concurrent.atomic.AtomicReference; public final class KafkaAssignConsumerContainer implements Lifecycle { - private final static Logger logger = LoggerFactory.getLogger(KafkaSubscribeConsumerContainer.class); + + private final static Logger logger = LoggerFactory.getLogger(KafkaAssignConsumerContainer.class); private final AtomicBoolean isActive = new AtomicBoolean(true); private final AtomicLong backoffTimeout; @@ -48,7 +49,6 @@ public final class KafkaAssignConsumerContainer implements Lifecycle { private final String topic; private final KafkaConsumerTelemetry telemetry; - public KafkaAssignConsumerContainer( KafkaConsumerConfig config, String topic, @@ -68,28 +68,35 @@ public KafkaAssignConsumerContainer( this.consumerPrefix = KafkaUtils.getConsumerPrefix(this.config); } - public void launchPollLoop(int number) { + public void launchPollLoop(int number, long started) { while (isActive.get()) { final Consumer consumer; try { var realConsumer = new KafkaConsumer<>(this.config.driverProperties(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); consumer = new 
ConsumerWrapper<>(realConsumer, keyDeserializer, valueDeserializer); } catch (KafkaException e) { - logger.error("Error initializing KafkaConsumer", e); + logger.error("Kafka Consumer '{}' initialization failed", consumerPrefix, e); try { Thread.sleep(1000); } catch (InterruptedException ie) { - logger.error("Error interrupting thread", ie); + logger.error("Kafka Consumer '{}' error interrupting thread", consumerPrefix, ie); } continue; } + var allPartitions = this.partitions.get(); var partitions = List.of(); try (consumer) { consumers.add(consumer); + logger.info("Kafka Consumer '{}' started in {}", + consumerPrefix, Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); + + boolean isFirstPoll = true; while (isActive.get()) { var changed = this.refreshPartitions(allPartitions); if (changed) { + logger.info("Kafka Consumer '{}' refreshing and reassigning partitions...", consumerPrefix); + allPartitions = this.partitions.get(); partitions = new ArrayList<>(allPartitions.size() / threads + 1); for (var i = number; i < allPartitions.size(); i++) { @@ -97,6 +104,7 @@ public void launchPollLoop(int number) { partitions.add(allPartitions.get(i)); } } + consumer.assign(partitions); synchronized (this.offsets) { this.offsets.ensureCapacity(partitions.size()); @@ -122,14 +130,41 @@ public void launchPollLoop(int number) { } } } + if (partitions.isEmpty()) { try { Thread.sleep(1000); } catch (InterruptedException ignore) {} continue; } + try { + logger.trace("Kafka Consumer '{}' polling...", consumerPrefix); + var records = consumer.poll(config.pollTimeout()); + if (isFirstPoll) { + logger.info("Kafka Consumer '{}' first poll in {}", + consumerPrefix, Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); + isFirstPoll = false; + } + + // log + if (!records.isEmpty() && logger.isTraceEnabled()) { + var logTopics = new HashSet(records.partitions().size()); + var logPartitions = new 
HashSet(records.partitions().size()); + for (TopicPartition partition : records.partitions()) { + logPartitions.add(partition.partition()); + logTopics.add(partition.topic()); + } + + logger.trace("Kafka Consumer '{}' polled '{}' records from topics {} and partitions {}", + consumerPrefix, records.count(), logTopics, logPartitions); + } else if (!records.isEmpty() && logger.isDebugEnabled()) { + logger.debug("Kafka Consumer '{}' polled '{}' records", consumerPrefix, records.count()); + } else { + logger.trace("Kafka Consumer '{}' polled '0' records", consumerPrefix); + } + handler.handle(records, consumer, false); for (var partition : records.partitions()) { var partitionRecords = records.records(partition); @@ -139,14 +174,15 @@ public void launchPollLoop(int number) { this.refreshLag(consumer); } } + backoffTimeout.set(config.backoffTimeout().toMillis()); } catch (WakeupException ignore) { } catch (Exception e) { - logger.error("Unhandled exception", e); + logger.error("Kafka Consumer '{}' got unhandled exception", consumerPrefix, e); try { Thread.sleep(backoffTimeout.get()); } catch (InterruptedException ie) { - logger.error("Error interrupting thread", ie); + logger.error("Kafka Consumer '{}' error interrupting thread", consumerPrefix, ie); } if (backoffTimeout.get() < 60000) backoffTimeout.set(backoffTimeout.get() * 2); break; @@ -182,6 +218,7 @@ private boolean refreshPartitions(List partitions) { if (currentTime - updateTime <= refreshInterval) { return oldPartitions.size() != partitions.size(); } + if (lastUpdateTime.compareAndSet(updateTime, currentTime)) { // we have to create new consumer to ignore metadata cache try (var consumer = new KafkaConsumer<>(this.config.driverProperties(), new ByteArrayDeserializer(), new ByteArrayDeserializer())) { @@ -216,16 +253,16 @@ public Mono init() { var threads = this.threads; if (threads > 0) { if (this.topic != null) { - logger.debug("Starting Kafka Consumer '{}'...", consumerPrefix); + logger.debug("Kafka Consumer 
'{}' starting...", consumerPrefix); final long started = System.nanoTime(); executorService = Executors.newFixedThreadPool(threads, new NamedThreadFactory(this.topic)); for (int i = 0; i < threads; i++) { var number = i; - executorService.execute(() -> launchPollLoop(number)); + executorService.execute(() -> launchPollLoop(number, started)); } - logger.info("Started Kafka Consumer '{}' took {}", consumerPrefix, Duration.ofNanos(System.nanoTime() - started)); + logger.info("Kafka Consumer '{}' started in {}", consumerPrefix, Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); } } }); @@ -235,6 +272,9 @@ public Mono init() { public Mono release() { return Mono.fromRunnable(() -> { if (isActive.compareAndSet(true, false)) { + logger.debug("Kafka Consumer '{}' stopping...", consumerPrefix); + final long started = System.nanoTime(); + for (var consumer : consumers) { consumer.wakeup(); } @@ -242,6 +282,8 @@ public Mono release() { if (executorService != null) { executorService.shutdownNow(); } + + logger.info("Kafka Consumer '{}' stopped in {}", consumerPrefix, Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); } }); } diff --git a/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaProducerContainer.java b/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaProducerContainer.java index a841b12a5..8dda4392a 100644 --- a/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaProducerContainer.java +++ b/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaProducerContainer.java @@ -12,6 +12,7 @@ * @param key type * @param value type * @deprecated move to @ru.tinkoff.kora.kafka.common.annotation.KafkaPublisher generated producer + * @see ru.tinkoff.kora.kafka.common.annotation.KafkaPublisher */ @Deprecated public final class KafkaProducerContainer implements Lifecycle, Wrapped> { diff --git 
a/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaSubscribeConsumerContainer.java b/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaSubscribeConsumerContainer.java index 4a2cd1bbb..aa7b165d2 100644 --- a/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaSubscribeConsumerContainer.java +++ b/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/consumer/containers/KafkaSubscribeConsumerContainer.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; @@ -17,10 +18,7 @@ import ru.tinkoff.kora.kafka.common.consumer.containers.handlers.BaseKafkaRecordsHandler; import java.time.Duration; -import java.util.Collections; -import java.util.IdentityHashMap; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,13 +67,13 @@ public KafkaSubscribeConsumerContainer( } - public void launchPollLoop() { + public void launchPollLoop(long started) { while (isActive.get()) { final Consumer consumer; try { consumer = this.buildConsumer(); } catch (Exception e) { - logger.error("Kafka Consumer '{}' failed initializing", consumerPrefix, e); + logger.error("Kafka Consumer '{}' initialization failed", consumerPrefix, e); try { Thread.sleep(1000); } catch (InterruptedException ie) { @@ -83,11 +81,40 @@ public void launchPollLoop() { } continue; } + try (consumer) { consumers.add(consumer); + logger.info("Kafka Consumer '{}' started in {}", + consumerPrefix, Duration.ofNanos(System.nanoTime() - 
started).toString().substring(2).toLowerCase()); + + boolean isFirstPoll = true; while (isActive.get()) { try { + logger.trace("Kafka Consumer '{}' polling...", consumerPrefix); + var records = consumer.poll(config.pollTimeout()); + if (isFirstPoll) { + logger.info("Kafka Consumer '{}' first poll in {}", + consumerPrefix, Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); + isFirstPoll = false; + } + + if (!records.isEmpty() && logger.isTraceEnabled()) { + var logTopics = new HashSet(records.partitions().size()); + var logPartitions = new HashSet(records.partitions().size()); + for (TopicPartition partition : records.partitions()) { + logPartitions.add(partition.partition()); + logTopics.add(partition.topic()); + } + + logger.trace("Kafka Consumer '{}' polled '{}' records from topics {} and partitions {}", + consumerPrefix, records.count(), logTopics, logPartitions); + } else if (!records.isEmpty() && logger.isDebugEnabled()) { + logger.debug("Kafka Consumer '{}' polled '{}' records", consumerPrefix, records.count()); + } else { + logger.trace("Kafka Consumer '{}' polled '0' records", consumerPrefix); + } + handler.handle(records, consumer, this.commitAllowed); backoffTimeout.set(config.backoffTimeout().toMillis()); } catch (WakeupException ignore) { @@ -98,7 +125,9 @@ public void launchPollLoop() { } catch (InterruptedException ie) { logger.error("Kafka Consumer '{}' error interrupting thread", consumerPrefix, ie); } - if (backoffTimeout.get() < 60000) backoffTimeout.set(backoffTimeout.get() * 2); + if (backoffTimeout.get() < 60000) { + backoffTimeout.set(backoffTimeout.get() * 2); + } break; } finally { Context.clear(); @@ -115,15 +144,13 @@ public void launchPollLoop() { public Mono init() { return Mono.fromRunnable(() -> { if (config.threads() > 0 && this.isActive.compareAndSet(false, true)) { - logger.debug("Starting Kafka Consumer '{}'...", consumerPrefix); + logger.debug("Kafka Consumer '{}' starting...", consumerPrefix); 
final long started = System.nanoTime(); executorService = Executors.newFixedThreadPool(config.threads(), new NamedThreadFactory(consumerPrefix)); for (int i = 0; i < config.threads(); i++) { - executorService.execute(this::launchPollLoop); + executorService.execute(() -> launchPollLoop(started)); } - - logger.info("Started Kafka Consumer '{}' took {}", consumerPrefix, Duration.ofNanos(System.nanoTime() - started)); } }); } @@ -132,7 +159,7 @@ public Mono init() { public Mono release() { return Mono.fromRunnable(() -> { if (isActive.compareAndSet(true, false)) { - logger.debug("Stopping Kafka Consumer '{}'...", consumerPrefix); + logger.debug("Kafka Consumer '{}' stopping...", consumerPrefix); final long started = System.nanoTime(); for (var consumer : consumers) { @@ -143,7 +170,7 @@ public Mono release() { executorService.shutdownNow(); } - logger.info("Stopped Kafka Consumer '{}' took {}", consumerPrefix, Duration.ofNanos(System.nanoTime() - started)); + logger.info("Kafka Consumer '{}' stopped in {}", consumerPrefix, Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); } }); } diff --git a/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/producer/telemetry/DefaultKafkaProducerLogger.java b/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/producer/telemetry/DefaultKafkaProducerLogger.java index 7f80c5608..be2032115 100644 --- a/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/producer/telemetry/DefaultKafkaProducerLogger.java +++ b/kafka/kafka/src/main/java/ru/tinkoff/kora/kafka/common/producer/telemetry/DefaultKafkaProducerLogger.java @@ -9,43 +9,67 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; public class DefaultKafkaProducerLogger implements KafkaProducerLogger { - private static final Logger log = 
LoggerFactory.getLogger(DefaultKafkaProducerLogger.class); + + private static final Logger logger = LoggerFactory.getLogger(DefaultKafkaProducerLogger.class); @Override public void sendBegin(ProducerRecord record) { - + logger.debug("Kafka Producer sending record to topic {} and partition {}", record.topic(), record.partition()); } @Override public void sendEnd(ProducerRecord record, Throwable e) { - log.warn("Error sending record to kafka topic {}", record.topic(), e); + logger.warn("Kafka Producer error sending record to topic {} and partition {}", record.topic(), record.partition(), e); } @Override public void sendEnd(RecordMetadata metadata) { - + logger.debug("Kafka Producer success sending record to topic {} and partition {} and offset {}", metadata.topic(), metadata.partition(), metadata.offset()); } @Override public void txBegin() { - + logger.debug("Kafka Producer starting transaction..."); } @Override public void sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata groupMetadata) { - + if (logger.isTraceEnabled()) { + final Map>> traceInfo = new HashMap<>(); + for (var metadataEntry : offsets.entrySet()) { + final Map> partitionInfo = traceInfo.computeIfAbsent(metadataEntry.getKey().topic(), k -> new HashMap<>()); + final Set offsetInfo = partitionInfo.computeIfAbsent(metadataEntry.getKey().partition(), k -> new TreeSet<>()); + offsetInfo.add(metadataEntry.getValue().offset()); + } + + final String transactionMeta = traceInfo.entrySet().stream() + .map(ti -> "topic=" + ti.getKey() + ", partitions=" + ti.getValue().entrySet().stream() + .map(pi -> "partition=" + pi.getKey() + ", offsets=" + pi.getValue().stream() + .map(String::valueOf) + .collect(Collectors.joining(", ", "[", "]"))) + .collect(Collectors.joining("], [", "[", "]"))) + .collect(Collectors.joining("], [", "[", "]")); + + logger.trace("Kafka Producer success sending '{}' transaction records with meta: {}", offsets.size(), transactionMeta); + } else { + logger.debug("Kafka Producer success 
sending '{}' transaction records", offsets.size()); + } } @Override public void txCommit() { - + logger.debug("Kafka Producer committing transaction..."); } @Override public void txRollback(@Nullable Throwable e) { - + logger.debug("Kafka Producer rolling back transaction..."); } } diff --git a/scheduling/scheduling-jdk/src/main/java/ru/tinkoff/kora/scheduling/jdk/AbstractJob.java b/scheduling/scheduling-jdk/src/main/java/ru/tinkoff/kora/scheduling/jdk/AbstractJob.java index fc1440cc3..cc105b88e 100644 --- a/scheduling/scheduling-jdk/src/main/java/ru/tinkoff/kora/scheduling/jdk/AbstractJob.java +++ b/scheduling/scheduling-jdk/src/main/java/ru/tinkoff/kora/scheduling/jdk/AbstractJob.java @@ -32,13 +32,13 @@ public AbstractJob(SchedulingTelemetry telemetry, JdkSchedulingExecutor service, public final Mono init() { return Mono.fromRunnable(() -> { if (this.started.compareAndSet(false, true)) { - logger.debug("Starting Scheduled Job '{}#{}'...", telemetry.jobClass().getCanonicalName(), telemetry.jobMethod()); + logger.debug("Scheduled Job '{}#{}' starting...", telemetry.jobClass().getCanonicalName(), telemetry.jobMethod()); final long started = System.nanoTime(); this.scheduledFuture = this.schedule(this.service, this::runJob); - logger.info("Started Scheduled Job '{}#{}' took {}", telemetry.jobClass().getCanonicalName(), telemetry.jobMethod(), - Duration.ofNanos(System.nanoTime() - started)); + logger.info("Scheduled Job '{}#{}' started in {}", telemetry.jobClass().getCanonicalName(), telemetry.jobMethod(), + Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); } }); } @@ -63,15 +63,15 @@ private void runJob() { public final Mono release() { return Mono.fromRunnable(() -> { if (this.started.compareAndSet(true, false)) { - logger.debug("Stopping Scheduled Job '{}#{}'...", telemetry.jobClass().getCanonicalName(), telemetry.jobMethod()); + logger.debug("Scheduled Job '{}#{}' stopping...", telemetry.jobClass().getCanonicalName(), 
telemetry.jobMethod()); final long started = System.nanoTime(); var f = this.scheduledFuture; this.scheduledFuture = null; f.cancel(true); - logger.info("Stopped Scheduled Job '{}#{}' took {}", telemetry.jobClass().getCanonicalName(), telemetry.jobMethod(), - Duration.ofNanos(System.nanoTime() - started)); + logger.info("Scheduled Job '{}#{}' stopped in {}", telemetry.jobClass().getCanonicalName(), telemetry.jobMethod(), + Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); } }); } diff --git a/scheduling/scheduling-quartz/src/main/java/ru/tinkoff/kora/scheduling/quartz/KoraQuartzJobRegistrar.java b/scheduling/scheduling-quartz/src/main/java/ru/tinkoff/kora/scheduling/quartz/KoraQuartzJobRegistrar.java index 6ceea13e1..85a76b562 100644 --- a/scheduling/scheduling-quartz/src/main/java/ru/tinkoff/kora/scheduling/quartz/KoraQuartzJobRegistrar.java +++ b/scheduling/scheduling-quartz/src/main/java/ru/tinkoff/kora/scheduling/quartz/KoraQuartzJobRegistrar.java @@ -28,7 +28,7 @@ public final Mono init() { .map(q -> q.getClass().getCanonicalName()) .toList(); - logger.debug("Starting Quartz Jobs {}...", quartzJobsNames); + logger.debug("Quartz Jobs {} starting...", quartzJobsNames); final long started = System.nanoTime(); for (var koraQuartzJob : this.quartzJobList) { @@ -47,7 +47,7 @@ public final Mono init() { this.scheduler.scheduleJob(job, koraQuartzJob.getTrigger()); } - logger.info("Started Quartz Jobs {} took {}", quartzJobsNames, Duration.ofNanos(System.nanoTime() - started)); + logger.info("Quartz Jobs {} started in {}", quartzJobsNames, Duration.ofNanos(System.nanoTime() - started).toString().substring(2).toLowerCase()); return null; }); } diff --git a/validation/validation-common/src/main/java/ru/tinkoff/kora/validation/common/SimpleViolation.java b/validation/validation-common/src/main/java/ru/tinkoff/kora/validation/common/SimpleViolation.java index 7c368fa9d..a01964fdb 100644 --- 
a/validation/validation-common/src/main/java/ru/tinkoff/kora/validation/common/SimpleViolation.java +++ b/validation/validation-common/src/main/java/ru/tinkoff/kora/validation/common/SimpleViolation.java @@ -1,7 +1,12 @@ package ru.tinkoff.kora.validation.common; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + record SimpleViolation(String message, ValidationContext.Path path) implements Violation { + static final Logger logger = LoggerFactory.getLogger(Validator.class); + @Override public String toString() { return "Path=" + path + ", Message=" + message; diff --git a/validation/validation-common/src/main/java/ru/tinkoff/kora/validation/common/ValidationContext.java b/validation/validation-common/src/main/java/ru/tinkoff/kora/validation/common/ValidationContext.java index aa28d7abe..e915abe1c 100644 --- a/validation/validation-common/src/main/java/ru/tinkoff/kora/validation/common/ValidationContext.java +++ b/validation/validation-common/src/main/java/ru/tinkoff/kora/validation/common/ValidationContext.java @@ -37,6 +37,7 @@ default ValidationContext addPath(int pathIndex) { */ @Nonnull default Violation violates(@Nonnull String message) { + SimpleViolation.logger.debug("Validation violation on path '{}' with error: {}", path(), message); return new SimpleViolation(message, path()); }