diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 2970f3d7718..b4e24b4ff51 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -7,6 +7,7 @@ patch_microbatch_end_time, relation_from_name, run_dbt, + run_dbt_and_capture, write_file, ) @@ -69,6 +70,81 @@ select * from {{ source('seed_sources', 'raw_source') }} """ +custom_microbatch_strategy = """ +{% macro get_incremental_microbatch_sql(arg_dict) %} + {% do log('custom microbatch strategy', info=True) %} + + {%- set dest_cols_csv = get_quoted_csv(arg_dict["dest_columns"] | map(attribute="name")) -%} + + insert into {{ arg_dict["target_relation"] }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ arg_dict["temp_relation"] }} + ) + +{% endmacro %} +""" + + +class BaseMicrobatchCustomUserStrategy: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + @pytest.fixture(scope="class") + def macros(self): + return {"microbatch.sql": custom_microbatch_strategy} + + +class TestMicrobatchCustomUserStrategyDefault(BaseMicrobatchCustomUserStrategy): + def test_use_custom_microbatch_strategy_by_default(self, project): + with mock.patch.object( + type(project.adapter), "valid_incremental_strategies", lambda _: [] + ): + # Initial run + run_dbt(["run"]) + + # Incremental run uses custom strategy + _, logs = run_dbt_and_capture(["run"]) + assert "custom microbatch strategy" in logs + + +class TestMicrobatchCustomUserStrategyEnvVarTrueValid(BaseMicrobatchCustomUserStrategy): + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy( + self, project + ): + with mock.patch.object( + type(project.adapter), "valid_incremental_strategies", lambda _: ["microbatch"] + ): + # Initial run + run_dbt(["run"]) + + # Incremental run uses custom strategy + _, logs = run_dbt_and_capture(["run"]) + assert "custom microbatch strategy" in logs + + +# TODO: Consider a behaviour flag here if DBT_EXPERIMENTAL_MICROBATCH is removed +# Since this causes an exception prior to using an override +class TestMicrobatchCustomUserStrategyEnvVarTrueInvalid(BaseMicrobatchCustomUserStrategy): + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy( + self, project + ): + with mock.patch.object( + type(project.adapter), "valid_incremental_strategies", lambda _: [] + ): + # Initial run + run_dbt(["run"]) + + # Incremental run fails + _, logs = run_dbt_and_capture(["run"], expect_pass=False) + assert "'microbatch' is not valid" in logs + class TestMicrobatchCLI: @pytest.fixture(scope="class")