forked from dbt-labs/dbt-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_defer_state.py
135 lines (110 loc) · 4.4 KB
/
test_defer_state.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from test.integration.base import DBTIntegrationTest, use_profile
import copy
import json
import os
import shutil
import pytest
class TestDeferState(DBTIntegrationTest):
    """Integration tests for ``dbt run`` with ``--defer`` and ``--state``.

    The tests build the project against the default target, copy the
    resulting ``target/manifest.json`` into a ``state`` directory, and then
    run against an extra ``otherschema`` target whose schema is
    ``self.other_schema``.
    """

    @property
    def schema(self):
        """Return the schema name used by this test case."""
        return "defer_state_062"

    @property
    def models(self):
        """Return ``"models"``.

        NOTE(review): presumably the model source directory consumed by the
        base class -- confirm against ``DBTIntegrationTest``.
        """
        return "models"

    def setUp(self):
        """Reset ``other_schema``, run the base setup, then record the schema."""
        # Cleared first so get_profile() computes a fresh name for this test.
        self.other_schema = None
        super().setUp()
        # NOTE(review): presumably the base class uses _created_schemas for
        # teardown, and get_profile() has populated other_schema by now --
        # confirm against DBTIntegrationTest.
        self._created_schemas.add(self.other_schema)

    @property
    def project_config(self):
        """Return the project config: config-version 2 with quoted seed columns."""
        return {
            'config-version': 2,
            'seeds': {
                'test': {
                    'quote_columns': True,
                }
            }
        }

    def get_profile(self, adapter_type):
        """Return the base profile extended with an ``otherschema`` output.

        The extra output is a deep copy of the default target's output with
        its ``schema`` replaced by ``self.other_schema``. The other schema
        name is computed on first use as ``unique_schema() + '_other'`` and
        upper-cased when the adapter is snowflake.
        """
        if self.other_schema is None:
            self.other_schema = self.unique_schema() + '_other'
            if self.adapter_type == 'snowflake':
                self.other_schema = self.other_schema.upper()
        profile = super().get_profile(adapter_type)
        default_name = profile['test']['target']
        # Deep copy so that editing the new output never mutates the default.
        profile['test']['outputs']['otherschema'] = copy.deepcopy(profile['test']['outputs'][default_name])
        profile['test']['outputs']['otherschema']['schema'] = self.other_schema
        return profile

    def copy_state(self):
        """Copy ``target/manifest.json`` into a newly created ``state`` directory.

        Asserts that ``state`` does not exist yet, so stale state from an
        earlier step cannot be reused silently.
        """
        assert not os.path.exists('state')
        os.makedirs('state')
        shutil.copyfile('target/manifest.json', 'state/manifest.json')

    def run_and_defer(self):
        """Seed and run in the default target, then run deferred in ``otherschema``.

        Checks that a plain run against ``otherschema`` fails, and that a
        deferred run of ``view_model`` compiles SQL against the original
        schema and marks the seed node as deferred in the written manifest.
        """
        results = self.run_dbt(['seed'])
        assert len(results) == 1
        assert not any(r.node.deferred for r in results)
        results = self.run_dbt(['run'])
        assert len(results) == 2
        assert not any(r.node.deferred for r in results)

        # copy files over from the happy times when we had a good target
        self.copy_state()

        # no state, still fails
        self.run_dbt(['run', '--target', 'otherschema'], expect_pass=False)

        # with state it should work though
        results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'])
        # Check the count before indexing so an empty result list fails with
        # a clear assertion instead of an IndexError.
        assert len(results) == 1
        assert self.other_schema not in results[0].node.compiled_sql
        assert self.unique_schema() in results[0].node.compiled_sql

        with open('target/manifest.json') as fp:
            data = json.load(fp)
        assert data['nodes']['seed.test.seed']['deferred']

    def run_switchdirs_defer(self):
        """Build good state, then run deferred against changed model directories."""
        results = self.run_dbt(['seed'])
        assert len(results) == 1
        results = self.run_dbt(['run'])
        assert len(results) == 2

        # copy files over from the happy times when we had a good target
        self.copy_state()

        self.use_default_project({'source-paths': ['changed_models']})
        # the sql here is just wrong, so it should fail
        self.run_dbt(
            ['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'],
            expect_pass=False,
        )
        # but this should work since we just use the old happy model
        self.run_dbt(
            ['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'],
            expect_pass=True,
        )

        self.use_default_project({'source-paths': ['changed_models_bad']})
        # this should fail because the table model refs a broken ephemeral
        # model, which it should see
        self.run_dbt(
            ['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'],
            expect_pass=False,
        )

    @use_profile('postgres')
    def test_postgres_state_changetarget(self):
        """Run the defer scenario on postgres and check ``--defer`` is rejected elsewhere."""
        self.run_and_defer()
        # these should work without --defer!
        self.run_dbt(['test'])
        self.run_dbt(['snapshot'])
        # make sure these commands don't work with --defer
        for command in ('seed', 'test', 'snapshot'):
            with pytest.raises(SystemExit):
                self.run_dbt([command, '--defer'])

    @use_profile('postgres')
    def test_postgres_stat_changedir(self):
        """Run the changed-directory defer scenario on postgres."""
        self.run_switchdirs_defer()

    @use_profile('snowflake')
    def test_snowflake_state_changetarget(self):
        """Run the defer scenario on snowflake."""
        self.run_and_defer()

    @use_profile('redshift')
    def test_redshift_state_changetarget(self):
        """Run the defer scenario on redshift."""
        self.run_and_defer()

    @use_profile('bigquery')
    def test_bigquery_state_changetarget(self):
        """Run the defer scenario on bigquery."""
        self.run_and_defer()