Skip to content

Commit ed357e5

Browse files
committed
Update-schema: Add support for initial-default
1 parent 5458d15 commit ed357e5

File tree

2 files changed: +67 -14 lines changed

pyiceberg/table/update/schema.py

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
from copy import copy
2121
from dataclasses import dataclass
2222
from enum import Enum
23-
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union
23+
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union
2424

2525
from pyiceberg.exceptions import ResolveError, ValidationError
26+
from pyiceberg.expressions import literal # type: ignore
2627
from pyiceberg.schema import (
2728
PartnerAccessor,
2829
Schema,
@@ -153,7 +154,12 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
153154
return self
154155

155156
def add_column(
156-
self, path: Union[str, Tuple[str, ...]], field_type: IcebergType, doc: Optional[str] = None, required: bool = False
157+
self,
158+
path: Union[str, Tuple[str, ...]],
159+
field_type: IcebergType,
160+
doc: Optional[str] = None,
161+
required: bool = False,
162+
default_value: Optional[Any] = None,
157163
) -> UpdateSchema:
158164
"""Add a new column to a nested struct or Add a new top-level column.
159165
@@ -168,6 +174,7 @@ def add_column(
168174
field_type: Type for the new column.
169175
doc: Documentation string for the new column.
170176
required: Whether the new column is required.
177+
default_value: Default value for the new column.
171178
172179
Returns:
173180
This for method chaining.
@@ -177,10 +184,6 @@ def add_column(
177184
raise ValueError(f"Cannot add column with ambiguous name: {path}, provide a tuple instead")
178185
path = (path,)
179186

180-
if required and not self._allow_incompatible_changes:
181-
# Table format version 1 and 2 cannot add required column because there is no initial value
182-
raise ValueError(f"Incompatible change: cannot add required column: {'.'.join(path)}")
183-
184187
name = path[-1]
185188
parent = path[:-1]
186189

@@ -212,13 +215,35 @@ def add_column(
212215

213216
# assign new IDs in order
214217
new_id = self.assign_new_column_id()
218+
new_type = assign_fresh_schema_ids(field_type, self.assign_new_column_id)
219+
220+
if default_value is not None:
221+
try:
222+
# To make sure that the value is valid for the type
223+
initial_default = literal(default_value).to(new_type).value
224+
except ValueError as e:
225+
raise ValueError(f"Invalid default value: {e}") from e
226+
else:
227+
initial_default = default_value
228+
229+
if (required and initial_default is None) and not self._allow_incompatible_changes:
230+
# Table format version 1 and 2 cannot add required column because there is no initial value
231+
raise ValueError(f"Incompatible change: cannot add required column: {'.'.join(path)}")
232+
215233

216234
# update tracking for moves
217235
self._added_name_to_id[full_name] = new_id
218236
self._id_to_parent[new_id] = parent_full_path
219237

220-
new_type = assign_fresh_schema_ids(field_type, self.assign_new_column_id)
221-
field = NestedField(field_id=new_id, name=name, field_type=new_type, required=required, doc=doc)
238+
field = NestedField(
239+
field_id=new_id,
240+
name=name,
241+
field_type=new_type,
242+
required=required,
243+
doc=doc,
244+
initial_default=initial_default,
245+
write_default=initial_default,
246+
)
222247

223248
if parent_id in self._adds:
224249
self._adds[parent_id].append(field)
@@ -330,6 +355,7 @@ def _set_column_requirement(self, path: Union[str, Tuple[str, ...]], required: b
330355
field_type=updated.field_type,
331356
doc=updated.doc,
332357
required=required,
358+
initial_default=updated.initial_default,
333359
)
334360
else:
335361
self._updates[field.field_id] = NestedField(
@@ -338,6 +364,7 @@ def _set_column_requirement(self, path: Union[str, Tuple[str, ...]], required: b
338364
field_type=field.field_type,
339365
doc=field.doc,
340366
required=required,
367+
initial_default=field.initial_default,
341368
)
342369

343370
def update_column(
@@ -387,6 +414,7 @@ def update_column(
387414
field_type=field_type or updated.field_type,
388415
doc=doc if doc is not None else updated.doc,
389416
required=updated.required,
417+
initial_default=updated.initial_default,
390418
)
391419
else:
392420
self._updates[field.field_id] = NestedField(
@@ -395,6 +423,7 @@ def update_column(
395423
field_type=field_type or field.field_type,
396424
doc=doc if doc is not None else field.doc,
397425
required=field.required,
426+
initial_default=field.initial_default,
398427
)
399428

400429
if required is not None:

tests/integration/test_rest_schema.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from pyiceberg.table.sorting import SortField, SortOrder
2828
from pyiceberg.table.update.schema import UpdateSchema
2929
from pyiceberg.transforms import IdentityTransform
30+
from pyiceberg.typedef import EMPTY_DICT, Properties
3031
from pyiceberg.types import (
3132
BinaryType,
3233
BooleanType,
@@ -69,7 +70,7 @@ def simple_table(catalog: Catalog, table_schema_simple: Schema) -> Table:
6970
return _create_table_with_schema(catalog, table_schema_simple)
7071

7172

72-
def _create_table_with_schema(catalog: Catalog, schema: Schema) -> Table:
73+
def _create_table_with_schema(catalog: Catalog, schema: Schema, properties: Properties = EMPTY_DICT) -> Table:
7374
tbl_name = "default.test_schema_evolution"
7475
try:
7576
catalog.drop_table(tbl_name)
@@ -78,7 +79,7 @@ def _create_table_with_schema(catalog: Catalog, schema: Schema) -> Table:
7879
return catalog.create_table(
7980
identifier=tbl_name,
8081
schema=schema,
81-
properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()},
82+
properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json(), **properties},
8283
)
8384

8485

@@ -1076,9 +1077,8 @@ def test_add_required_column(catalog: Catalog) -> None:
10761077
schema_ = Schema(NestedField(field_id=1, name="a", field_type=BooleanType(), required=False))
10771078
table = _create_table_with_schema(catalog, schema_)
10781079
update = table.update_schema()
1079-
with pytest.raises(ValueError) as exc_info:
1080+
with pytest.raises(ValueError, match="Incompatible change: cannot add required column: data"):
10801081
update.add_column(path="data", field_type=IntegerType(), required=True)
1081-
assert "Incompatible change: cannot add required column: data" in str(exc_info.value)
10821082

10831083
new_schema = (
10841084
UpdateSchema(transaction=table.transaction(), allow_incompatible_changes=True)
@@ -1091,16 +1091,40 @@ def test_add_required_column(catalog: Catalog) -> None:
10911091
)
10921092

10931093

1094+
@pytest.mark.integration
1095+
def test_add_required_column_initial_default(catalog: Catalog) -> None:
1096+
schema_ = Schema(NestedField(field_id=1, name="a", field_type=BooleanType(), required=False))
1097+
table = _create_table_with_schema(catalog, schema_)
1098+
new_schema = (
1099+
UpdateSchema(transaction=table.transaction())
1100+
.add_column(path="data", field_type=IntegerType(), required=True, default_value=22)
1101+
._apply()
1102+
)
1103+
assert new_schema == Schema(
1104+
NestedField(field_id=1, name="a", field_type=BooleanType(), required=False),
1105+
NestedField(field_id=2, name="data", field_type=IntegerType(), required=True, initial_default=22, write_default=22),
1106+
schema_id=1,
1107+
)
1108+
1109+
1110+
@pytest.mark.integration
1111+
def test_add_required_column_initial_default_invalid_value(catalog: Catalog) -> None:
1112+
schema_ = Schema(NestedField(field_id=1, name="a", field_type=BooleanType(), required=False))
1113+
table = _create_table_with_schema(catalog, schema_)
1114+
update = table.update_schema()
1115+
with pytest.raises(ValueError, match="Invalid default value: Could not convert abc into a int"):
1116+
update.add_column(path="data", field_type=IntegerType(), required=True, default_value="abc")
1117+
1118+
10941119
@pytest.mark.integration
10951120
def test_add_required_column_case_insensitive(catalog: Catalog) -> None:
10961121
schema_ = Schema(NestedField(field_id=1, name="id", field_type=BooleanType(), required=False))
10971122
table = _create_table_with_schema(catalog, schema_)
10981123

1099-
with pytest.raises(ValueError) as exc_info:
1124+
with pytest.raises(ValueError, match="already exists: ID"):
11001125
with table.transaction() as txn:
11011126
with txn.update_schema(allow_incompatible_changes=True) as update:
11021127
update.case_sensitive(False).add_column(path="ID", field_type=IntegerType(), required=True)
1103-
assert "already exists: ID" in str(exc_info.value)
11041128

11051129
new_schema = (
11061130
UpdateSchema(transaction=table.transaction(), allow_incompatible_changes=True)

0 commit comments