Skip to content

V3: Introduce timestamp_ns and timestamptz_ns #1632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 23, 2025
18 changes: 18 additions & 0 deletions pyiceberg/avro/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ class TimestampReader(IntegerReader):
"""


class TimestampNanoReader(IntegerReader):
"""Reads a nanosecond granularity timestamp from the stream.

Long is decoded as python integer which represents
the number of nanoseconds from the unix epoch, 1 January 1970.
"""


class TimestamptzReader(IntegerReader):
"""Reads a microsecond granularity timestamptz from the stream.

Expand All @@ -185,6 +193,16 @@ class TimestamptzReader(IntegerReader):
"""


class TimestamptzNanoReader(IntegerReader):
"""Reads a microsecond granularity timestamptz from the stream.

Long is decoded as python integer which represents
the number of nanoseconds from the unix epoch, 1 January 1970.

Adjusted to UTC.
"""


class StringReader(Reader):
def read(self, decoder: BinaryDecoder) -> str:
return decoder.read_utf8()
Expand Down
24 changes: 24 additions & 0 deletions pyiceberg/avro/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
StringReader,
StructReader,
TimeReader,
TimestampNanoReader,
TimestampReader,
TimestamptzNanoReader,
TimestamptzReader,
UnknownReader,
UUIDReader,
Expand All @@ -64,6 +66,8 @@
OptionWriter,
StringWriter,
StructWriter,
TimestampNanoWriter,
TimestamptzNanoWriter,
TimestamptzWriter,
TimestampWriter,
TimeWriter,
Expand Down Expand Up @@ -99,7 +103,9 @@
PrimitiveType,
StringType,
StructType,
TimestampNanoType,
TimestampType,
TimestamptzNanoType,
TimestamptzType,
TimeType,
UnknownType,
Expand Down Expand Up @@ -184,9 +190,15 @@ def visit_time(self, time_type: TimeType) -> Writer:
def visit_timestamp(self, timestamp_type: TimestampType) -> Writer:
return TimestampWriter()

def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType) -> Writer:
return TimestampNanoWriter()

def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> Writer:
return TimestamptzWriter()

def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType) -> Writer:
return TimestamptzNanoWriter()

def visit_string(self, string_type: StringType) -> Writer:
return StringWriter()

Expand Down Expand Up @@ -332,9 +344,15 @@ def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Wri
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Writer:
return TimestampWriter()

def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType, partner: Optional[IcebergType]) -> Writer:
return TimestampNanoWriter()

def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Writer:
return TimestamptzWriter()

def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType, partner: Optional[IcebergType]) -> Writer:
return TimestamptzNanoWriter()

def visit_string(self, string_type: StringType, partner: Optional[IcebergType]) -> Writer:
return StringWriter()

Expand Down Expand Up @@ -465,9 +483,15 @@ def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Rea
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Reader:
return TimestampReader()

def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType, partner: Optional[IcebergType]) -> Reader:
return TimestampNanoReader()

def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Reader:
return TimestamptzReader()

def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType, partner: Optional[IcebergType]) -> Reader:
return TimestamptzNanoReader()

def visit_string(self, string_type: StringType, partner: Optional[IcebergType]) -> Reader:
return StringReader()

Expand Down
12 changes: 12 additions & 0 deletions pyiceberg/avro/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,24 @@ def write(self, encoder: BinaryEncoder, val: int) -> None:
encoder.write_int(val)


@dataclass(frozen=True)
class TimestampNanoWriter(Writer):
def write(self, encoder: BinaryEncoder, val: int) -> None:
encoder.write_int(val)


@dataclass(frozen=True)
class TimestamptzWriter(Writer):
def write(self, encoder: BinaryEncoder, val: int) -> None:
encoder.write_int(val)


@dataclass(frozen=True)
class TimestamptzNanoWriter(Writer):
def write(self, encoder: BinaryEncoder, val: int) -> None:
encoder.write_int(val)


@dataclass(frozen=True)
class StringWriter(Writer):
def write(self, encoder: BinaryEncoder, val: Any) -> None:
Expand Down
17 changes: 16 additions & 1 deletion pyiceberg/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
LongType,
PrimitiveType,
StringType,
TimestampNanoType,
TimestampType,
TimestamptzNanoType,
TimestamptzType,
TimeType,
UnknownType,
Expand All @@ -66,6 +68,7 @@
date_str_to_days,
date_to_days,
datetime_to_micros,
datetime_to_nanos,
days_to_date,
micros_to_time,
micros_to_timestamp,
Expand Down Expand Up @@ -127,7 +130,9 @@ def _(primitive_type: BooleanType, value_str: str) -> Union[int, float, str, uui
@partition_to_py.register(DateType)
@partition_to_py.register(TimeType)
@partition_to_py.register(TimestampType)
@partition_to_py.register(TimestampNanoType)
@partition_to_py.register(TimestamptzType)
@partition_to_py.register(TimestamptzNanoType)
@handle_none
def _(primitive_type: PrimitiveType, value_str: str) -> int:
"""Convert a string to an integer value.
Expand Down Expand Up @@ -213,12 +218,20 @@ def _(_: PrimitiveType, value: int) -> bytes:

