Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Iceberg's timestamp with nano-precision data type #10238

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.stringType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.structType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timeType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampNanosType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampNanosTzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestamptzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampTzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.uuidType;

import java.util.Iterator;
Expand Down Expand Up @@ -98,7 +100,9 @@ public static Stream<IcebergType> icebergTypes(IntSupplier idSupplier) {
decimalType(10, 3),
fixedType(42),
timestampType(),
timestamptzType());
timestampTzType(),
timestampNanosType(),
timestampNanosTzType());
}

public static IcebergSchema icebergSchemaAllTypes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.TYPE_MAP;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.TYPE_STRUCT;
import static org.projectnessie.catalog.model.id.NessieId.transientNessieId;
import static org.projectnessie.catalog.model.schema.types.NessieType.DEFAULT_TIME_PRECISION;
import static org.projectnessie.catalog.model.schema.types.NessieType.MICROS_TIME_PRECISION;
import static org.projectnessie.catalog.model.schema.types.NessieType.NANOS_TIME_PRECISION;
import static org.projectnessie.catalog.model.snapshot.NessieViewRepresentation.NessieViewSQLRepresentation.nessieViewSQLRepresentation;
import static org.projectnessie.catalog.model.snapshot.TableFormat.ICEBERG;
import static org.projectnessie.catalog.model.statistics.NessiePartitionStatisticsFile.partitionStatisticsFile;
Expand Down Expand Up @@ -316,18 +317,25 @@ public static IcebergType nessieTypeToIcebergType(NessieTypeSpec type) {
return IcebergType.dateType();
case TIME:
NessieTimeTypeSpec time = (NessieTimeTypeSpec) type;
if (time.precision() != DEFAULT_TIME_PRECISION || time.withTimeZone()) {
if (time.precision() != MICROS_TIME_PRECISION || time.withTimeZone()) {
throw new IllegalArgumentException("Data type not supported in Iceberg: " + type);
}
return IcebergType.timeType();
case TIMESTAMP:
NessieTimestampTypeSpec timestamp = (NessieTimestampTypeSpec) type;
if (timestamp.precision() != DEFAULT_TIME_PRECISION) {
throw new IllegalArgumentException("Data type not supported in Iceberg: " + type);
switch (timestamp.precision()) {
case MICROS_TIME_PRECISION:
return timestamp.withTimeZone()
? IcebergType.timestampTzType()
: IcebergType.timestampType();
case NANOS_TIME_PRECISION:
return timestamp.withTimeZone()
? IcebergType.timestampNanosTzType()
: IcebergType.timestampNanosType();
default:
break;
}
return timestamp.withTimeZone()
? IcebergType.timestamptzType()
: IcebergType.timestampType();
throw new IllegalArgumentException("Data type not supported in Iceberg: " + type);
case BINARY:
return IcebergType.binaryType();
case DECIMAL:
Expand Down Expand Up @@ -378,9 +386,13 @@ static NessieTypeSpec icebergTypeToNessieType(
IcebergType type, Map<Integer, NessieField> icebergFields) {
switch (type.type()) {
case IcebergType.TYPE_TIMESTAMP_TZ:
return NessieType.timestampType(true);
return NessieType.timestampType(MICROS_TIME_PRECISION, true);
case IcebergType.TYPE_TIMESTAMP:
return NessieType.timestampType(false);
return NessieType.timestampType(MICROS_TIME_PRECISION, false);
case IcebergType.TYPE_TIMESTAMP_NS_TZ:
return NessieType.timestampType(NANOS_TIME_PRECISION, true);
case IcebergType.TYPE_TIMESTAMP_NS:
return NessieType.timestampType(NANOS_TIME_PRECISION, false);
case IcebergType.TYPE_BOOLEAN:
return NessieType.booleanType();
case IcebergType.TYPE_UUID:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.formats.iceberg.types;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

/**
 * Iceberg timestamp type with nanosecond precision.
 *
 * <p>Exists in two flavors, distinguished by {@link #adjustToUTC()}: the flavor reporting {@code
 * TYPE_TIMESTAMP_NS_TZ} and the flavor reporting {@code TYPE_TIMESTAMP_NS}. Both are represented
 * in Avro as a {@code long} carrying the {@code timestamp-nanos} logical type plus an {@value
 * #ADJUST_TO_UTC_PROP} schema property.
 */
public final class IcebergTimestampNanosType extends IcebergPrimitiveType {
  /** Name of the Avro schema property that records the {@link #adjustToUTC()} flag. */
  public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc";

  private static final Schema TIMESTAMP_NS_SCHEMA = nanosSchema(false);
  private static final Schema TIMESTAMPTZ_NS_SCHEMA = nanosSchema(true);

  private final boolean adjustToUTC;

  IcebergTimestampNanosType(boolean adjustToUTC) {
    this.adjustToUTC = adjustToUTC;
  }

  /**
   * Builds an Avro {@code long} schema annotated with the {@code timestamp-nanos} logical type and
   * tagged with the given {@value #ADJUST_TO_UTC_PROP} value.
   */
  private static Schema nanosSchema(boolean adjustToUtc) {
    Schema schema = LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
    schema.addProp(ADJUST_TO_UTC_PROP, adjustToUtc);
    return schema;
  }

  /** Returns the flag this instance was constructed with. */
  public boolean adjustToUTC() {
    return adjustToUTC;
  }

  /** Returns the Iceberg type name matching the {@link #adjustToUTC()} flag. */
  @Override
  public String type() {
    if (adjustToUTC()) {
      return TYPE_TIMESTAMP_NS_TZ;
    }
    return TYPE_TIMESTAMP_NS;
  }

  /**
   * Returns the shared Avro schema for this flavor; the {@code fieldId} argument is not used.
   */
  @Override
  public Schema avroSchema(int fieldId) {
    if (adjustToUTC()) {
      return TIMESTAMPTZ_NS_SCHEMA;
    }
    return TIMESTAMP_NS_SCHEMA;
  }

  /** Serializes the value, which must be a {@link Long}, via {@code IcebergLongType}. */
  @Override
  public byte[] serializeSingleValue(Object value) {
    Long nanos = (Long) value;
    return IcebergLongType.serializeLong(nanos);
  }

  /** Deserializes the bytes via {@code IcebergLongType}. */
  @Override
  public Object deserializeSingleValue(byte[] value) {
    return IcebergLongType.deserializeLong(value);
  }

  @Override
  public int hashCode() {
    int typeHash = type().hashCode();
    // Flip the lowest bit for the adjust-to-UTC flavor.
    return adjustToUTC ? typeHash ^ 1 : typeHash;
  }

  /** Two instances are equal exactly when their {@link #adjustToUTC()} flags match. */
  @Override
  public boolean equals(Object obj) {
    if (obj == this) {
      return true;
    }
    if (!(obj instanceof IcebergTimestampNanosType)) {
      return false;
    }
    return ((IcebergTimestampNanosType) obj).adjustToUTC == adjustToUTC;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

/** Iceberg timestamp, microsecond precision. */
public final class IcebergTimestampType extends IcebergPrimitiveType {
private static final Schema TIMESTAMP_SCHEMA =
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public interface IcebergType {
String TYPE_BINARY = "binary";
String TYPE_TIMESTAMP = "timestamp";
String TYPE_TIMESTAMP_TZ = "timestamptz";
String TYPE_TIMESTAMP_NS = "timestamp_ns";
String TYPE_TIMESTAMP_NS_TZ = "timestamptz_ns";
String TYPE_FIXED = "fixed";
String TYPE_DECIMAL = "decimal";
String TYPE_STRUCT = "struct";
Expand Down Expand Up @@ -89,14 +91,22 @@ static IcebergBinaryType binaryType() {
return IcebergTypes.BINARY;
}

static IcebergTimestampType timestamptzType() {
static IcebergTimestampType timestampTzType() {
return IcebergTypes.TIMESTAMPTZ;
}

static IcebergTimestampType timestampType() {
return IcebergTypes.TIMESTAMP;
}

static IcebergTimestampNanosType timestampNanosTzType() {
return IcebergTypes.TIMESTAMPTZ_NS;
}

static IcebergTimestampNanosType timestampNanosType() {
return IcebergTypes.TIMESTAMP_NS;
}

static IcebergFixedType fixedType(int length) {
return ImmutableIcebergFixedType.of(length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,41 @@ final class IcebergTypes {
static final IcebergBinaryType BINARY = new IcebergBinaryType();
static final IcebergTimestampType TIMESTAMPTZ = new IcebergTimestampType(true);
static final IcebergTimestampType TIMESTAMP = new IcebergTimestampType(false);
static final IcebergTimestampNanosType TIMESTAMPTZ_NS = new IcebergTimestampNanosType(true);
static final IcebergTimestampNanosType TIMESTAMP_NS = new IcebergTimestampNanosType(false);

private IcebergTypes() {}

static IcebergPrimitiveType primitiveFromString(String primitiveType) {
switch (primitiveType) {
case IcebergBooleanType.TYPE_BOOLEAN:
case IcebergType.TYPE_BOOLEAN:
return IcebergType.booleanType();
case IcebergUuidType.TYPE_UUID:
case IcebergType.TYPE_UUID:
return IcebergType.uuidType();
case IcebergIntegerType.TYPE_INT:
case IcebergType.TYPE_INT:
return IcebergType.integerType();
case IcebergLongType.TYPE_LONG:
case IcebergType.TYPE_LONG:
return IcebergType.longType();
case IcebergFloatType.TYPE_FLOAT:
case IcebergType.TYPE_FLOAT:
return IcebergType.floatType();
case IcebergDoubleType.TYPE_DOUBLE:
case IcebergType.TYPE_DOUBLE:
return IcebergType.doubleType();
case IcebergDateType.TYPE_DATE:
case IcebergType.TYPE_DATE:
return IcebergType.dateType();
case IcebergTimeType.TYPE_TIME:
case IcebergType.TYPE_TIME:
return IcebergType.timeType();
case IcebergStringType.TYPE_STRING:
case IcebergType.TYPE_STRING:
return IcebergType.stringType();
case IcebergBinaryType.TYPE_BINARY:
case IcebergType.TYPE_BINARY:
return IcebergType.binaryType();
case IcebergTimestampType.TYPE_TIMESTAMP_TZ:
return IcebergType.timestamptzType();
case IcebergTimestampType.TYPE_TIMESTAMP:
case IcebergType.TYPE_TIMESTAMP_TZ:
return IcebergType.timestampTzType();
case IcebergType.TYPE_TIMESTAMP:
return IcebergType.timestampType();
case IcebergType.TYPE_TIMESTAMP_NS_TZ:
return IcebergType.timestampNanosTzType();
case IcebergType.TYPE_TIMESTAMP_NS:
return IcebergType.timestampNanosType();
default:
Matcher m = DECIMAL_PATTERN.matcher(primitiveType);
if (m.matches()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.mapType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.stringType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.structType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestamptzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampTzType;
import static org.projectnessie.catalog.model.id.NessieIdHasher.nessieIdHasher;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -489,7 +489,7 @@ static Stream<Arguments> icebergNested() {
nestedField(102, "topic", false, stringType(), null),
nestedField(103, "partition", false, integerType(), null),
nestedField(104, "offset", false, longType(), null),
nestedField(105, "timestamp", false, timestamptzType(), null),
nestedField(105, "timestamp", false, timestampTzType(), null),
nestedField(106, "timestampType", false, integerType(), null),
nestedField(
107,
Expand Down Expand Up @@ -555,7 +555,7 @@ static Stream<Arguments> icebergNested() {
nestedField(3, "topic", false, stringType(), null),
nestedField(4, "partition", false, integerType(), null),
nestedField(5, "offset", false, longType(), null),
nestedField(6, "timestamp", false, timestamptzType(), null),
nestedField(6, "timestamp", false, timestampTzType(), null),
nestedField(7, "timestampType", false, integerType(), null),
nestedField(
8,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.stringType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.structType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timeType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampNanosType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampNanosTzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestamptzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampTzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.uuidType;

import java.util.stream.Stream;
Expand Down Expand Up @@ -75,7 +77,9 @@ static Stream<Arguments> types() {
arguments(dateType(), "\"date\""),
arguments(timeType(), "\"time\""),
arguments(timestampType(), "\"timestamp\""),
arguments(timestamptzType(), "\"timestamptz\""),
arguments(timestampTzType(), "\"timestamptz\""),
arguments(timestampNanosType(), "\"timestamp_ns\""),
arguments(timestampNanosTzType(), "\"timestamptz_ns\""),
arguments(uuidType(), "\"uuid\""),
arguments(fixedType(42), "\"fixed[42]\""),
arguments(decimalType(33, 11), "\"decimal(33, 11)\""),
Expand Down Expand Up @@ -153,6 +157,8 @@ static Stream<Arguments> icebergTypes() {
arguments(Types.DecimalType.of(10, 3), decimalType(10, 3)),
arguments(Types.FixedType.ofLength(42), fixedType(42)),
arguments(Types.TimestampType.withoutZone(), timestampType()),
arguments(Types.TimestampType.withZone(), timestamptzType()));
arguments(Types.TimestampType.withZone(), timestampTzType()),
arguments(Types.TimestampNanoType.withoutZone(), timestampNanosType()),
arguments(Types.TimestampNanoType.withZone(), timestampNanosTzType()));
}
}
Loading
Loading