From ae729732c4b3597b3773e0d343a29c8303f16c74 Mon Sep 17 00:00:00 2001 From: fqtab <160629711+fqtab@users.noreply.github.com> Date: Fri, 15 Mar 2024 14:20:26 -0400 Subject: [PATCH] Deduplicate before committing and emit a log warning (#212) Co-authored-by: Farooq Qaiser --- kafka-connect/build.gradle | 2 + .../iceberg/connect/channel/CommitState.java | 7 +- .../iceberg/connect/channel/Coordinator.java | 16 +- .../iceberg/connect/channel/Deduplicated.java | 264 +++++++++++++ .../connect/channel/CoordinatorTest.java | 138 +++++-- .../connect/channel/DeduplicatedTest.java | 346 ++++++++++++++++++ .../connect/channel/MemoryAppender.java | 37 ++ 7 files changed, 778 insertions(+), 32 deletions(-) create mode 100644 kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java create mode 100644 kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java create mode 100644 kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/MemoryAppender.java diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle index 2d684ef5..4f6337c4 100644 --- a/kafka-connect/build.gradle +++ b/kafka-connect/build.gradle @@ -15,6 +15,8 @@ dependencies { testImplementation libs.mockito testImplementation libs.assertj + + testImplementation 'ch.qos.logback:logback-classic:1.5.3' } configurations { diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java index a9a29db2..d027846e 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java @@ -49,14 +49,17 @@ public void addResponse(Envelope envelope) { commitBuffer.add(envelope); if (!isCommitInProgress()) { LOG.warn( - "Received commit response when no commit in progress, this can happen during recovery"); + "Received commit response with commit-id={} when no commit in progress, this can happen during recovery", + ((CommitResponsePayload) envelope.event().payload()).commitId()); } } public void addReady(Envelope envelope) { readyBuffer.add((CommitReadyPayload) envelope.event().payload()); if (!isCommitInProgress()) { - LOG.warn("Received commit ready when no commit in progress, this can happen during recovery"); + LOG.warn( + "Received commit ready for commit-id={} when no commit in progress, this can happen during recovery", + ((CommitReadyPayload) envelope.event().payload()).commitId()); } } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java index 65652914..519ad0af 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java @@ -25,7 +25,6 @@ import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.events.CommitCompletePayload; import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; import io.tabular.iceberg.connect.events.CommitTablePayload; import io.tabular.iceberg.connect.events.Event; import io.tabular.iceberg.connect.events.EventType; @@ -98,6 +97,7 @@ public void process() { EventType.COMMIT_REQUEST, new CommitRequestPayload(commitState.currentCommitId())); send(event); + LOG.info("Started new commit with commit-id={}", 
commitState.currentCommitId().toString()); } consumeAvailable(POLL_DURATION); @@ -187,27 +187,25 @@ private void commitToTable( Map committedOffsets = lastCommittedOffsetsForTable(table, branch.orElse(null)); - List payloads = + List filteredEnvelopeList = envelopeList.stream() .filter( envelope -> { Long minOffset = committedOffsets.get(envelope.partition()); return minOffset == null || envelope.offset() >= minOffset; }) - .map(envelope -> (CommitResponsePayload) envelope.event().payload()) .collect(toList()); List dataFiles = - payloads.stream() - .filter(payload -> payload.dataFiles() != null) - .flatMap(payload -> payload.dataFiles().stream()) + Deduplicated.dataFiles(commitState.currentCommitId(), tableIdentifier, filteredEnvelopeList) + .stream() .filter(dataFile -> dataFile.recordCount() > 0) .collect(toList()); List deleteFiles = - payloads.stream() - .filter(payload -> payload.deleteFiles() != null) - .flatMap(payload -> payload.deleteFiles().stream()) + Deduplicated.deleteFiles( + commitState.currentCommitId(), tableIdentifier, filteredEnvelopeList) + .stream() .filter(deleteFile -> deleteFile.recordCount() > 0) .collect(toList()); diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java new file mode 100644 index 00000000..67f46f79 --- /dev/null +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.channel; + +import static java.util.stream.Collectors.toList; + +import io.tabular.iceberg.connect.events.CommitResponsePayload; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class both de-duplicates a batch of envelopes and adds logging to help disambiguate between + * different ways that duplicates could manifest. Duplicates could occur in the following three + * general ways: + * + *
    + *
+ * <ul>
+ *   <li>same file appears in 2 equivalent envelopes e.g. if the Coordinator read the same message
+ *       twice from Kafka
+ *       <ul>
+ *         <li>In this case, you should see a log message similar to "Deduplicated 2 data files with
+ *             the same path=data.parquet for table=db.tbl during
+ *             commit-id=cf602430-0f4d-41d8-a3e9-171848d89832 from the following
+ *             events=[2x(SimpleEnvelope{...})]",
+ *       </ul>
+ *   <li>same file appears in 2 different envelopes e.g. if a Worker sent the same message twice to
+ *       Kafka
+ *       <ul>
+ *         <li>In this case, you should see a log message similar to "Deduplicated 2 data files with
+ *             the same path=data.parquet for table=db.tbl during
+ *             commit-id=cf602430-0f4d-41d8-a3e9-171848d89832 from the following
+ *             events=[1x(SimpleEnvelope{...}), 1x(SimpleEnvelope{...})]",
+ *       </ul>
+ *   <li>same file appears in a single envelope twice e.g. if a Worker included the same file twice
+ *       in a single message in Kafka
+ *       <ul>
+ *         <li>In this case, you should see a log message similar to "Deduplicated 2 data files with
+ *             the same path=data.parquet in the same event=SimpleEnvelope{...} for table=db.tbl
+ *             during commit-id=cf602430-0f4d-41d8-a3e9-171848d89832"
+ *       </ul>
+ * </ul>
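+ *
+ * <p>For orientation, a minimal usage sketch (illustrative only; it mirrors the call sites added
+ * to {@code Coordinator} in this patch, and the variable names are taken from there):
+ *
+ * <pre>{@code
+ * List<DataFile> dataFiles =
+ *     Deduplicated.dataFiles(commitState.currentCommitId(), tableIdentifier, filteredEnvelopeList);
+ * List<DeleteFile> deleteFiles =
+ *     Deduplicated.deleteFiles(commitState.currentCommitId(), tableIdentifier, filteredEnvelopeList);
+ * }</pre>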
+ */ +class Deduplicated { + private static final Logger LOG = LoggerFactory.getLogger(Deduplicated.class); + + private Deduplicated() {} + + /** + * Returns the deduplicated data files from the batch of envelopes. Does not guarantee anything + * about the ordering of the files that are returned. + */ + public static List dataFiles( + UUID currentCommitId, TableIdentifier tableIdentifier, List envelopes) { + return deduplicatedFiles( + currentCommitId, + tableIdentifier, + envelopes, + "data", + CommitResponsePayload::dataFiles, + dataFile -> dataFile.path().toString()); + } + + /** + * Returns the deduplicated delete files from the batch of envelopes. Does not guarantee anything + * about the ordering of the files that are returned. + */ + public static List deleteFiles( + UUID currentCommitId, TableIdentifier tableIdentifier, List envelopes) { + return deduplicatedFiles( + currentCommitId, + tableIdentifier, + envelopes, + "delete", + CommitResponsePayload::deleteFiles, + deleteFile -> deleteFile.path().toString()); + } + + private static List deduplicatedFiles( + UUID currentCommitId, + TableIdentifier tableIdentifier, + List envelopes, + String fileType, + Function> extractFilesFromPayload, + Function extractPathFromFile) { + List> filesAndEnvelopes = + envelopes.stream() + .flatMap( + envelope -> { + CommitResponsePayload payload = + (CommitResponsePayload) envelope.event().payload(); + List files = extractFilesFromPayload.apply(payload); + if (files == null) { + return Stream.empty(); + } else { + SimpleEnvelope simpleEnvelope = new SimpleEnvelope(envelope); + return deduplicate( + files, + extractPathFromFile, + (path, duplicateFiles) -> + duplicateFilesInSameEventMessage( + path, + duplicateFiles, + fileType, + simpleEnvelope, + tableIdentifier, + currentCommitId)) + .stream() + .map(file -> Pair.of(file, simpleEnvelope)); + } + }) + .collect(toList()); + + List> result = + deduplicate( + filesAndEnvelopes, + fileAndEnvelope -> extractPathFromFile.apply(fileAndEnvelope.first()), + (path, duplicateFilesAndEnvelopes) -> + duplicateFilesAcrossMultipleEventsMessage( + path, duplicateFilesAndEnvelopes, fileType, tableIdentifier, currentCommitId)); + + return result.stream().map(Pair::first).collect(toList()); + } + + private static List deduplicate( + List elements, + Function keyExtractor, + BiFunction, String> logMessageFn) { + return elements.stream() + .collect(Collectors.groupingBy(keyExtractor, Collectors.toList())) + .entrySet() + .stream() + .flatMap( + entry -> { + String key = entry.getKey(); + List values = entry.getValue(); + if (values.size() > 1) { + LOG.warn(logMessageFn.apply(key, values)); + } + return Stream.of(values.get(0)); + }) + .collect(toList()); + } + + private static String duplicateFilesInSameEventMessage( + String path, + List duplicateFiles, + String fileType, + SimpleEnvelope envelope, + TableIdentifier tableIdentifier, + UUID currentCommitId) { + return String.format( + "Deduplicated %d %s files with the same path=%s in the same event=%s for table=%s during commit-id=%s", + duplicateFiles.size(), fileType, path, envelope, tableIdentifier, currentCommitId); + } + + private static String duplicateFilesAcrossMultipleEventsMessage( + String path, + List> duplicateFilesAndEnvelopes, + String fileType, + TableIdentifier tableIdentifier, + UUID currentCommitId) { + return String.format( + "Deduplicated %d %s files with the same path=%s for table=%s during commit-id=%s from the following events=%s", + duplicateFilesAndEnvelopes.size(), + fileType, + path, + 
tableIdentifier, + currentCommitId, + duplicateFilesAndEnvelopes.stream() + .map(Pair::second) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())) + .entrySet() + .stream() + .map(e -> String.format("%dx(%s)", e.getValue(), e.getKey())) + .collect(toList())); + } + + private static class SimpleEnvelope { + + private final int partition; + private final long offset; + private final UUID eventId; + private final String eventGroupId; + private final Long eventTimestamp; + private final UUID payloadCommitId; + + SimpleEnvelope(Envelope envelope) { + partition = envelope.partition(); + offset = envelope.offset(); + eventId = envelope.event().id(); + eventGroupId = envelope.event().groupId(); + eventTimestamp = envelope.event().timestamp(); + payloadCommitId = ((CommitResponsePayload) envelope.event().payload()).commitId(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SimpleEnvelope that = (SimpleEnvelope) o; + return partition == that.partition + && offset == that.offset + && Objects.equals(eventId, that.eventId) + && Objects.equals(eventGroupId, that.eventGroupId) + && Objects.equals(eventTimestamp, that.eventTimestamp) + && Objects.equals(payloadCommitId, that.payloadCommitId); + } + + @Override + public int hashCode() { + return Objects.hash( + partition, offset, eventId, eventGroupId, eventTimestamp, payloadCommitId); + } + + @Override + public String toString() { + return "SimpleEnvelope{" + + "partition=" + + partition + + ", offset=" + + offset + + ", eventId=" + + eventId + + ", eventGroupId='" + + eventGroupId + + '\'' + + ", eventTimestamp=" + + eventTimestamp + + ", payloadCommitId=" + + payloadCommitId + + '}'; + } + } +} diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java index cc59b686..8074c265 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Function; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DataOperations; @@ -137,6 +138,90 @@ public void testCommitError() { Assertions.assertEquals(0, snapshots.size()); } + @Test + public void testShouldDeduplicateDataFilesBeforeAppending() { + long ts = System.currentTimeMillis(); + DataFile dataFile = EventTestUtil.createDataFile(); + + UUID commitId = + coordinatorTest( + currentCommitId -> { + Event commitResponse = + new Event( + config.controlGroupId(), + EventType.COMMIT_RESPONSE, + new CommitResponsePayload( + StructType.of(), + currentCommitId, + new TableName(ImmutableList.of("db"), "tbl"), + ImmutableList.of(dataFile, dataFile), // duplicated data files + ImmutableList.of())); + + return ImmutableList.of( + commitResponse, + commitResponse, // duplicate commit response + new Event( + config.controlGroupId(), + EventType.COMMIT_READY, + new CommitReadyPayload( + currentCommitId, + ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))))); + }); + + assertCommitTable(1, commitId, ts); + assertCommitComplete(2, commitId, ts); + + List snapshots = ImmutableList.copyOf(table.snapshots()); + Assertions.assertEquals(1, snapshots.size()); + + Snapshot snapshot 
= snapshots.get(0); + Assertions.assertEquals(DataOperations.APPEND, snapshot.operation()); + Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDataFiles(table.io())).size()); + Assertions.assertEquals(0, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size()); + } + + @Test + public void testShouldDeduplicateDeleteFilesBeforeAppending() { + long ts = System.currentTimeMillis(); + DeleteFile deleteFile = EventTestUtil.createDeleteFile(); + + UUID commitId = + coordinatorTest( + currentCommitId -> { + Event duplicateCommitResponse = + new Event( + config.controlGroupId(), + EventType.COMMIT_RESPONSE, + new CommitResponsePayload( + StructType.of(), + currentCommitId, + new TableName(ImmutableList.of("db"), "tbl"), + ImmutableList.of(), + ImmutableList.of(deleteFile, deleteFile))); // duplicate delete files + + return ImmutableList.of( + duplicateCommitResponse, + duplicateCommitResponse, // duplicate commit response + new Event( + config.controlGroupId(), + EventType.COMMIT_READY, + new CommitReadyPayload( + currentCommitId, + ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))))); + }); + + assertCommitTable(1, commitId, ts); + assertCommitComplete(2, commitId, ts); + + List snapshots = ImmutableList.copyOf(table.snapshots()); + Assertions.assertEquals(1, snapshots.size()); + + Snapshot snapshot = snapshots.get(0); + Assertions.assertEquals(DataOperations.OVERWRITE, snapshot.operation()); + Assertions.assertEquals(0, ImmutableList.copyOf(snapshot.addedDataFiles(table.io())).size()); + Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size()); + } + private void assertCommitTable(int idx, UUID commitId, long ts) { byte[] bytes = producer.history().get(idx).value(); Event commitTable = Event.decode(bytes); @@ -158,6 +243,32 @@ private void assertCommitComplete(int idx, UUID commitId, long ts) { } private UUID coordinatorTest(List dataFiles, List deleteFiles, long ts) { + return coordinatorTest( + currentCommitId -> { + Event commitResponse = + new Event( + config.controlGroupId(), + EventType.COMMIT_RESPONSE, + new CommitResponsePayload( + StructType.of(), + currentCommitId, + new TableName(ImmutableList.of("db"), "tbl"), + dataFiles, + deleteFiles)); + + Event commitReady = + new Event( + config.controlGroupId(), + EventType.COMMIT_READY, + new CommitReadyPayload( + currentCommitId, + ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts)))); + + return ImmutableList.of(commitResponse, commitReady); + }); + } + + private UUID coordinatorTest(Function> eventsFn) { when(config.commitIntervalMs()).thenReturn(0); when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE); @@ -178,27 +289,12 @@ private UUID coordinatorTest(List dataFiles, List deleteFi UUID commitId = ((CommitRequestPayload) commitRequest.payload()).commitId(); - Event commitResponse = - new Event( - config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( - StructType.of(), - commitId, - new TableName(ImmutableList.of("db"), "tbl"), - dataFiles, - deleteFiles)); - bytes = Event.encode(commitResponse); - consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes)); - - Event commitReady = - new Event( - config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload( - commitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts)))); - bytes = Event.encode(commitReady); - consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", bytes)); + int currentOffset = 1; + for (Event 
event : eventsFn.apply(commitId)) { + bytes = Event.encode(event); + consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, currentOffset, "key", bytes)); + currentOffset += 1; + } when(config.commitIntervalMs()).thenReturn(0); diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java new file mode 100644 index 00000000..24685b04 --- /dev/null +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.channel; + +import static io.tabular.iceberg.connect.events.EventType.COMMIT_RESPONSE; +import static org.assertj.core.api.Assertions.assertThat; + +import io.tabular.iceberg.connect.events.CommitResponsePayload; +import io.tabular.iceberg.connect.events.Event; +import io.tabular.iceberg.connect.events.TableName; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; + +class DeduplicatedTest { + + protected MemoryAppender deduplicatedMemoryAppender; + + private static final UUID CURRENT_COMMIT_ID = + UUID.fromString("cf602430-0f4d-41d8-a3e9-171848d89832"); + private static final UUID PAYLOAD_COMMIT_ID = + UUID.fromString("4142add7-7c92-4bbe-b864-21ce8ac4bf53"); + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("db", "tbl"); + private static final TableName TABLE_NAME = TableName.of(TABLE_IDENTIFIER); + private static final String GROUP_ID = "some-group"; + private static final DataFile DATA_FILE_1 = createDataFile("1"); + private static final DataFile DATA_FILE_2 = createDataFile("2"); + private static final DeleteFile DELETE_FILE_1 = createDeleteFile("1"); + private static final DeleteFile DELETE_FILE_2 = createDeleteFile("2"); + + @BeforeEach + public void before() { + deduplicatedMemoryAppender = new MemoryAppender(); + deduplicatedMemoryAppender.setContext( 
+ (ch.qos.logback.classic.LoggerContext) LoggerFactory.getILoggerFactory()); + ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Deduplicated.class)) + .addAppender(deduplicatedMemoryAppender); + deduplicatedMemoryAppender.start(); + } + + @AfterEach + public void after() { + deduplicatedMemoryAppender.stop(); + } + + public static DataFile createDataFile(String fileSuffix) { + return DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("data-" + fileSuffix + ".parquet") + .withFormat(FileFormat.PARQUET) + .withFileSizeInBytes(100L) + .withRecordCount(5) + .build(); + } + + public static DeleteFile createDeleteFile(String fileSuffix) { + return FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofEqualityDeletes(1) + .withPath("delete-" + fileSuffix + ".parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + } + + private void assertExpectedFiles( + List batch, Set expectedDatafiles, Set expectedDeleteFiles) { + List actualDataFiles = + Deduplicated.dataFiles(CURRENT_COMMIT_ID, TABLE_IDENTIFIER, batch); + List actualDeleteFiles = + Deduplicated.deleteFiles(CURRENT_COMMIT_ID, TABLE_IDENTIFIER, batch); + + Assertions.assertEquals(expectedDatafiles, Sets.newHashSet(actualDataFiles)); + Assertions.assertEquals(expectedDeleteFiles, Sets.newHashSet(actualDeleteFiles)); + } + + private void assertNoWarnOrHigherLogs() { + assertThat(deduplicatedMemoryAppender.getWarnOrHigher()) + .as("Expected 0 log messages") + .hasSize(0); + } + + private void assertWarnOrHigherLogsSize(int expectedSize) { + assertThat(deduplicatedMemoryAppender.getWarnOrHigher()).hasSize(expectedSize); + } + + private void assertWarnOrHigherLogsContainsEntryMatching(String expectedMessagesFmt) { + Assertions.assertTrue( + deduplicatedMemoryAppender.getWarnOrHigher().stream() + .anyMatch(x -> x.matches(expectedMessagesFmt))); + } + + private Event commitResponseEvent(List dataFiles, List deleteFiles) { + return new Event( + GROUP_ID, + COMMIT_RESPONSE, + new CommitResponsePayload( + Types.StructType.of(), PAYLOAD_COMMIT_ID, TABLE_NAME, dataFiles, deleteFiles)); + } + + private String detectedDuplicateFileAcrossMultipleEvents( + int numFiles, String fileType, ContentFile contentFile) { + String simpleEnvelopePattern = + "[0-9]+x\\(SimpleEnvelope\\{partition=[0-9]+, offset=[0-9]+, eventId=.*, eventGroupId='.*', eventTimestamp=[0-9]+, payloadCommitId=.*\\}\\)"; + return String.format( + "^Deduplicated %d %s files with the same path=%s for table=%s during commit-id=%s from the following events=\\[(%s)+]$", + numFiles, + fileType, + contentFile.path(), + TABLE_IDENTIFIER, + CURRENT_COMMIT_ID, + simpleEnvelopePattern); + } + + private String detectedDuplicateFileInSameEvent( + int numFiles, String fileType, ContentFile contentFile, int partition, long offset) { + return String.format( + "^Deduplicated %d %s files with the same path=%s in the same event=SimpleEnvelope\\{partition=%d, offset=%d, eventId=.*, eventGroupId='%s', eventTimestamp=[0-9]+, payloadCommitId=%s\\} for table=%s during commit-id=%s$", + numFiles, + fileType, + contentFile.path(), + partition, + offset, + GROUP_ID, + PAYLOAD_COMMIT_ID, + TABLE_IDENTIFIER, + CURRENT_COMMIT_ID); + } + + @Test + public void testNullFilesShouldReturnEmptyFiles() { + Event event = commitResponseEvent(null, null); + Envelope envelope = new Envelope(event, 0, 100); + + List batch = ImmutableList.of(envelope); + + assertExpectedFiles(batch, ImmutableSet.of(), ImmutableSet.of()); + assertNoWarnOrHigherLogs(); + } + + @Test + public void 
testShouldReturnEmptyFiles() { + Event event = commitResponseEvent(ImmutableList.of(), ImmutableList.of()); + Envelope envelope = new Envelope(event, 0, 100); + + List batch = ImmutableList.of(envelope); + + assertExpectedFiles(batch, ImmutableSet.of(), ImmutableSet.of()); + assertNoWarnOrHigherLogs(); + } + + @Test + public void testShouldReturnNonDuplicatedFile() { + Event event = + commitResponseEvent(ImmutableList.of(DATA_FILE_1), ImmutableList.of(DELETE_FILE_1)); + Envelope envelope = new Envelope(event, 0, 100); + + List batch = ImmutableList.of(envelope); + + assertExpectedFiles(batch, ImmutableSet.of(DATA_FILE_1), ImmutableSet.of(DELETE_FILE_1)); + assertNoWarnOrHigherLogs(); + } + + @Test + public void testShouldReturnNonDuplicatedFiles() { + Event event = + commitResponseEvent( + ImmutableList.of(DATA_FILE_1, DATA_FILE_2), + ImmutableList.of(DELETE_FILE_1, DELETE_FILE_2)); + Envelope envelope = new Envelope(event, 0, 100); + + List batch = ImmutableList.of(envelope); + + assertExpectedFiles( + batch, + ImmutableSet.of(DATA_FILE_1, DATA_FILE_2), + ImmutableSet.of(DELETE_FILE_1, DELETE_FILE_2)); + assertNoWarnOrHigherLogs(); + } + + @Test + public void testShouldReturnNonDuplicatedFilesFromMultipleEvents() { + Event event1 = + commitResponseEvent(ImmutableList.of(DATA_FILE_1), ImmutableList.of(DELETE_FILE_1)); + Event event2 = + commitResponseEvent(ImmutableList.of(DATA_FILE_2), ImmutableList.of(DELETE_FILE_2)); + + List batch = + ImmutableList.of(new Envelope(event1, 0, 100), new Envelope(event2, 0, 101)); + + assertExpectedFiles( + batch, + ImmutableSet.of(DATA_FILE_1, DATA_FILE_2), + ImmutableSet.of(DELETE_FILE_1, DELETE_FILE_2)); + assertNoWarnOrHigherLogs(); + } + + @Test + public void testShouldDeduplicateEnvelopes() { + Event event = + commitResponseEvent( + ImmutableList.of(DATA_FILE_1, DATA_FILE_2), + ImmutableList.of(DELETE_FILE_1, DELETE_FILE_2)); + Envelope duplicatedEnvelope = new Envelope(event, 0, 100); + + List batch = ImmutableList.of(duplicatedEnvelope, duplicatedEnvelope); + + assertExpectedFiles( + batch, + ImmutableSet.of(DATA_FILE_1, DATA_FILE_2), + ImmutableSet.of(DELETE_FILE_1, DELETE_FILE_2)); + + assertWarnOrHigherLogsSize(4); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(2, "data", DATA_FILE_1)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(2, "data", DATA_FILE_2)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(2, "delete", DELETE_FILE_1)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(2, "delete", DELETE_FILE_2)); + } + + @Test + public void testShouldDeduplicateFilesInsidePayloads() { + Event event = + commitResponseEvent( + ImmutableList.of(DATA_FILE_1, DATA_FILE_2, DATA_FILE_1), + ImmutableList.of(DELETE_FILE_1, DELETE_FILE_2, DELETE_FILE_1)); + Envelope envelope = new Envelope(event, 0, 100); + + List batch = ImmutableList.of(envelope); + + assertExpectedFiles( + batch, + ImmutableSet.of(DATA_FILE_1, DATA_FILE_2), + ImmutableSet.of(DELETE_FILE_1, DELETE_FILE_2)); + + assertWarnOrHigherLogsSize(2); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileInSameEvent(2, "data", DATA_FILE_1, 0, 100)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileInSameEvent(2, "delete", DELETE_FILE_1, 0, 100)); + } + + @Test + public void testShouldDeduplicateFilesAcrossPayloads() { + Event event1 = + commitResponseEvent(ImmutableList.of(DATA_FILE_1), 
ImmutableList.of(DELETE_FILE_1)); + Event event2 = + commitResponseEvent( + ImmutableList.of(DATA_FILE_1, DATA_FILE_2), + ImmutableList.of(DELETE_FILE_1, DELETE_FILE_2)); + + List batch = + ImmutableList.of(new Envelope(event1, 0, 100), new Envelope(event2, 0, 101)); + + assertExpectedFiles( + batch, + ImmutableSet.of(DATA_FILE_1, DATA_FILE_2), + ImmutableSet.of(DELETE_FILE_1, DELETE_FILE_2)); + + assertWarnOrHigherLogsSize(2); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(2, "data", DATA_FILE_1)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(2, "delete", DELETE_FILE_1)); + } + + @Test + public void testShouldHandleComplexCase() { + Event event1 = + commitResponseEvent(ImmutableList.of(DATA_FILE_1), ImmutableList.of(DELETE_FILE_1)); + Event event2 = + commitResponseEvent( + ImmutableList.of(DATA_FILE_1, DATA_FILE_2), + ImmutableList.of(DELETE_FILE_1, DELETE_FILE_2)); + Event event3 = + commitResponseEvent( + ImmutableList.of(DATA_FILE_1, DATA_FILE_2, DATA_FILE_2), + ImmutableList.of(DELETE_FILE_1, DELETE_FILE_2, DELETE_FILE_2)); + + List batch = + ImmutableList.of( + new Envelope(event1, 0, 100), + new Envelope(event2, 0, 101), + new Envelope(event1, 0, 100), + new Envelope(event3, 0, 102)); + + assertExpectedFiles( + batch, + ImmutableSet.of(DATA_FILE_1, DATA_FILE_2), + ImmutableSet.of(DELETE_FILE_1, DELETE_FILE_2)); + + assertWarnOrHigherLogsSize(6); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(4, "data", DATA_FILE_1)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(2, "data", DATA_FILE_2)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(4, "delete", DELETE_FILE_1)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileAcrossMultipleEvents(2, "delete", DELETE_FILE_2)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileInSameEvent(2, "data", DATA_FILE_2, 0, 102)); + assertWarnOrHigherLogsContainsEntryMatching( + detectedDuplicateFileInSameEvent(2, "data", DATA_FILE_2, 0, 102)); + + // call a second time to make sure there are no mutability bugs + assertExpectedFiles( + batch, + ImmutableSet.of(DATA_FILE_1, DATA_FILE_2), + ImmutableSet.of(DELETE_FILE_1, DELETE_FILE_2)); + } +} diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/MemoryAppender.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/MemoryAppender.java new file mode 100644 index 00000000..291856e3 --- /dev/null +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/MemoryAppender.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package io.tabular.iceberg.connect.channel;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+class MemoryAppender
+    extends ch.qos.logback.core.read.ListAppender<ch.qos.logback.classic.spi.ILoggingEvent> {
+
+  public List<ch.qos.logback.classic.spi.ILoggingEvent> getLoggedEvents() {
+    return ImmutableList.copyOf(this.list);
+  }
+
+  public List<String> getWarnOrHigher() {
+    return getLoggedEvents().stream()
+        .filter(e -> e.getLevel().isGreaterOrEqual(ch.qos.logback.classic.Level.WARN))
+        .map(e -> e.getMessage())
+        .collect(Collectors.toList());
+  }
+}
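For readers skimming the diff, the heart of the change is the group-by-key, keep-first deduplication that Deduplicated applies first to the files inside a single payload and then to the files collected across payloads, warning whenever a group contains more than one entry. A minimal, self-contained sketch of that step follows; it is simplified from the private deduplicate helper above, and the class name DeduplicateSketch, the method name keepFirstPerKey, and the onDuplicate callback are illustrative only (the real code builds the warning strings shown in the class Javadoc and logs them via SLF4J).

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

final class DeduplicateSketch {

  private DeduplicateSketch() {}

  // Group elements by a key (for Deduplicated, the file path), keep the first element of each
  // group, and hand any group with more than one element to the caller so it can log a warning.
  static <T> List<T> keepFirstPerKey(
      List<T> elements,
      Function<T, String> keyExtractor,
      BiConsumer<String, List<T>> onDuplicate) {
    return elements.stream()
        .collect(Collectors.groupingBy(keyExtractor, Collectors.toList()))
        .entrySet()
        .stream()
        .map(
            entry -> {
              if (entry.getValue().size() > 1) {
                onDuplicate.accept(entry.getKey(), entry.getValue()); // e.g. LOG.warn(...)
              }
              return entry.getValue().get(0);
            })
        .collect(Collectors.toList());
  }
}

Because the sketch never mutates its input list, running it twice over the same batch yields the same result, which is the property the "call a second time to make sure there are no mutability bugs" assertion at the end of testShouldHandleComplexCase exercises against the real implementation.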