diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java index 7ba4f878..908218e5 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java @@ -116,12 +116,12 @@ private Payload convertPayload(io.tabular.iceberg.connect.events.Payload payload pay.commitId(), TableReference.of(catalogName, pay.tableName().toIdentifier()), pay.snapshotId(), - OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC)); + pay.vtts() == null ? null : OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC)); } else if (payload instanceof CommitCompletePayload) { CommitCompletePayload pay = (CommitCompletePayload) payload; return new CommitComplete( pay.commitId(), - OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC)); + pay.vtts() == null ? null : OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC)); } else { throw new IllegalStateException( String.format("Unknown event payload: %s", payload.getSchema())); diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java index 3d1ecab9..54e157de 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java @@ -239,6 +239,30 @@ public void testCommitTableBecomesCommitToTable() { assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.of("db", "tbl")); } + @Test + public void testCommitTableBecomesCommitToTableNullVtts() { + io.tabular.iceberg.connect.events.Event event = + new io.tabular.iceberg.connect.events.Event( + "cg-connector", + EventType.COMMIT_TABLE, + new CommitTablePayload( + commitId, new TableName(Collections.singletonList("db"), "tbl"), 1L, null)); + + byte[] data = io.tabular.iceberg.connect.events.Event.encode(event); + + Event result = eventDecoder.decode(data); + assertThat(event.groupId()).isEqualTo("cg-connector"); + assertThat(result.type()).isEqualTo(PayloadType.COMMIT_TO_TABLE); + assertThat(result.payload()).isInstanceOf(CommitToTable.class); + CommitToTable payload = (CommitToTable) result.payload(); + + assertThat(payload.commitId()).isEqualTo(commitId); + assertThat(payload.snapshotId()).isEqualTo(1L); + assertThat(payload.validThroughTs()).isNull(); + assertThat(payload.tableReference().catalog()).isEqualTo(catalogName); + assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.of("db", "tbl")); + } + @Test public void testCommitCompleteBecomesCommitCompleteSerialization() { io.tabular.iceberg.connect.events.Event event = @@ -255,4 +279,20 @@ public void testCommitCompleteBecomesCommitCompleteSerialization() { assertThat(payload.validThroughTs()) .isEqualTo(OffsetDateTime.ofInstant(Instant.ofEpochMilli(2L), ZoneOffset.UTC)); } + + @Test + public void testCommitCompleteBecomesCommitCompleteSerializationNullVtts() { + io.tabular.iceberg.connect.events.Event event = + new io.tabular.iceberg.connect.events.Event( + "cg-connector", EventType.COMMIT_COMPLETE, new CommitCompletePayload(commitId, null)); + + byte[] data = io.tabular.iceberg.connect.events.Event.encode(event); + + Event result = eventDecoder.decode(data); + assertThat(result.type()).isEqualTo(PayloadType.COMMIT_COMPLETE); + assertThat(result.payload()).isInstanceOf(CommitComplete.class); + CommitComplete payload = (CommitComplete) result.payload(); + assertThat(payload.commitId()).isEqualTo(commitId); + assertThat(payload.validThroughTs()).isNull(); + } }