diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java index 805e37f2..6e1e8319 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java @@ -53,21 +53,8 @@ public class OffsetManager> { * the context for this instance to use. */ public OffsetManager(final SourceTaskContext context) { - this(context, new ConcurrentHashMap<>()); - } - - /** - * Package private for testing. - * - * @param context - * the context for this instance to use. - * @param offsets - * the offsets - */ - protected OffsetManager(final SourceTaskContext context, - final ConcurrentMap, Map> offsets) { this.context = context; - this.offsets = offsets; + this.offsets = new ConcurrentHashMap<>(); } /** diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index 8c62d570..aec5e7a7 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -248,7 +248,7 @@ static List consumeByteMessages(final String topic, final int expectedMe String bootstrapServers) { final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class, ByteArrayDeserializer.class); - final List objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(90), + final List objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(120), consumerProperties); return objects.stream().map(String::new).collect(Collectors.toList()); } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index ed5057c9..10089d24 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -183,7 +183,6 @@ void bytesTest(final boolean addPrefix) { addPrefix, localS3Prefix, prefixPattern, fileNamePatternSeparator); connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue()); - connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); final String testData1 = "Hello, Kafka Connect S3 Source! object 1"; final String testData2 = "Hello, Kafka Connect S3 Source! object 2"; @@ -193,10 +192,13 @@ void bytesTest(final boolean addPrefix) { // write 5 objects to s3 offsetKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "0", localS3Prefix)); offsetKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "00000", localS3Prefix)); - offsetKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "1", localS3Prefix)); - offsetKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "00001", localS3Prefix)); + offsetKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "00001", localS3Prefix)); + offsetKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "1", localS3Prefix)); offsetKeys.add(writeToS3(topic, new byte[0], "3")); + // Start the Connector + connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); + assertThat(testBucketAccessor.listObjects()).hasSize(5); // Poll messages from the Kafka topic and verify the consumed data final List records = IntegrationBase.consumeByteMessages(topic, 4, connectRunner.getBootstrapServers()); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index 6fd037b1..53e7baa7 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -71,11 +71,11 @@ public void setValueData(final SchemaAndValue valueData) { } public String getTopic() { - return context.getTopic().isPresent() ? context.getTopic().get() : null; + return context.getTopic().orElse(null); } public Integer getPartition() { - return context.getPartition().isPresent() ? context.getPartition().get() : null; + return context.getPartition().orElse(null); } public String getObjectKey() {