@to_bytes.register(TimestampType)
@to_bytes.register(TimestamptzType)
def _(_: TimestampType, value: Union[datetime, int]) -> bytes:
def _(_: PrimitiveType, value: Union[datetime, int]) -> bytes:
if isinstance(value, datetime):
value = datetime_to_micros(value)
return _LONG_STRUCT.pack(value)


@to_bytes.register(TimestampNanoType)
@to_bytes.register(TimestamptzNanoType)
def _(_: PrimitiveType, value: Union[datetime, int]) -> bytes:
if isinstance(value, datetime):
value = datetime_to_nanos(value)
return _LONG_STRUCT.pack(value)


@to_bytes.register(DateType)
def _(_: DateType, value: Union[date, int]) -> bytes:
if isinstance(value, date):
Expand Down Expand Up @@ -319,6 +332,8 @@ def _(_: PrimitiveType, b: bytes) -> int:
@from_bytes.register(TimeType)
@from_bytes.register(TimestampType)
@from_bytes.register(TimestamptzType)
@from_bytes.register(TimestampNanoType)
@from_bytes.register(TimestamptzNanoType)
def _(_: PrimitiveType, b: bytes) -> int:
return _LONG_STRUCT.unpack(b)[0]

Expand Down
14 changes: 14 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@
PrimitiveType,
StringType,
StructType,
TimestampNanoType,
TimestampType,
TimestamptzNanoType,
TimestamptzType,
TimeType,
UnknownType,
Expand Down Expand Up @@ -662,9 +664,15 @@ def visit_time(self, _: TimeType) -> pa.DataType:
def visit_timestamp(self, _: TimestampType) -> pa.DataType:
return pa.timestamp(unit="us")

def visit_timestamp_ns(self, _: TimestampNanoType) -> pa.DataType:
return pa.timestamp(unit="ns")

def visit_timestamptz(self, _: TimestamptzType) -> pa.DataType:
return pa.timestamp(unit="us", tz="UTC")

def visit_timestamptz_ns(self, _: TimestamptzNanoType) -> pa.DataType:
return pa.timestamp(unit="ns", tz="UTC")

def visit_string(self, _: StringType) -> pa.DataType:
return pa.large_string()

Expand Down Expand Up @@ -1894,9 +1902,15 @@ def visit_time(self, time_type: TimeType) -> str:
def visit_timestamp(self, timestamp_type: TimestampType) -> str:
return "INT64"

def visit_timestamp_ns(self, timestamp_type: TimestampNanoType) -> str:
return "INT64"

def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> str:
return "INT64"

def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType) -> str:
return "INT64"

def visit_string(self, string_type: StringType) -> str:
return "BYTE_ARRAY"

Expand Down
41 changes: 41 additions & 0 deletions pyiceberg/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
PrimitiveType,
StringType,
StructType,
TimestampNanoType,
TimestampType,
TimestamptzNanoType,
TimestamptzType,
TimeType,
UnknownType,
Expand Down Expand Up @@ -362,6 +364,21 @@ def _validate_identifier_field(self, field_id: int) -> None:
f"Cannot add field {field.name} as an identifier field: must not be nested in an optional field {parent}"
)

