Avoid late records preemptively rotating/committing S3 output files #574
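
This change introduces a max.open.files.per.partition setting (default 1, minimum 1) so that a late record belonging to an older encoded partition no longer forces every open output file of the topic partition to rotate and commit. The snippet below is a minimal sketch of how the setting might be enabled; only max.open.files.per.partition is added by this PR, and the other keys are ordinary S3 sink settings shown purely for context.

import java.util.HashMap;
import java.util.Map;

public class MaxOpenFilesExample {
  public static void main(String[] args) {
    // Hypothetical connector properties; key names other than
    // max.open.files.per.partition are standard S3 sink settings.
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.s3.S3SinkConnector");
    props.put("flush.size", "1000");
    props.put("rotate.interval.ms", "600000");
    props.put("partition.duration.ms", "3600000");
    // Allow up to 4 encoded partitions (e.g. hourly paths) to stay open at once,
    // so a late record for an older hour does not rotate every open file.
    props.put("max.open.files.per.partition", "4");
    System.out.println(props);
  }
}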

Open · wants to merge 2 commits into master
S3SinkConnectorConfig.java
@@ -190,6 +190,9 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
public static final String ELASTIC_BUFFER_INIT_CAPACITY = "s3.elastic.buffer.init.capacity";
public static final int ELASTIC_BUFFER_INIT_CAPACITY_DEFAULT = 128 * 1024; // 128KB

public static final String MAX_OPEN_FILES_PER_PARTITION_CONFIG = "max.open.files.per.partition";
public static final int MAX_OPEN_FILES_PER_PARTITION_DEFAULT = 1;

private final String name;

private final Map<String, ComposableConfig> propertyToConfig = new HashMap<>();
@@ -717,6 +720,19 @@ public static ConfigDef newConfigDef() {
"Elastic buffer initial capacity"
);

configDef.define(
MAX_OPEN_FILES_PER_PARTITION_CONFIG,
Type.INT,
MAX_OPEN_FILES_PER_PARTITION_DEFAULT,
atLeast(1),
Importance.LOW,
"Max open files per partition.",
group,
++orderInGroup,
Width.LONG,
"Max open files per partition"
);

}
return configDef;
}
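
The new definition uses the atLeast(1) validator, so values below 1 are rejected when the connector configuration is parsed instead of surfacing later at runtime. The sketch below is a minimal stand-alone illustration of that behavior, building an equivalent ConfigDef directly rather than going through newConfigDef(); the key name and default are copied from this PR.

import java.util.Collections;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;

public class ValidatorSketch {
  public static void main(String[] args) {
    // Mirrors the new definition: INT, default 1, must be at least 1.
    ConfigDef def = new ConfigDef().define(
        "max.open.files.per.partition",
        Type.INT,
        1,
        Range.atLeast(1),
        Importance.LOW,
        "Max open files per partition.");

    // Parsing a value below the minimum throws a ConfigException.
    try {
      def.parse(Collections.singletonMap("max.open.files.per.partition", "0"));
    } catch (ConfigException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}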
TopicPartitionWriter.java
@@ -56,6 +56,9 @@

import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PART_RETRIES_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_RETRY_BACKOFF_CONFIG;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;

public class TopicPartitionWriter {
private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
@@ -73,20 +76,23 @@ public class TopicPartitionWriter {
private final SinkTaskContext context;
private final boolean isTaggingEnabled;
private final boolean ignoreTaggingErrors;
private int recordCount;
private long recordCount;
private final int flushSize;
private final long rotateIntervalMs;
private final long rotateScheduleIntervalMs;
private long nextScheduledRotation;
private long currentOffset;
private final int maxOpenFilesPerPartition;
private Long currentTimestamp;
private String currentEncodedPartition;
private Long baseRecordTimestamp;
private final Map<String, Long> currentTimestamps;
private final Set<String> openEncodedPartitions;
private final Map<String, Long> baseRecordTimestamps;
private Long offsetToCommit;
private final RecordWriterProvider<S3SinkConnectorConfig> writerProvider;
private final Map<String, Long> startOffsets;
private final Map<String, Long> endOffsets;
private final Map<String, Long> recordCounts;
private final Map<String, LongAdder> encodedPartitionRecordCounts;
private long timeoutMs;
private long failureTime;
private final StorageSchemaCompatibility compatibility;
@@ -156,13 +162,19 @@ public TopicPartitionWriter(TopicPartition tp,
compatibility = StorageSchemaCompatibility.getCompatibility(
connectorConfig.getString(StorageSinkConnectorConfig.SCHEMA_COMPATIBILITY_CONFIG));

maxOpenFilesPerPartition = connectorConfig.getInt(
S3SinkConnectorConfig.MAX_OPEN_FILES_PER_PARTITION_CONFIG);

buffer = new LinkedList<>();
commitFiles = new HashMap<>();
writers = new HashMap<>();
currentSchemas = new HashMap<>();
currentTimestamps = new HashMap<>();
openEncodedPartitions = new HashSet<>();
baseRecordTimestamps = new HashMap<>();
startOffsets = new HashMap<>();
endOffsets = new HashMap<>();
recordCounts = new HashMap<>();
encodedPartitionRecordCounts = new HashMap<>();
state = State.WRITE_STARTED;
failureTime = -1L;
currentOffset = -1L;
@@ -226,13 +238,6 @@ private void executeState(long now) {
// fallthrough
case WRITE_PARTITION_PAUSED:
SinkRecord record = buffer.peek();
if (timestampExtractor != null) {
currentTimestamp = timestampExtractor.extract(record, now);
if (baseRecordTimestamp == null) {
baseRecordTimestamp = currentTimestamp;
}
}
Schema valueSchema = record.valueSchema();
String encodedPartition;
try {
encodedPartition = partitioner.encodePartition(record, now);
@@ -245,6 +250,17 @@
throw e;
}
}

if (timestampExtractor != null) {
final long currentRecordTimestamp = timestampExtractor.extract(record, now);
currentTimestamp = currentRecordTimestamp;
currentTimestamps.put(encodedPartition, currentRecordTimestamp);
if (!baseRecordTimestamps.containsKey(encodedPartition)) {
baseRecordTimestamps.put(encodedPartition, currentRecordTimestamp);
}
}

Schema valueSchema = record.valueSchema();
Schema currentValueSchema = currentSchemas.get(encodedPartition);
if (currentValueSchema == null) {
currentSchemas.put(encodedPartition, valueSchema);
@@ -284,9 +300,14 @@ private boolean checkRotationOrAppend(
String encodedPartition,
long now
) {

// rotateOnTime is safe to go before writeRecord, because it is acceptable
// even for a faulty record to trigger time-based rotation if it applies
if (rotateOnTime(encodedPartition, currentTimestamp, now)) {
if (rotateOnTime(
encodedPartition,
currentTimestamps.getOrDefault(encodedPartition, currentTimestamp),
now)) {

setNextScheduledRotation();
nextState();
return true;
@@ -315,11 +336,11 @@ private boolean checkRotationOrAppend(
return false;
}

if (rotateOnSize()) {
if (rotateOnSize(encodedPartition)) {
log.info(
"Starting commit and rotation for topic partition {} with start offset {}",
tp,
startOffsets
"Starting commit and rotation for encodedPartition {} with start offset {}",
encodedPartition,
startOffsets.getOrDefault(encodedPartition, 0L)
);
nextState();
return true;
@@ -330,18 +351,46 @@

private void commitOnTimeIfNoData(long now) {
if (buffer.isEmpty()) {
boolean shouldCommitFiles = false;
// committing files after waiting for rotateIntervalMs time but less than flush.size
// records available
if (recordCount > 0 && rotateOnTime(currentEncodedPartition, currentTimestamp, now)) {
if (maxOpenFilesPerPartition == 1
&& recordCount > 0
&& rotateOnTime(currentEncodedPartition, currentTimestamp, now)) {

log.info(
"Committing files after waiting for rotateIntervalMs time but less than flush.size "
+ "records available."
);
setNextScheduledRotation();

commitFiles();
shouldCommitFiles = true;
} else if (maxOpenFilesPerPartition > 1) {
for (String encodedPartition : openEncodedPartitions) {
if (rotateOnTime(
encodedPartition,
currentTimestamps.getOrDefault(encodedPartition, now),
now)) {

log.info(
"Committing files after waiting for rotateIntervalMs time for encodedPartition "
+ "'{}' but less than flush.size records available.",
encodedPartition);

setNextScheduledRotation();

// At least one encodedPartition needs committing, so commit all so that
// the S3SinkTask's preCommit contains no buffered offsets, only
// those written to S3
shouldCommitFiles = true;
break;
}
}
}

if (shouldCommitFiles) {
commitFiles();
}
resume();
setState(State.WRITE_STARTED);
}
@@ -398,23 +447,55 @@ private void setState(State state) {
this.state = state;
}

private static final boolean recordTimestampExceedsRotationInterval(
final Long recordTimestamp,
final Long baseRecordTimestamp,
final long rotateIntervalMs) {

return recordTimestamp - baseRecordTimestamp >= rotateIntervalMs;
}

private static final boolean encodedPartitionChangeNecessitatesRotation(
final String encodedPartition,
final String currentEncodedPartition,
final int maxOpenFilesPerPartition) {

return !encodedPartition.equals(currentEncodedPartition)
&& maxOpenFilesPerPartition == 1;
}

private boolean rotateOnTime(String encodedPartition, Long recordTimestamp, long now) {
if (recordCount <= 0) {
return false;
}

final Long baseRecordTimestamp =
baseRecordTimestamps.getOrDefault(encodedPartition, currentTimestamp);

// rotateIntervalMs > 0 implies timestampExtractor != null
boolean periodicRotation = rotateIntervalMs > 0
&& timestampExtractor != null
&& (
recordTimestamp - baseRecordTimestamp >= rotateIntervalMs
|| !encodedPartition.equals(currentEncodedPartition)
);
final boolean hasTimestampExtractor = rotateIntervalMs > 0 && timestampExtractor != null;
boolean periodicRotation = hasTimestampExtractor
&& (recordTimestampExceedsRotationInterval(
recordTimestamp,
baseRecordTimestamp,
rotateIntervalMs)
|| encodedPartitionChangeNecessitatesRotation(
encodedPartition,
currentEncodedPartition,
maxOpenFilesPerPartition)
|| baseRecordTimestamps.size() > maxOpenFilesPerPartition
);

log.trace(
"Checking rotation on time for topic-partition '{}' "
+ "with recordCount '{}' and encodedPartition '{}'",
tp,
recordCount,
maxOpenFilesPerPartition == 1
? recordCount
: encodedPartitionRecordCounts.getOrDefault(
encodedPartition,
new LongAdder()
).longValue(),
encodedPartition
);
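
Taken together, the helpers above change when a time-based rotation fires: with the default max.open.files.per.partition of 1 the previous behavior is kept (any change of encodedPartition rotates), while larger values only rotate once a record exceeds rotate.interval.ms relative to that partition's own base timestamp, or more encoded partitions are open than the cap allows. The sketch below restates that decision in isolation; the map and parameter names mirror TopicPartitionWriter, the record-count and timestamp-extractor guards are omitted, and the encodedPartition values are made up.

import java.util.HashMap;
import java.util.Map;

public class RotationDecisionSketch {
  static boolean rotateOnTime(
      String encodedPartition,
      String currentEncodedPartition,
      long recordTimestamp,
      long rotateIntervalMs,
      int maxOpenFilesPerPartition,
      Map<String, Long> baseRecordTimestamps) {

    long base = baseRecordTimestamps.getOrDefault(encodedPartition, recordTimestamp);
    boolean intervalExceeded = recordTimestamp - base >= rotateIntervalMs;
    // With only one open file allowed, any encodedPartition change still rotates.
    boolean partitionChange = !encodedPartition.equals(currentEncodedPartition)
        && maxOpenFilesPerPartition == 1;
    // Holding more partitions open than the cap also forces a rotation.
    boolean tooManyOpenFiles = baseRecordTimestamps.size() > maxOpenFilesPerPartition;
    return intervalExceeded || partitionChange || tooManyOpenFiles;
  }

  public static void main(String[] args) {
    Map<String, Long> baseRecordTimestamps = new HashMap<>();
    baseRecordTimestamps.put("dt=2024-01-01/hour=10", 0L);
    baseRecordTimestamps.put("dt=2024-01-01/hour=11", 3_600_000L);

    // A late record for hour=10 while hour=11 is current: no rotation when two
    // files may stay open, rotation under the old single-file behavior.
    System.out.println(rotateOnTime("dt=2024-01-01/hour=10", "dt=2024-01-01/hour=11",
        600_000L, 3_600_000L, 2, baseRecordTimestamps)); // false
    System.out.println(rotateOnTime("dt=2024-01-01/hour=10", "dt=2024-01-01/hour=11",
        600_000L, 3_600_000L, 1, baseRecordTimestamps)); // true
  }
}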

@@ -471,12 +552,25 @@ private void setNextScheduledRotation() {
}
}

private boolean rotateOnSize() {
boolean messageSizeRotation = recordCount >= flushSize;
private static final long getRecordCount(
final String encodedPartition,
final Map<String, LongAdder> recordCountsPerEncodedPartition) {

return recordCountsPerEncodedPartition
.getOrDefault(encodedPartition, new LongAdder())
.longValue();
}

private boolean rotateOnSize(final String encodedPartition) {
long encodedPartitionRecordCount = maxOpenFilesPerPartition == 1
    ? recordCount
    : getRecordCount(encodedPartition, encodedPartitionRecordCounts);
boolean messageSizeRotation = encodedPartitionRecordCount >= flushSize;
log.trace("Should apply size-based rotation for topic-partition '{}':"
+ " (count {} >= flush size {})? {}",
tp,
recordCount,
encodedPartitionRecordCount,
flushSize,
messageSizeRotation
);
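
Per-encodedPartition record counts are now kept in a Map<String, LongAdder> rather than as boxed Long values, so incrementing a partition's count is a single computeIfAbsent(...).increment() call and a partition that never received a record simply reads as zero. A small stand-alone sketch of the same pattern follows; the partition names are illustrative.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

public class PerPartitionCountSketch {
  public static void main(String[] args) {
    // Same counting pattern the PR uses for encodedPartitionRecordCounts.
    Map<String, LongAdder> counts = new HashMap<>();

    // No containsKey/get/put dance is needed to increment.
    counts.computeIfAbsent("dt=2024-01-01/hour=10", k -> new LongAdder()).increment();
    counts.computeIfAbsent("dt=2024-01-01/hour=10", k -> new LongAdder()).increment();
    counts.computeIfAbsent("dt=2024-01-01/hour=11", k -> new LongAdder()).increment();

    // Reading an absent partition yields 0, mirroring getRecordCount(...).
    long hour10 = counts.getOrDefault("dt=2024-01-01/hour=10", new LongAdder()).longValue();
    long hour12 = counts.getOrDefault("dt=2024-01-01/hour=12", new LongAdder()).longValue();
    System.out.println(hour10 + " " + hour12); // prints: 2 0
  }
}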
@@ -580,10 +674,11 @@ private boolean writeRecord(SinkRecord record, String encodedPartition) {
}

currentEncodedPartition = encodedPartition;
openEncodedPartitions.add(encodedPartition);
currentOffset = record.kafkaOffset();
if (shouldRemoveStartOffset) {
log.trace(
"Setting writer's start offset for '{}' to {}",
"Setting writer's current offset for '{}' to {}",
currentEncodedPartition,
currentOffset
);
@@ -592,12 +687,15 @@
// value, we know that we have at least one record. This allows us
// to initialize all our maps at the same time, and saves future
// checks on the existence of keys
recordCounts.put(currentEncodedPartition, 0L);
endOffsets.put(currentEncodedPartition, 0L);
encodedPartitionRecordCounts.remove(currentEncodedPartition);
endOffsets.remove(currentEncodedPartition);
}
++recordCount;

recordCounts.put(currentEncodedPartition, recordCounts.get(currentEncodedPartition) + 1);
encodedPartitionRecordCounts.computeIfAbsent(
currentEncodedPartition,
k -> new LongAdder()
).increment();
endOffsets.put(currentEncodedPartition, currentOffset);
return true;
}
@@ -615,15 +713,15 @@ private void commitFiles() {
}
startOffsets.remove(encodedPartition);
endOffsets.remove(encodedPartition);
recordCounts.remove(encodedPartition);
encodedPartitionRecordCounts.remove(encodedPartition);
log.debug("Committed {} for {}", entry.getValue(), tp);
baseRecordTimestamps.remove(encodedPartition);
}

offsetToCommit = currentOffset + 1;
commitFiles.clear();
currentSchemas.clear();
recordCount = 0;
baseRecordTimestamp = null;
log.info("Files committed to S3. Target commit offset for {} is {}", tp, offsetToCommit);
}

@@ -645,8 +743,8 @@ private void commitFile(String encodedPartition) {
private void tagFile(String encodedPartition, String s3ObjectPath) {
Long startOffset = startOffsets.get(encodedPartition);
Long endOffset = endOffsets.get(encodedPartition);
Long recordCount = recordCounts.get(encodedPartition);
if (startOffset == null || endOffset == null || recordCount == null) {
Long recordCount = getRecordCount(encodedPartition, encodedPartitionRecordCounts);
if (startOffset == null || endOffset == null || recordCount == 0L) {
log.warn(
"Missing tags when attempting to tag file {}. "
+ "Starting offset tag: {}, "
@@ -655,7 +753,7 @@ private void tagFile(String encodedPartition, String s3ObjectPath) {
encodedPartition,
startOffset == null ? "missing" : startOffset,
endOffset == null ? "missing" : endOffset,
recordCount == null ? "missing" : recordCount
recordCount == 0L ? "missing" : recordCount
);
return;
}