Skip to content

Commit

Permalink
added a timestamp unit to partitioner config
Browse files Browse the repository at this point in the history
  • Loading branch information
Priyank Bagrecha committed Mar 14, 2020
1 parent b3dce89 commit 0636363
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ public class PartitionerConfig extends AbstractConfig implements ComposableConfi
public static final String TIMESTAMP_FIELD_NAME_DEFAULT = "timestamp";
public static final String TIMESTAMP_FIELD_NAME_DISPLAY = "Record Field for Timestamp Extractor";

public static final String TIMESTAMP_UNIT_NAME_CONFIG = "timestamp.unit";
public static final String TIMESTAMP_UNIT_NAME_DOC =
"The time unit to be used for timestamp by the timestamp extractor.";
public static final String TIMESTAMP_UNIT_NAME_DEFAULT = "ms";
public static final String TIMESTAMP_UNIT_NAME_DISPLAY =
"Time Unit of specified Record Field for Timestamp Extractor";

/**
* Create a new configuration definition.
*
Expand Down Expand Up @@ -208,6 +215,16 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecom
++orderInGroup,
Width.LONG,
TIMESTAMP_FIELD_NAME_DISPLAY);

configDef.define(TIMESTAMP_UNIT_NAME_CONFIG,
Type.STRING,
TIMESTAMP_UNIT_NAME_DEFAULT,
Importance.MEDIUM,
TIMESTAMP_UNIT_NAME_DOC,
group,
++orderInGroup,
Width.LONG,
TIMESTAMP_UNIT_NAME_DISPLAY);
}

return configDef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import io.confluent.connect.storage.common.SchemaGenerator;
import io.confluent.connect.storage.common.StorageCommonConfig;
Expand Down Expand Up @@ -266,15 +267,32 @@ public Long extract(ConnectRecord<?> record) {
public static class RecordFieldTimestampExtractor implements TimestampExtractor {
private String fieldName;
private DateTimeFormatter dateTime;
private TimeUnit timeUnit;

@Override
public void configure(Map<String, Object> config) {
fieldName = (String) config.get(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG);
dateTime = ISODateTimeFormat.dateTimeParser();
timeUnit = getTimeUnit(config);
}

private TimeUnit getTimeUnit(Map<String, Object> config) {
String timestampTimeUnit = (String) config.get(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG);

if ("s".equalsIgnoreCase(timestampTimeUnit)) {
return TimeUnit.SECONDS;
}

return TimeUnit.MILLISECONDS;
}

@Override
public Long extract(ConnectRecord<?> record) {
Long timestamp = extractTimestamp(record);
return timeUnit == TimeUnit.SECONDS ? timestamp * 1000 : timestamp;
}

private Long extractTimestamp(ConnectRecord<?> record) {
Object value = record.value();
if (value instanceof Struct) {
Struct struct = (Struct) value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

package io.confluent.connect.storage.partitioner;

import io.confluent.connect.storage.StorageSinkTestBase;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.util.DateTimeUtils;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -487,6 +491,17 @@ public void testNumericRecordFieldTimeMap() {
ts = DATE_TIME.getMillis();
testMapNumericTimestampPartitionEncoding(
partitioner, timeField, ts, Schema.INT64_SCHEMA, DATE_TIME);

Map<String, Object> configOverride = new HashMap<>();
configOverride.put(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG, "s");
TimeBasedPartitioner<String> secondsPartitioner = configurePartitioner(
new TimeBasedPartitioner<>(), timeField, configOverride);

assertThat(secondsPartitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));

testMapNumericTimestampPartitionEncoding(
secondsPartitioner, timeField, ((long) ts)/1000, Schema.INT64_SCHEMA, DATE_TIME);
}

@Test
Expand All @@ -498,6 +513,14 @@ public void testRecordFieldTimeDateExtractor() {
assertThat(partitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));

Map<String, Object> configOverride = new HashMap<>();
configOverride.put(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG, "s");
TimeBasedPartitioner<String> secondsPartitioner = configurePartitioner(
new TimeBasedPartitioner<>(), timeField, configOverride);

assertThat(secondsPartitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));

DateTime moment = new DateTime(2015, 4, 2, 1, 0, 0, 0, DateTimeZone.forID(TIME_ZONE));
String expectedPartition = "year=2015/month=4/day=2/hour=1";

Expand All @@ -506,6 +529,11 @@ public void testRecordFieldTimeDateExtractor() {
String encodedPartition = partitioner.encodePartition(sinkRecord);
assertEquals(expectedPartition, encodedPartition);

long rawTimestampSeconds = moment.getMillis()/1000;
sinkRecord = createSinkRecord(rawTimestampSeconds);
encodedPartition = secondsPartitioner.encodePartition(sinkRecord);
assertEquals(expectedPartition, encodedPartition);

String timestamp = ISODateTimeFormat.dateTimeNoMillis().print(moment);
sinkRecord = createSinkRecord(Schema.STRING_SCHEMA, timestamp);
encodedPartition = partitioner.encodePartition(sinkRecord);
Expand All @@ -525,6 +553,11 @@ public void testRecordFieldTimeDateExtractor() {
encodedPartition = partitioner.encodePartition(sinkRecord);
assertEquals("year=1970/month=1/day=1/hour=0", encodedPartition);

int shortTimestampSeconds = shortTimestamp/1000;
sinkRecord = createSinkRecord(Schema.INT32_SCHEMA, shortTimestampSeconds);
encodedPartition = secondsPartitioner.encodePartition(sinkRecord);
assertEquals("year=1970/month=1/day=1/hour=0", encodedPartition);

// Struct - Date extraction
sinkRecord = createSinkRecord(rawTimestamp);
String structEncodedPartition = partitioner.encodePartition(sinkRecord);
Expand All @@ -539,8 +572,9 @@ public void testRecordFieldTimeDateExtractor() {

@Test
public void testNestedRecordFieldTimeExtractor() throws Exception {
String timeField = "nested.timestamp";
TimeBasedPartitioner<String> partitioner = configurePartitioner(
new TimeBasedPartitioner<>(), "nested.timestamp", null);
new TimeBasedPartitioner<>(), timeField, null);

assertThat(partitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));
Expand All @@ -551,6 +585,20 @@ public void testNestedRecordFieldTimeExtractor() throws Exception {
String encodedPartition = partitioner.encodePartition(sinkRecord);

validateEncodedPartition(encodedPartition);

Map<String, Object> configOverride = new HashMap<>();
configOverride.put(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG, "s");
TimeBasedPartitioner<String> secondsPartitioner = configurePartitioner(
new TimeBasedPartitioner<>(), timeField, configOverride);

assertThat(secondsPartitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));

sinkRecord = createSinkRecordWithNestedTimestampField(timestamp/1000);

encodedPartition = secondsPartitioner.encodePartition(sinkRecord);

validateEncodedPartition(encodedPartition);
}

@Test
Expand Down

0 comments on commit 0636363

Please sign in to comment.