def check_format_version_compatibility(self, format_version: int) -> None:
"""Check that the schema is compatible for the given table format version.

Args:
format_version: The Iceberg table format version.

Raises:
ValueError: If the schema is not compatible for the format version.
"""
for field in self._lazy_id_to_field.values():
if format_version < field.field_type.minimum_format_version():
raise ValueError(
f"{field.field_type} is only supported in {field.field_type.minimum_format_version()} or higher. Current format version is: {format_version}"
)


class SchemaVisitor(Generic[T], ABC):
def before_field(self, field: NestedField) -> None:
Expand Down Expand Up @@ -522,8 +539,12 @@ def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) ->
return self.visit_time(primitive, primitive_partner)
elif isinstance(primitive, TimestampType):
return self.visit_timestamp(primitive, primitive_partner)
elif isinstance(primitive, TimestampNanoType):
return self.visit_timestamp_ns(primitive, primitive_partner)
elif isinstance(primitive, TimestamptzType):
return self.visit_timestamptz(primitive, primitive_partner)
elif isinstance(primitive, TimestamptzNanoType):
return self.visit_timestamptz_ns(primitive, primitive_partner)
elif isinstance(primitive, StringType):
return self.visit_string(primitive, primitive_partner)
elif isinstance(primitive, UUIDType):
Expand Down Expand Up @@ -573,10 +594,18 @@ def visit_time(self, time_type: TimeType, partner: Optional[P]) -> T:
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[P]) -> T:
"""Visit a TimestampType."""

@abstractmethod
def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType, partner: Optional[P]) -> T:
"""Visit a TimestampNanoType."""

@abstractmethod
def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[P]) -> T:
"""Visit a TimestamptzType."""

@abstractmethod
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType, partner: Optional[P]) -> T:
"""Visit a TimestamptzNanoType."""

@abstractmethod
def visit_string(self, string_type: StringType, partner: Optional[P]) -> T:
"""Visit a StringType."""
Expand Down Expand Up @@ -706,8 +735,12 @@ def primitive(self, primitive: PrimitiveType) -> T:
return self.visit_time(primitive)
elif isinstance(primitive, TimestampType):
return self.visit_timestamp(primitive)
elif isinstance(primitive, TimestampNanoType):
return self.visit_timestamp_ns(primitive)
elif isinstance(primitive, TimestamptzType):
return self.visit_timestamptz(primitive)
elif isinstance(primitive, TimestamptzNanoType):
return self.visit_timestamptz_ns(primitive)
elif isinstance(primitive, StringType):
return self.visit_string(primitive)
elif isinstance(primitive, UUIDType):
Expand Down Expand Up @@ -759,10 +792,18 @@ def visit_time(self, time_type: TimeType) -> T:
def visit_timestamp(self, timestamp_type: TimestampType) -> T:
"""Visit a TimestampType."""

@abstractmethod
def visit_timestamp_ns(self, timestamp_type: TimestampNanoType) -> T:
"""Visit a TimestampNanoType."""

@abstractmethod
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> T:
"""Visit a TimestamptzType."""

@abstractmethod
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType) -> T:
"""Visit a TimestamptzNanoType."""

@abstractmethod
def visit_string(self, string_type: StringType) -> T:
"""Visit a StringType."""
Expand Down
7 changes: 5 additions & 2 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,15 +578,18 @@ def new_table_metadata(
) -> TableMetadata:
from pyiceberg.table import TableProperties

# Remove format-version so it does not get persisted
format_version = int(properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))

schema.check_format_version_compatibility(format_version)

fresh_schema = assign_fresh_schema_ids(schema)
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)

if table_uuid is None:
table_uuid = uuid.uuid4()

# Remove format-version so it does not get persisted
format_version = int(properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
if format_version == 1:
return TableMetadataV1(
location=location,
Expand Down
Loading