Skip to content

Commit

Permalink
Introduce safe offset record header (#140)
Browse files Browse the repository at this point in the history
What and why?
For every record consumed from li-apache-kafka-client consumer, the record will be annotated with a safe offset header such that systems like brooklin aren't forced to use LiKafkaConsumer interface (which has #safeOffsets() api) but instead can just work with Consumer interface and the headers.
  • Loading branch information
viswamy authored Sep 6, 2019
1 parent 00cf733 commit 313dbc9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.linkedin.kafka.clients.largemessage.errors.SkippableException;
import com.linkedin.kafka.clients.utils.Constants;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -438,6 +439,13 @@ private ConsumerRecord<K, V> handleConsumerRecord(ConsumerRecord<byte[], byte[]>
headers.add(Constants.LARGE_MESSAGE_HEADER, LargeMessageHeaderValue.toBytes(largeMessageHeaderValue));
}

// Introduce a safe offset header if and only if the safe offset is not the same as the high watermark
// Note: Safe offset is the last consumed record offset + 1
Long safeOffset = safeOffset(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()));
if (safeOffset != null && safeOffset != consumerRecord.offset() + 1) {
headers.add(Constants.SAFE_OFFSET_HEADER, PrimitiveEncoderDecoder.encodeLong(safeOffset));
}

handledRecord = new ConsumerRecord<>(
consumerRecord.topic(),
consumerRecord.partition(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class Constants {
// The variables reserved by kafka for auditing purpose
public static final String TIMESTAMP_HEADER = "_t";
public static final String LARGE_MESSAGE_HEADER = "_lm";
public static final String SAFE_OFFSET_HEADER = "_so";

/**
* Avoid instantiating the constants class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import com.linkedin.kafka.clients.largemessage.errors.OffsetNotTrackedException;
import com.linkedin.kafka.clients.largemessage.errors.SkippableException;
import com.linkedin.kafka.clients.producer.UUIDFactory;
import com.linkedin.kafka.clients.utils.Constants;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.kafka.clients.utils.LiKafkaClientsTestUtils;
import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -84,10 +86,25 @@ public void testCorrectness() {
ConsumerRecords<String, String> processedRecords = consumerRecordsProcessor.process(getConsumerRecords()).consumerRecords();
assertEquals(processedRecords.count(), 4, "There should be 4 records");
Iterator<ConsumerRecord<String, String>> iter = processedRecords.iterator();
assertEquals(iter.next().offset(), 0, "Message offset should b 0");
assertEquals(iter.next().offset(), 2, "Message offset should b 2");
assertEquals(iter.next().offset(), 4, "Message offset should b 4");
assertEquals(iter.next().offset(), 5, "Message offset should b 5");
ConsumerRecord<String, String> record = iter.next();
assertEquals(record.offset(), 0, "Message offset should b 0");
assertNull(record.headers().lastHeader(Constants.SAFE_OFFSET_HEADER));

record = iter.next();
assertEquals(record.offset(), 2, "Message offset should b 2");
assertEquals(
PrimitiveEncoderDecoder.decodeLong(record.headers().lastHeader(Constants.SAFE_OFFSET_HEADER).value(), 0),
1L
);
record = iter.next();
assertEquals(record.offset(), 4, "Message offset should b 4");
assertEquals(
PrimitiveEncoderDecoder.decodeLong(record.headers().lastHeader(Constants.SAFE_OFFSET_HEADER).value(), 0),
1L
);
record = iter.next();
assertEquals(record.offset(), 5, "Message offset should b 5");
assertNull(record.headers().lastHeader(Constants.SAFE_OFFSET_HEADER));
}

@Test
Expand Down

0 comments on commit 313dbc9

Please sign in to comment.