From 0af783edc2f6e548f4117e7a38ae0acce6ab629b Mon Sep 17 00:00:00 2001
From: frankgrimes97
Date: Thu, 20 Oct 2022 22:45:59 -0400
Subject: [PATCH 1/2] Avoid late records preemptively rotating/committing S3 output files

When late data arrives on a Kafka partition (e.g. data for the previous
hourly encodedPartition), the following check triggers an immediate rotation
and commit of files:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/918730d011dcd199e810ec3a68a03ab01c927f62/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java#L410

The problem is exacerbated when late data is interleaved with up-to-date
data. When this happens, a quick succession of rotations causes a large
number of small files to be committed to S3. This hurts both the
performance/throughput of Kafka Connect and downstream consumers, which have
to deal with the many small file fragments.

This PR adds a new `max.open.files.per.partition` setting to
S3SinkConnectorConfig. It defaults to 1, which preserves the existing
behavior. If set to a value > 1, the following behavior is enabled:

- A separate commit file is kept open for each encodedPartition target, up
  to a maximum of `max.open.files.per.partition`.
- Only when one of the encodedPartition targets hits its rotation condition
  (`flush.size`, `rotate.interval.ms`) does rotation occur, committing all
  open files. All files are committed so that the S3 sink's pre-commit hook
  commits a high-watermark offset to the Kafka consumer group. This avoids
  committing offsets whose data is still buffered in-flight.

It's worth noting that this issue/limitation was previously encountered and
is well described as part of "CC-2313 Handle late arriving records in
storage cloud sink connectors":
https://github.com/confluentinc/kafka-connect-storage-cloud/pull/187

However, that feature was subsequently reverted:
https://github.com/confluentinc/kafka-connect-storage-cloud/commit/a2ce6fc34478f3377192e8aa2d98e01db0dbf951
https://github.com/confluentinc/kafka-connect-storage-common/pull/87

N.B. Unlike the solution proposed in CC-2313, we do not write late data to
an incorrect encodedPartition, i.e. late data for hour 7 will not land in a
path/file for hour 8.
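
For illustration only (the class name and the concrete values below are made
up; the property keys are the ones referenced above), a connector
configuration might combine the new setting with the usual rotation settings
like this:

    import java.util.HashMap;
    import java.util.Map;

    public class LateDataFriendlyS3SinkProps {
      public static Map<String, String> exampleProps() {
        Map<String, String> props = new HashMap<>();
        // Rotate on size or time as before ...
        props.put("flush.size", "10000");
        props.put("rotate.interval.ms", String.valueOf(60 * 60 * 1000)); // hourly
        // ... but allow two encodedPartition files to stay open at once, so a
        // late record for the previous hour no longer forces an immediate
        // rotation and commit of the current hour's file.
        props.put("max.open.files.per.partition", "2");
        return props;
      }
    }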
---
 .../connect/s3/S3SinkConnectorConfig.java    |  16 ++
 .../connect/s3/TopicPartitionWriter.java     | 170 ++++++++++++++----
 .../connect/s3/TopicPartitionWriterTest.java | 105 +++++++++++
 3 files changed, 254 insertions(+), 37 deletions(-)

diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
index 97a9541e7..211dfb2d5 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
@@ -190,6 +190,9 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
   public static final String ELASTIC_BUFFER_INIT_CAPACITY = "s3.elastic.buffer.init.capacity";
   public static final int ELASTIC_BUFFER_INIT_CAPACITY_DEFAULT = 128 * 1024; // 128KB
 
+  public static final String MAX_OPEN_FILES_PER_PARTITION_CONFIG = "max.open.files.per.partition";
+  public static final int MAX_OPEN_FILES_PER_PARTITION_DEFAULT = 1;
+
   private final String name;
 
   private final Map propertyToConfig = new HashMap<>();
@@ -717,6 +720,19 @@ public static ConfigDef newConfigDef() {
           "Elastic buffer initial capacity"
       );
 
+      configDef.define(
+          MAX_OPEN_FILES_PER_PARTITION_CONFIG,
+          Type.INT,
+          MAX_OPEN_FILES_PER_PARTITION_DEFAULT,
+          atLeast(1),
+          Importance.LOW,
+          "Max open files per partition.",
+          group,
+          ++orderInGroup,
+          Width.LONG,
+          "Max open files per partition"
+      );
+
     }
     return configDef;
   }
diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
index b22e8a1f8..5c61c56d3 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
@@ -56,6 +56,9 @@
 import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PART_RETRIES_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_RETRY_BACKOFF_CONFIG;
 
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
 
 public class TopicPartitionWriter {
   private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
@@ -73,20 +76,23 @@ public class TopicPartitionWriter {
   private final SinkTaskContext context;
   private final boolean isTaggingEnabled;
   private final boolean ignoreTaggingErrors;
-  private int recordCount;
+  private long recordCount;
   private final int flushSize;
   private final long rotateIntervalMs;
   private final long rotateScheduleIntervalMs;
   private long nextScheduledRotation;
   private long currentOffset;
+  private final int maxOpenFilesPerPartition;
   private Long currentTimestamp;
   private String currentEncodedPartition;
-  private Long baseRecordTimestamp;
+  private final Map currentTimestamps;
+  private final Set openEncodedPartitions;
+  private final Map baseRecordTimestamps;
   private Long offsetToCommit;
   private final RecordWriterProvider writerProvider;
   private final Map startOffsets;
   private final Map endOffsets;
-  private final Map recordCounts;
+  private final Map encodedPartitionRecordCounts;
   private long timeoutMs;
   private long failureTime;
   private final StorageSchemaCompatibility compatibility;
@@ -156,13 +162,19 @@ public TopicPartitionWriter(TopicPartition tp,
     compatibility = StorageSchemaCompatibility.getCompatibility(
         connectorConfig.getString(StorageSinkConnectorConfig.SCHEMA_COMPATIBILITY_CONFIG));
 
+    maxOpenFilesPerPartition = connectorConfig.getInt(
+        S3SinkConnectorConfig.MAX_OPEN_FILES_PER_PARTITION_CONFIG);
+
     buffer = new LinkedList<>();
     commitFiles = new HashMap<>();
     writers = new HashMap<>();
     currentSchemas = new HashMap<>();
+    currentTimestamps = new HashMap<>();
+    openEncodedPartitions = new HashSet<>();
+    baseRecordTimestamps = new HashMap<>();
     startOffsets = new HashMap<>();
     endOffsets = new HashMap<>();
-    recordCounts = new HashMap<>();
+    encodedPartitionRecordCounts = new HashMap<>();
     state = State.WRITE_STARTED;
     failureTime = -1L;
     currentOffset = -1L;
@@ -226,13 +238,6 @@ private void executeState(long now) {
         // fallthrough
       case WRITE_PARTITION_PAUSED:
         SinkRecord record = buffer.peek();
-        if (timestampExtractor != null) {
-          currentTimestamp = timestampExtractor.extract(record, now);
-          if (baseRecordTimestamp == null) {
-            baseRecordTimestamp = currentTimestamp;
-          }
-        }
-        Schema valueSchema = record.valueSchema();
         String encodedPartition;
         try {
           encodedPartition = partitioner.encodePartition(record, now);
@@ -245,6 +250,17 @@ private void executeState(long now) {
             throw e;
           }
         }
+
+        if (timestampExtractor != null) {
+          final long currentRecordTimestamp = timestampExtractor.extract(record, now);
+          currentTimestamp = currentRecordTimestamp;
+          currentTimestamps.put(encodedPartition, currentRecordTimestamp);
+          if (!baseRecordTimestamps.containsKey(encodedPartition)) {
+            baseRecordTimestamps.put(encodedPartition, currentRecordTimestamp);
+          }
+        }
+
+        Schema valueSchema = record.valueSchema();
         Schema currentValueSchema = currentSchemas.get(encodedPartition);
         if (currentValueSchema == null) {
           currentSchemas.put(encodedPartition, valueSchema);
@@ -284,9 +300,14 @@ private boolean checkRotationOrAppend(
       String encodedPartition,
       long now
   ) {
+
     // rotateOnTime is safe to go before writeRecord, because it is acceptable
     // even for a faulty record to trigger time-based rotation if it applies
-    if (rotateOnTime(encodedPartition, currentTimestamp, now)) {
+    if (rotateOnTime(
+        encodedPartition,
+        currentTimestamps.getOrDefault(encodedPartition, currentTimestamp),
+        now)) {
+
       setNextScheduledRotation();
       nextState();
       return true;
@@ -315,11 +336,11 @@ private boolean checkRotationOrAppend(
       return false;
     }
 
-    if (rotateOnSize()) {
+    if (rotateOnSize(encodedPartition)) {
       log.info(
-          "Starting commit and rotation for topic partition {} with start offset {}",
-          tp,
-          startOffsets
+          "Starting commit and rotation for encodedPartition {} with start offset {}",
+          encodedPartition,
+          startOffsets.getOrDefault(encodedPartition, 0L)
       );
       nextState();
       return true;
@@ -330,18 +351,46 @@ private void commitOnTimeIfNoData(long now) {
     if (buffer.isEmpty()) {
+      boolean shouldCommitFiles = false;
       // committing files after waiting for rotateIntervalMs time but less than flush.size
       // records available
-      if (recordCount > 0 && rotateOnTime(currentEncodedPartition, currentTimestamp, now)) {
+      if (maxOpenFilesPerPartition == 1
+          && recordCount > 0
+          && rotateOnTime(currentEncodedPartition, currentTimestamp, now)) {
+
         log.info(
             "Committing files after waiting for rotateIntervalMs time but less than flush.size "
                 + "records available."
         );
         setNextScheduledRotation();
 
-        commitFiles();
+        shouldCommitFiles = true;
+      } else if (maxOpenFilesPerPartition > 1) {
+        for (String encodedPartition : openEncodedPartitions) {
+          if (rotateOnTime(
+              encodedPartition,
+              currentTimestamps.getOrDefault(encodedPartition, now),
+              now)) {
+
+            log.info(
+                "Committing files after waiting for rotateIntervalMs time for encodedPartition "
+                    + "'{}' but less than flush.size records available.",
+                encodedPartition);
+
+            setNextScheduledRotation();
+
+            // At least one encodedPartition needs committing, so commit all so that
+            // the S3SinkTask's preCommit contains no buffered offsets, only
+            // those written to S3
+            shouldCommitFiles = true;
+            break;
+          }
+        }
       }
+      if (shouldCommitFiles) {
+        commitFiles();
+      }
       resume();
       setState(State.WRITE_STARTED);
     }
@@ -398,23 +447,53 @@ private void setState(State state) {
     this.state = state;
   }
 
+  private static final boolean recordTimestampExceedsRotationInterval(
+      final Long recordTimestamp,
+      final Long baseRecordTimestamp,
+      final long rotateIntervalMs) {
+
+    return recordTimestamp - baseRecordTimestamp >= rotateIntervalMs;
+  }
+
+  private static final boolean encodedPartitionChangeNecessitatesRotation(
+      final String encodedPartition,
+      final String currentEncodedPartition,
+      final int maxOpenFilesPerPartition) {
+
+    return !encodedPartition.equals(currentEncodedPartition)
+        && maxOpenFilesPerPartition == 1;
+  }
+
   private boolean rotateOnTime(String encodedPartition, Long recordTimestamp, long now) {
     if (recordCount <= 0) {
       return false;
     }
+
+    final Long baseRecordTimestamp = baseRecordTimestamps.getOrDefault(encodedPartition, -1L);
     // rotateIntervalMs > 0 implies timestampExtractor != null
-    boolean periodicRotation = rotateIntervalMs > 0
-        && timestampExtractor != null
-        && (
-            recordTimestamp - baseRecordTimestamp >= rotateIntervalMs
-                || !encodedPartition.equals(currentEncodedPartition)
-        );
+    final boolean hasTimestampExtractor = rotateIntervalMs > 0 && timestampExtractor != null;
+    boolean periodicRotation = hasTimestampExtractor
+        && (recordTimestampExceedsRotationInterval(
+                recordTimestamp,
+                baseRecordTimestamp,
+                rotateIntervalMs)
+            || encodedPartitionChangeNecessitatesRotation(
+                encodedPartition,
+                currentEncodedPartition,
+                maxOpenFilesPerPartition)
+            || baseRecordTimestamps.size() > maxOpenFilesPerPartition
+        );
 
     log.trace(
         "Checking rotation on time for topic-partition '{}' "
            + "with recordCount '{}' and encodedPartition '{}'",
         tp,
-        recordCount,
+        maxOpenFilesPerPartition == 1
+            ? recordCount
+            : encodedPartitionRecordCounts.getOrDefault(
+                encodedPartition,
+                new LongAdder()
+            ).longValue(),
         encodedPartition
     );
 
@@ -471,12 +550,25 @@ private void setNextScheduledRotation() {
     }
   }
 
-  private boolean rotateOnSize() {
-    boolean messageSizeRotation = recordCount >= flushSize;
+  private static final long getRecordCount(
+      final String encodedPartition,
+      final Map recordCountsPerEncodedPartition) {
+
+    return recordCountsPerEncodedPartition
+        .getOrDefault(encodedPartition, new LongAdder())
+        .longValue();
+  }
+
+  private boolean rotateOnSize(final String encodedPartition) {
+    long encodedPartitionRecordCount = maxOpenFilesPerPartition == 1 ? recordCount : getRecordCount(
+        encodedPartition,
+        encodedPartitionRecordCounts
+    );
+    boolean messageSizeRotation = encodedPartitionRecordCount >= flushSize;
     log.trace("Should apply size-based rotation for topic-partition '{}':"
{}", tp, - recordCount, + encodedPartitionRecordCount, flushSize, messageSizeRotation ); @@ -580,10 +672,11 @@ private boolean writeRecord(SinkRecord record, String encodedPartition) { } currentEncodedPartition = encodedPartition; + openEncodedPartitions.add(encodedPartition); currentOffset = record.kafkaOffset(); if (shouldRemoveStartOffset) { log.trace( - "Setting writer's start offset for '{}' to {}", + "Setting writer's current offset for '{}' to {}", currentEncodedPartition, currentOffset ); @@ -592,12 +685,15 @@ private boolean writeRecord(SinkRecord record, String encodedPartition) { // value, we know that we have at least one record. This allows us // to initialize all our maps at the same time, and saves future // checks on the existence of keys - recordCounts.put(currentEncodedPartition, 0L); - endOffsets.put(currentEncodedPartition, 0L); + encodedPartitionRecordCounts.remove(currentEncodedPartition); + endOffsets.remove(currentEncodedPartition); } ++recordCount; - recordCounts.put(currentEncodedPartition, recordCounts.get(currentEncodedPartition) + 1); + encodedPartitionRecordCounts.computeIfAbsent( + currentEncodedPartition, + k -> new LongAdder() + ).increment(); endOffsets.put(currentEncodedPartition, currentOffset); return true; } @@ -615,15 +711,15 @@ private void commitFiles() { } startOffsets.remove(encodedPartition); endOffsets.remove(encodedPartition); - recordCounts.remove(encodedPartition); + encodedPartitionRecordCounts.remove(encodedPartition); log.debug("Committed {} for {}", entry.getValue(), tp); + baseRecordTimestamps.remove(encodedPartition); } offsetToCommit = currentOffset + 1; commitFiles.clear(); currentSchemas.clear(); recordCount = 0; - baseRecordTimestamp = null; log.info("Files committed to S3. Target commit offset for {} is {}", tp, offsetToCommit); } @@ -645,8 +741,8 @@ private void commitFile(String encodedPartition) { private void tagFile(String encodedPartition, String s3ObjectPath) { Long startOffset = startOffsets.get(encodedPartition); Long endOffset = endOffsets.get(encodedPartition); - Long recordCount = recordCounts.get(encodedPartition); - if (startOffset == null || endOffset == null || recordCount == null) { + Long recordCount = getRecordCount(encodedPartition, encodedPartitionRecordCounts); + if (startOffset == null || endOffset == null || recordCount == 0L) { log.warn( "Missing tags when attempting to tag file {}. " + "Starting offset tag: {}, " @@ -655,7 +751,7 @@ private void tagFile(String encodedPartition, String s3ObjectPath) { encodedPartition, startOffset == null ? "missing" : startOffset, endOffset == null ? "missing" : endOffset, - recordCount == null ? "missing" : recordCount + recordCount == 0L ? "missing" : recordCount ); return; } diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java index 826ecbf10..a1420b8e4 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java @@ -1343,6 +1343,111 @@ public void testRecordHeadersWrittenJson() throws Exception { verifyRecordElement(expectedHeaderFiles, 3, sinkRecords, RecordElement.HEADERS); } + @Test + public void testWriteRecordLateRecordsMaxOpenFilesPerPartitionGreaterThanOne() throws Exception { + // Do not roll on size, only based on time. 
+    localProps.put(FLUSH_SIZE_CONFIG, "1000");
+    localProps.put(
+        S3SinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
+        String.valueOf(TimeUnit.MINUTES.toMillis(1))
+    );
+    localProps.put(
+        S3SinkConnectorConfig.MAX_OPEN_FILES_PER_PARTITION_CONFIG,
+        String.valueOf(2)
+    );
+    setUp();
+
+    // Define the partitioner
+    Partitioner partitioner = new HourlyPartitioner<>();
+    parsedConfig.put(S3SinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG, TimeUnit.MINUTES.toMillis(1));
+    parsedConfig.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record");
+    partitioner.configure(parsedConfig);
+
+    TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
+        TOPIC_PARTITION, storage, writerProvider, partitioner, connectorConfig, context, null);
+
+    String key = "key";
+    Schema schema = createSchema();
+    List records = createRecordBatches(schema, 2, 6);
+
+    // hour 10
+    DateTime first = new DateTime(2017, 3, 2, 10, 0, DateTimeZone.forID("America/Los_Angeles"));
+    long timestampFirst = first.getMillis();
+    // hour 9
+    long timestampEarlier = first.minusHours(1).getMillis();
+
+    Collection sinkRecords = new ArrayList<>();
+
+    // first 2 records for hour 10
+    sinkRecords.addAll(
+        createSinkRecordsWithTimestamp(
+            records.subList(0, 2),
+            key,
+            schema,
+            0,
+            timestampFirst,
+            1000
+        )
+    );
+
+    // next 4 records for hour 9
+    sinkRecords.addAll(
+        createSinkRecordsWithTimestamp(
+            records.subList(2, 6),
+            key,
+            schema,
+            2,
+            timestampEarlier,
+            1000
+        )
+    );
+
+    // next 8 records for hour 10
+    // 2 for previously open S3 file
+    // 4 for new S3 file due to passing 1 minute rotate interval
+    sinkRecords.addAll(
+        Arrays.asList(
+            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(6), 6,
+                timestampFirst + 5000, TimestampType.CREATE_TIME),
+            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(7), 7,
+                timestampFirst + 15000, TimestampType.CREATE_TIME),
+            // this one exceeds the 1-minute rotate interval so will cause rotate/commit of all open files
+            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(8), 8,
+                timestampFirst + 60000, TimestampType.CREATE_TIME),
+            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(9), 9,
+                timestampFirst + 61000, TimestampType.CREATE_TIME),
+            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(10), 10,
+                timestampFirst + 62000, TimestampType.CREATE_TIME),
+            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(11), 11,
+                timestampFirst + 63000, TimestampType.CREATE_TIME)
+        )
+    );
+
+    for (SinkRecord record : sinkRecords) {
+      topicPartitionWriter.buffer(record);
+    }
+
+    topicPartitionWriter.write();
+    topicPartitionWriter.close();
+
+    String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
+    String encodedPartitionEarlier = getTimebasedEncodedPartition(timestampEarlier);
+
+    String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
+    List expectedFiles = new ArrayList<>();
+    for (int i : new int[]{0, 8}) {
+      expectedFiles.add(FileUtils.fileKeyToCommit(topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, extension,
+          ZERO_PAD_FMT));
+    }
+
+    String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionEarlier);
+    for (int i : new int[]{2}) {
+      expectedFiles.add(FileUtils.fileKeyToCommit(topicsDir, dirPrefixLater, TOPIC_PARTITION, i, extension,
+          ZERO_PAD_FMT));
+    }
+    verify(expectedFiles, 4, schema, records);
+  }
+
   // Test if a given exception type was reported to the DLQ
   private void testExceptionReportedToDLQ(
       SinkRecord faultyRecord,

From db365c4c6c7f83c9c36d5a66717c22837127460a Mon Sep 17 00:00:00 2001
From: frankgrimes97
Date: Tue, 15 Nov 2022 20:52:13 -0500
Subject: [PATCH 2/2] Fix bug in rotateOnTime

The default baseRecordTimestamp was incorrect, leading to unnecessary file
commits and rotations. Update the unit test case accordingly.
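
To see the arithmetic behind the fix, here is a minimal standalone sketch
(not connector code; the timestamp value is made up). With the previous
default of -1L, an encodedPartition with no entry in baseRecordTimestamps
made the elapsed-time check span the whole epoch, so it evaluated to true on
essentially every call; defaulting to currentTimestamp instead yields an
elapsed time of zero for such a partition:

    public class BaseTimestampDefaultSketch {
      // Mirrors the recordTimestampExceedsRotationInterval check from PATCH 1/2.
      static boolean exceedsInterval(long recordTs, long baseTs, long rotateIntervalMs) {
        return recordTs - baseTs >= rotateIntervalMs;
      }

      public static void main(String[] args) {
        long rotateIntervalMs = 60_000L;     // one minute, as in the test
        long recordTs = 1_668_560_000_000L;  // an arbitrary epoch-millis timestamp

        // Old default (-1L): prints true, i.e. a spurious rotation.
        System.out.println(exceedsInterval(recordTs, -1L, rotateIntervalMs));

        // New default (the current record timestamp): prints false.
        System.out.println(exceedsInterval(recordTs, recordTs, rotateIntervalMs));
      }
    }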
---
 .../connect/s3/TopicPartitionWriter.java     |  4 +++-
 .../connect/s3/TopicPartitionWriterTest.java | 17 ++++++-----------
 2 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
index 5c61c56d3..69a036db0 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
@@ -469,7 +469,9 @@ private boolean rotateOnTime(String encodedPartition, Long recordTimestamp, long
       return false;
     }
 
-    final Long baseRecordTimestamp = baseRecordTimestamps.getOrDefault(encodedPartition, -1L);
+    final Long baseRecordTimestamp =
+        baseRecordTimestamps.getOrDefault(encodedPartition, currentTimestamp);
+
     // rotateIntervalMs > 0 implies timestampExtractor != null
     final boolean hasTimestampExtractor = rotateIntervalMs > 0 && timestampExtractor != null;
     boolean periodicRotation = hasTimestampExtractor
diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java
index a1420b8e4..73aa28979 100644
--- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java
+++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java
@@ -1402,24 +1402,19 @@ public void testWriteRecordLateRecordsMaxOpenFilesPerPartitionGreaterThanOne() t
         )
     );
 
-    // next 8 records for hour 10
+    // next 3 records for hour 10
     // 2 for previously open S3 file
-    // 4 for new S3 file due to passing 1 minute rotate interval
+    // 1 new event past rotate.interval.ms to force flush of open files
     sinkRecords.addAll(
         Arrays.asList(
            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(6), 6,
                timestampFirst + 5000, TimestampType.CREATE_TIME),
            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(7), 7,
                timestampFirst + 15000, TimestampType.CREATE_TIME),
-            // this one exceeds the 1-minute rotate interval so will cause rotate/commit of all open files
+            // this one exceeds the 1-minute rotate interval for hour 10 so will cause rotate/commit of all open files
+            // however, it will not be flushed/written (still in buffer)
            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(8), 8,
-                timestampFirst + 60000, TimestampType.CREATE_TIME),
-            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(9), 9,
-                timestampFirst + 61000, TimestampType.CREATE_TIME),
-            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(10), 10,
-                timestampFirst + 62000, TimestampType.CREATE_TIME),
-            new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, records.get(11), 11,
-                timestampFirst + 63000, TimestampType.CREATE_TIME)
+                timestampFirst + 60000, TimestampType.CREATE_TIME)
         )
     );
 
@@ -1435,7 +1430,7 @@ public void testWriteRecordLateRecordsMaxOpenFilesPerPartitionGreaterThanOne() t
     String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
     List expectedFiles = new ArrayList<>();
-    for (int i : new int[]{0, 8}) {
+    for (int i : new int[]{0}) {
      expectedFiles.add(FileUtils.fileKeyToCommit(topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, extension,
          ZERO_PAD_FMT));
     }