Skip to content

Commit

Permalink
dt/dl/schema: Integration tests for partition spec interaction
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiman committed Feb 20, 2025
1 parent de51ccb commit 202a811
Showing 1 changed file with 102 additions and 12 deletions.
114 changes: 102 additions & 12 deletions tests/rptest/tests/datalake/schema_evolution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def check_table_schema(
class EvolutionTestCase(NamedTuple):
initial_schema: GenericSchema
next_schema: GenericSchema
partition_spec: str | None = None


LEGAL_TEST_CASES = {
Expand Down Expand Up @@ -264,6 +265,7 @@ class EvolutionTestCase(NamedTuple):
('ordinal', 'integer'),
],
),
partition_spec="(verifier_string)",
),
"drop_column":
EvolutionTestCase(
Expand Down Expand Up @@ -308,6 +310,7 @@ class EvolutionTestCase(NamedTuple):
('verifier_string', 'varchar'),
],
),
partition_spec="(verifier_string)",
),
"promote_column":
EvolutionTestCase(
Expand Down Expand Up @@ -345,6 +348,7 @@ class EvolutionTestCase(NamedTuple):
('ordinal', 'bigint'),
],
),
partition_spec="(ordinal)",
),
"reorder_columns":
EvolutionTestCase(
Expand Down Expand Up @@ -399,6 +403,7 @@ class EvolutionTestCase(NamedTuple):
('third', 'varchar'),
],
),
partition_spec="(first)",
),
}

Expand Down Expand Up @@ -487,16 +492,21 @@ def select(self,
@contextmanager
def setup_services(self,
query_engine: QueryEngineType,
compat_level: str = "NONE"):
compat_level: str = "NONE",
partition_spec: str = None):
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
catalog_type=filesystem_catalog_type(),
include_query_engines=[
query_engine,
]) as dl:
config = {}
if partition_spec is not None:
config["redpanda.iceberg.partition.spec"] = partition_spec
dl.create_iceberg_enabled_topic(
self.topic_name,
iceberg_mode="value_schema_id_prefix",
config=config,
)
SchemaRegistryClient({
'url':
Expand All @@ -518,11 +528,11 @@ def test_legal_schema_evolution(self, cloud_storage_type, query_engine,
Test that rows written with schema A are still readable after evolving
the table to schema B.
"""
with self.setup_services(query_engine) as dl:
tc = LEGAL_TEST_CASES[test_case]
with self.setup_services(query_engine,
partition_spec=tc.partition_spec) as dl:
count = 10
ctx = TranslationContext()
tc = LEGAL_TEST_CASES[test_case]

tc.initial_schema.produce(dl,
self.topic_name,
count,
Expand Down Expand Up @@ -557,11 +567,11 @@ def test_illegal_schema_evolution(self, cloud_storage_type, query_engine,
check that records produced with an incompatible schema don't wind up
in the table.
"""
with self.setup_services(query_engine) as dl:
tc = ILLEGAL_TEST_CASES[test_case]
with self.setup_services(query_engine,
partition_spec=tc.partition_spec) as dl:
count = 10
ctx = TranslationContext()
tc = ILLEGAL_TEST_CASES[test_case]

tc.initial_schema.produce(dl,
self.topic_name,
count,
Expand Down Expand Up @@ -601,12 +611,12 @@ def test_dropped_column_no_collision(self, cloud_storage_type,
with self.setup_services(query_engine) as dl:
count = 10
ctx = TranslationContext()
initial_schema, next_schema = LEGAL_TEST_CASES["drop_column"]
initial_schema, next_schema, _ = LEGAL_TEST_CASES["drop_column"]

dropped_field_names = list(
set(initial_schema.field_names) - set(next_schema.field_names))

for schema in LEGAL_TEST_CASES["drop_column"]:
for schema in [initial_schema, next_schema]:
schema.produce(dl,
self.topic_name,
count,
Expand Down Expand Up @@ -674,7 +684,7 @@ def test_dropped_column_select_fails(self, cloud_storage_type,
with self.setup_services(query_engine) as dl:
count = 10
ctx = TranslationContext()
initial_schema, next_schema = LEGAL_TEST_CASES['drop_column']
initial_schema, next_schema, _ = LEGAL_TEST_CASES['drop_column']
dropped_field_names = list(
set(initial_schema.field_names) - set(next_schema.field_names))

Expand Down Expand Up @@ -713,7 +723,8 @@ def test_reorder_columns(self, cloud_storage_type, query_engine,
with self.setup_services(query_engine) as dl:
count = 10
ctx = TranslationContext()
initial_schema, next_schema = LEGAL_TEST_CASES['reorder_columns']
initial_schema, next_schema, _ = LEGAL_TEST_CASES[
'reorder_columns']
for schema in [initial_schema, next_schema]:
schema.produce(dl,
self.topic_name,
Expand Down Expand Up @@ -748,7 +759,7 @@ def test_old_schema_writer(self, cloud_storage_type, query_engine,
count = 10
ctx = TranslationContext()

initial_schema, next_schema = LEGAL_TEST_CASES[test_case]
initial_schema, next_schema, _ = LEGAL_TEST_CASES[test_case]

for schema in [initial_schema, next_schema]:
schema.produce(dl,
Expand All @@ -769,3 +780,82 @@ def test_old_schema_writer(self, cloud_storage_type, query_engine,

assert len(select_out) == count * 3, \
f"Expected {count*3} rows, got {len(select_out)}"

@cluster(num_nodes=3)
@matrix(
cloud_storage_type=supported_storage_types(),
query_engine=QUERY_ENGINES,
use_partition_spec=[True, False],
)
def test_partition_spec_evo(self, cloud_storage_type, query_engine,
use_partition_spec):

tc = EvolutionTestCase(
initial_schema=GenericSchema(
fields=[
{
"name": "ts",
"type": {
"type": "int",
"logicalType": "date",
}
},
],
generate_record=lambda x: {
"ts": int(x),
},
spark_table=[('ts', 'date')],
trino_table=[
('ts', 'date'),
],
),
next_schema=GenericSchema(
fields=[
{
"name": "ts",
"type": {
"type": "long",
"logicalType": "timestamp-millis",
}
},
],
generate_record=lambda x: {
"ts": int(x) * 60 * 60 * 24,
},
spark_table=[('ts', 'timestamp_ntz')],
trino_table=[
('ts', 'timestamp(6)'),
],
),
partition_spec="(ts)",
)

self.logger.debug(
f"Schema evolution {'fails' if use_partition_spec else 'succeeds'} if the date field is {'not ' if not use_partition_spec else ''}referenced in the partition spec"
)
pspec = tc.partition_spec if use_partition_spec else None
with self.setup_services(query_engine, partition_spec=pspec) as dl:
count = 10
ctx = TranslationContext()
tc.initial_schema.produce(dl,
self.topic_name,
count,
ctx,
mode=ProducerType.AVRO)

tc.initial_schema.check_table_schema(dl, self.table_name,
query_engine)

tc.next_schema.produce(dl,
self.topic_name,
count,
ctx,
should_translate=not use_partition_spec,
mode=ProducerType.AVRO)

if use_partition_spec:
tc.initial_schema.check_table_schema(dl, self.table_name,
query_engine)
else:
tc.next_schema.check_table_schema(dl, self.table_name,
query_engine)

0 comments on commit 202a811

Please sign in to comment.