a simple back pressure mechanism #333

Draft
Wants to merge 1 commit into base: main
@@ -80,6 +80,7 @@ public static ConfigDef configDef() {
addKafkaBackoffPolicy(configDef);
addAzureRetryPolicies(configDef);
addUserAgentConfig(configDef);
addOverloadConfigGroup(configDef);
return configDef;
}

@@ -57,6 +57,9 @@ public final class AzureBlobSinkTask extends SinkTask {
private BlobContainerClient containerClient;
private final Map<String, BlockBlobClient> blobClientMap = new ConcurrentHashMap<>();

private long backPressureHardLimit;
private long backPressureCurrentBuffer;

// required by Connect
public AzureBlobSinkTask() {
super();
@@ -104,6 +107,7 @@ public void start(final Map<String, String> props) {
if (Objects.nonNull(config.getKafkaRetryBackoffMs())) {
context.timeout(config.getKafkaRetryBackoffMs());
}
backPressureHardLimit = config.getBackPressureHardLimit();
}

private void initRecordGrouper() {
@@ -122,6 +126,11 @@ public void put(final Collection<SinkRecord> records) {
for (final SinkRecord record : records) {
recordGrouper.put(record);
}
backPressureCurrentBuffer += records.size();
if (backPressureCurrentBuffer >= backPressureHardLimit) {
LOG.warn("Back pressure limit reached, requesting flush");
context.requestCommit();
}
}

@Override
@@ -131,6 +140,7 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
} finally {
recordGrouper.clear();
}
backPressureCurrentBuffer = 0;
}

private void flushFile(final String filename, final List<SinkRecord> records) {

@@ -53,6 +53,9 @@ public class AivenCommonConfig extends AbstractConfig {
private static final String GROUP_RETRY_BACKOFF_POLICY = "Retry backoff policy";
public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms";

private static final String GROUP_OVERLOAD = "Overload Control";
private static final String OVERLOAD_MAX_RECORDS_HARD_LIMIT = "overload.hard.record.limit";

protected AivenCommonConfig(final ConfigDef definition, final Map<?, ?> originals) {
super(definition, originals);
// TODO: calls getOutputFields, can be overridden in subclasses.
@@ -74,23 +77,23 @@ private void validate() {
protected static void addKafkaBackoffPolicy(final ConfigDef configDef) {
configDef.define(KAFKA_RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, null, new ConfigDef.Validator() {

final long maximumBackoffPolicy = TimeUnit.HOURS.toMillis(24);

@Override
public void ensureValid(final String name, final Object value) {
if (Objects.isNull(value)) {
return;
}
assert value instanceof Long;
final var longValue = (Long) value;
if (longValue < 0) {
throw new ConfigException(name, value, "Value must be at least 0");
} else if (longValue > maximumBackoffPolicy) {
throw new ConfigException(name, value,
"Value must be no more than " + maximumBackoffPolicy + " (24 hours)");
}
}
}, ConfigDef.Importance.MEDIUM,
final long maximumBackoffPolicy = TimeUnit.HOURS.toMillis(24);

@Override
public void ensureValid(final String name, final Object value) {
if (Objects.isNull(value)) {
return;
}
assert value instanceof Long;
final var longValue = (Long) value;
if (longValue < 0) {
throw new ConfigException(name, value, "Value must be at least 0");
} else if (longValue > maximumBackoffPolicy) {
throw new ConfigException(name, value,
"Value must be no more than " + maximumBackoffPolicy + " (24 hours)");
}
}
}, ConfigDef.Importance.MEDIUM,
"The retry backoff in milliseconds. "
+ "This config is used to notify Kafka Connect to retry delivering a message batch or "
+ "performing recovery in case of transient exceptions. Maximum value is "
@@ -103,7 +106,7 @@ public Long getKafkaRetryBackoffMs() {
}

protected static void addOutputFieldsFormatConfigGroup(final ConfigDef configDef,
final OutputFieldType defaultFieldType) {
final OutputFieldType defaultFieldType) {
int formatGroupCounter = 0;

addFormatTypeConfig(configDef, formatGroupCounter);
@@ -145,7 +148,7 @@ public FormatType getFormatType() {
}

protected static void addCompressionTypeConfig(final ConfigDef configDef,
final CompressionType defaultCompressionType) {
final CompressionType defaultCompressionType) {
configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING,
Objects.isNull(defaultCompressionType) ? null : defaultCompressionType.name, // NOPMD NullAssignment
new FileCompressionTypeValidator(), ConfigDef.Importance.MEDIUM,
@@ -229,4 +232,16 @@ private Boolean isKeyBased(final String groupType) {
return RecordGrouperFactory.KEY_RECORD.equals(groupType)
|| RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD.equals(groupType);
}

public long getBackPressureHardLimit() {
return getLong(OVERLOAD_MAX_RECORDS_HARD_LIMIT);
}

protected static void addOverloadConfigGroup(final ConfigDef configDef) {
int groupCounter = 0;

configDef.define(OVERLOAD_MAX_RECORDS_HARD_LIMIT, ConfigDef.Type.LONG, 1000000L, ConfigDef.Importance.MEDIUM,
"The maximum number of records to buffer before requesting a flush.", GROUP_OVERLOAD, groupCounter++,
ConfigDef.Width.NONE, OVERLOAD_MAX_RECORDS_HARD_LIMIT);
}
}
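
For reference, a minimal sketch of how the new "overload.hard.record.limit" setting resolves through Kafka's ConfigDef. This is illustrative only and not part of the commit; the class name and the 250000 override are made up, while the key, group, default, and define() call mirror addOverloadConfigGroup() above.

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class OverloadLimitSketch {
    public static void main(final String[] args) {
        final ConfigDef configDef = new ConfigDef();
        // Mirrors addOverloadConfigGroup(): a LONG setting in the "Overload Control" group
        // with a default of 1,000,000 records.
        configDef.define("overload.hard.record.limit", ConfigDef.Type.LONG, 1000000L,
                ConfigDef.Importance.MEDIUM,
                "The maximum number of records to buffer before requesting a flush.",
                "Overload Control", 0, ConfigDef.Width.NONE, "overload.hard.record.limit");

        // With no override the default applies.
        final AbstractConfig defaults = new AbstractConfig(configDef, Map.of());
        System.out.println(defaults.getLong("overload.hard.record.limit")); // 1000000

        // Connector properties overriding the hard limit (hypothetical value).
        final AbstractConfig tuned = new AbstractConfig(configDef,
                Map.of("overload.hard.record.limit", "250000"));
        System.out.println(tuned.getLong("overload.hard.record.limit")); // 250000
    }
}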

@@ -107,6 +107,7 @@ public static ConfigDef configDef() {
addKafkaBackoffPolicy(configDef);
addGcsRetryPolicies(configDef);
addUserAgentConfig(configDef);
addOverloadConfigGroup(configDef);
return configDef;
}

@@ -49,6 +49,8 @@ public final class GcsSinkTask extends SinkTask {
private GcsSinkConfig config;

private Storage storage;
private long backPressureHardLimit;
private long backPressureCurrentBuffer;

// required by Connect
public GcsSinkTask() {
@@ -88,6 +90,7 @@ public void start(final Map<String, String> props) {
if (Objects.nonNull(config.getKafkaRetryBackoffMs())) {
context.timeout(config.getKafkaRetryBackoffMs());
}
backPressureHardLimit = config.getBackPressureHardLimit();
}

private void initRest() {
@@ -106,6 +109,11 @@ public void put(final Collection<SinkRecord> records) {
for (final SinkRecord record : records) {
recordGrouper.put(record);
}
backPressureCurrentBuffer += records.size();
if (backPressureCurrentBuffer >= backPressureHardLimit) {
LOG.warn("Back pressure limit reached, requesting flush");
context.requestCommit();
}
}

@Override
@@ -115,6 +123,7 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
} finally {
recordGrouper.clear();
}
backPressureCurrentBuffer = 0;
}

private void flushFile(final String filename, final List<SinkRecord> records) {

@@ -66,6 +66,9 @@ public final class S3SinkTask extends SinkTask {

AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();

private long backPressureHardLimit;
private long backPressureCurrentBuffer;

@SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect
public S3SinkTask() {
super();
@@ -84,6 +87,7 @@ public void start(final Map<String, String> props) {
if (Objects.nonNull(config.getKafkaRetryBackoffMs())) {
context.timeout(config.getKafkaRetryBackoffMs());
}
backPressureHardLimit = config.getBackPressureHardLimit();
}

private AmazonS3 createAmazonS3Client(final S3SinkConfig config) {
@@ -110,6 +114,11 @@ public void put(final Collection<SinkRecord> records) {
Objects.requireNonNull(records, "records cannot be null");
LOGGER.info("Processing {} records", records.size());
records.forEach(recordGrouper::put);
backPressureCurrentBuffer += records.size();
if (backPressureCurrentBuffer >= backPressureHardLimit) {
LOGGER.warn("Back pressure limit reached, requesting flush");
context.requestCommit();
}
}

@Override
@@ -119,6 +128,7 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
} finally {
recordGrouper.clear();
}
backPressureCurrentBuffer = 0;
}

private void flushFile(final String filename, final List<SinkRecord> records) {

@@ -189,6 +189,7 @@ public static ConfigDef configDef() {
addDeprecatedConfiguration(configDef);
addKafkaBackoffPolicy(configDef);
addS3RetryPolicies(configDef);
addOverloadConfigGroup(configDef);
return configDef;
}
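
Taken together, the three sink tasks gain the same pattern: put() counts how many records have been handed over since the last flush, and once the configured hard limit is reached it calls context.requestCommit() so the Connect framework schedules an offset commit, which drives flush() and drains the record grouper; flush() then resets the counter. The sketch below condenses that flow against the plain SinkTaskContext interface. It is illustrative only, not code from the commit; the class and method names around the counter are invented, while the field names echo the diff.

import org.apache.kafka.connect.sink.SinkTaskContext;

// Illustrative consolidation of the back pressure additions (not part of the PR).
public class BackPressureSketch {

    private final SinkTaskContext context;
    private final long backPressureHardLimit; // from overload.hard.record.limit
    private long backPressureCurrentBuffer;

    public BackPressureSketch(final SinkTaskContext context, final long backPressureHardLimit) {
        this.context = context;
        this.backPressureHardLimit = backPressureHardLimit;
    }

    // Called from put(): every delivered batch grows the buffered-record count.
    public void recordsBuffered(final int batchSize) {
        backPressureCurrentBuffer += batchSize;
        if (backPressureCurrentBuffer >= backPressureHardLimit) {
            // Ask Connect to commit offsets as soon as possible; committing makes the
            // framework call flush() on the task, which writes out the grouped records.
            context.requestCommit();
        }
    }

    // Called from flush(): the buffered records have been written, so the counter resets.
    public void flushed() {
        backPressureCurrentBuffer = 0;
    }
}

Note that requestCommit() is only a hint to the worker loop, so records delivered before the next commit cycle still accumulate in memory; the limit bounds the buffer approximately rather than strictly.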
