From 690e62e0c40480856df4b9ba1250eecb81851c18 Mon Sep 17 00:00:00 2001 From: Mathew Fournier <160646114+tabmatfournier@users.noreply.github.com> Date: Thu, 25 Apr 2024 18:54:45 -0700 Subject: [PATCH] move-noisy-debug-logs-to-debug (#242) - these logs are the equivilent of "HERE" and account for ~75% of the logs but are generally not useful. Moving them to debug --- .../java/io/tabular/iceberg/connect/channel/Channel.java | 4 ++-- .../io/tabular/iceberg/connect/channel/CommitState.java | 8 ++++---- .../io/tabular/iceberg/connect/channel/Coordinator.java | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java index 3559ab62..6a51676c 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java @@ -88,7 +88,7 @@ protected void send( events.stream() .map( event -> { - LOG.info("Sending event of type: {}", event.type().name()); + LOG.debug("Sending event of type: {}", event.type().name()); byte[] data = Event.encode(event); // key by producer ID to keep event order return new ProducerRecord<>(controlTopic, producerId, data); @@ -129,7 +129,7 @@ record -> { if (event.groupId().equals(groupId)) { LOG.debug("Received event of type: {}", event.type().name()); if (receiveFn.apply(new Envelope(event, record.partition(), record.offset()))) { - LOG.info("Handled event of type: {}", event.type().name()); + LOG.debug("Handled event of type: {}", event.type().name()); } } }); diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java index d027846e..fb1191eb 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java @@ -48,7 +48,7 @@ public CommitState(IcebergSinkConfig config) { public void addResponse(Envelope envelope) { commitBuffer.add(envelope); if (!isCommitInProgress()) { - LOG.warn( + LOG.debug( "Received commit response with commit-id={} when no commit in progress, this can happen during recovery", ((CommitResponsePayload) envelope.event().payload()).commitId()); } @@ -57,7 +57,7 @@ public void addResponse(Envelope envelope) { public void addReady(Envelope envelope) { readyBuffer.add((CommitReadyPayload) envelope.event().payload()); if (!isCommitInProgress()) { - LOG.warn( + LOG.debug( "Received commit ready for commit-id={} when no commit in progress, this can happen during recovery", ((CommitReadyPayload) envelope.event().payload()).commitId()); } @@ -118,14 +118,14 @@ public boolean isCommitReady(int expectedPartitionCount) { .sum(); if (receivedPartitionCount >= expectedPartitionCount) { - LOG.info( + LOG.debug( "Commit {} ready, received responses for all {} partitions", currentCommitId, receivedPartitionCount); return true; } - LOG.info( + LOG.debug( "Commit {} not ready, received responses for {} of {} partitions, waiting for more", currentCommitId, receivedPartitionCount, diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java index 6fb6e2c3..6d8b8fb5 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java @@ -103,7 +103,7 @@ public void process() { EventType.COMMIT_REQUEST, new CommitRequestPayload(commitState.currentCommitId())); send(event); - LOG.info("Started new commit with commit-id={}", commitState.currentCommitId().toString()); + LOG.debug("Started new commit with commit-id={}", commitState.currentCommitId().toString()); } consumeAvailable(POLL_DURATION, this::receive);