Skip to content

Commit

Permalink
Add more comprehensive tests for before/after.
Browse files Browse the repository at this point in the history
  • Loading branch information
VersusFacit committed Sep 26, 2024
1 parent 748a783 commit 0204690
Showing 1 changed file with 32 additions and 21 deletions.
53 changes: 32 additions & 21 deletions tests/functional/iceberg/test_incremental_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path


from dbt.tests.util import run_dbt, rm_file, write_file
from dbt.tests.util import run_dbt, run_dbt_and_capture, rm_file, write_file

_SEED_INCREMENTAL_STRATEGIES = """
world_id,world_name,boss
Expand All @@ -27,7 +27,7 @@
{{{{
config(
materialized='incremental',
table_format = "iceberg",
table_format='iceberg',
incremental_strategy='{strategy}',
unique_key="world_id",
external_volume = "s3_iceberg_snow",
Expand All @@ -51,6 +51,10 @@
UPDATE {database}.{schema}.upstream_table set world_name = 'Twin Bridges', boss = 'Ludwig' where world_id = 4;
"""

_QUERY_UPDATE_UPSTREAM_TABLE_NO_EFFECT = """
UPDATE {database}.{schema}.upstream_table set world_name = 'Doughnut Plains' where world_id = 2;
"""


class TestIcebergMergeStrategies:
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -81,6 +85,24 @@ def test_incremental_strategies_build(self, project, setup_class):
run_results = run_dbt()
assert len(run_results) == 4

def __check_correct_operations(self, model_name, /, rows_affected, status="SUCCESS"):
run_results, stdout = run_dbt_and_capture(
["show", "--inline", f"select * from {{{{ ref('{model_name}') }}}} where world_id = 4"]
)
assert run_results[0].adapter_response["rows_affected"] == rows_affected
assert run_results[0].adapter_response["code"] == status

if model_name != "append":
run_results, stdout = run_dbt_and_capture(
[
"show",
"--inline",
f"select * from {{{{ ref('{model_name}') }}}} where world_id = 2",
]
)
run_results[0].adapter_response["rows_affected"] == 1
assert "Doughnut" not in stdout

def test_incremental_strategies_with_update(self, project, setup_class):
run_results = run_dbt()
assert len(run_results) == 4
Expand All @@ -90,26 +112,15 @@ def test_incremental_strategies_with_update(self, project, setup_class):
database=project.database, schema=project.test_schema
)
)
project.run_sql(
_QUERY_UPDATE_UPSTREAM_TABLE_NO_EFFECT.format(
database=project.database, schema=project.test_schema
)
)

run_results = run_dbt(["run", "-s", "append", "merge", "delete_insert"])
assert len(run_results) == 3
run_results[0].adapter_response["code"] == "SUCCESS"
run_results[0].adapter_response["rows_affected"] == 3

append_run_results = run_dbt(
["show", "--inline", "select * from {{ ref('append') }} where world_id = 4"]
)
assert append_run_results[0].adapter_response["code"] == "SUCCESS"
assert append_run_results[0].adapter_response["rows_affected"] == 2

merge_run_results = run_dbt(
["show", "--inline", "select * from {{ ref('merge') }} where world_id = 4"]
)
assert merge_run_results[0].adapter_response["code"] == "SUCCESS"
assert merge_run_results[0].adapter_response["rows_affected"] == 1

delete_insert_run_results = run_dbt(
["show", "--inline", "select * from {{ ref('delete_insert') }} where world_id = 4"]
)
assert delete_insert_run_results[0].adapter_response["code"] == "SUCCESS"
assert delete_insert_run_results[0].adapter_response["rows_affected"] == 1
self.__check_correct_operations("append", rows_affected=2)
self.__check_correct_operations("merge", rows_affected=1)
self.__check_correct_operations("delete_insert", rows_affected=1)

0 comments on commit 0204690

Please sign in to comment.