From 7f619e2cd90ee15aaae74081f5bd5e56a034a34d Mon Sep 17 00:00:00 2001 From: Emily Rockman Date: Tue, 29 Mar 2022 17:04:40 -0500 Subject: [PATCH 1/3] WIP --- core/dbt/context/context_config.py | 2 + core/dbt/contracts/graph/manifest.py | 2 +- core/dbt/contracts/graph/model_config.py | 44 ++++++++++++++++++- core/dbt/contracts/graph/parsed.py | 2 +- core/dbt/contracts/graph/unparsed.py | 9 ++-- core/dbt/parser/manifest.py | 3 ++ core/dbt/parser/schemas.py | 40 ++++++++++++++++- core/dbt/parser/sources.py | 13 +++++- .../functional/sources/test_source_configs.py | 41 +++++++++-------- 9 files changed, 123 insertions(+), 33 deletions(-) diff --git a/core/dbt/context/context_config.py b/core/dbt/context/context_config.py index a0aab160685..2a4e9f2f0e6 100644 --- a/core/dbt/context/context_config.py +++ b/core/dbt/context/context_config.py @@ -182,6 +182,7 @@ def initial_result(self, resource_type: NodeType, base: bool) -> C: def _update_from_config(self, result: C, partial: Dict[str, Any], validate: bool = False) -> C: translated = self._active_project.credentials.translate_aliases(partial) + # breakpoint() return result.update_from( translated, self._active_project.credentials.type, validate=validate ) @@ -203,6 +204,7 @@ def calculate_node_config_dict( base=base, patch_config_dict=patch_config_dict, ) + # breakpoint() finalized = config.finalize_and_validate() return finalized.to_dict(omit_none=True) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 5a8309d3319..2f703ea0a2c 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -453,7 +453,7 @@ class Disabled(Generic[D]): def _update_into(dest: MutableMapping[str, T], new_item: T): """Update dest to overwrite whatever is at dest[new_item.unique_id] with - new_itme. There must be an existing value to overwrite, and the two nodes + new_item. 
There must be an existing value to overwrite, and the two nodes must have the same original file path. """ unique_id = new_item.unique_id diff --git a/core/dbt/contracts/graph/model_config.py b/core/dbt/contracts/graph/model_config.py index 50074c71359..940f9645c33 100644 --- a/core/dbt/contracts/graph/model_config.py +++ b/core/dbt/contracts/graph/model_config.py @@ -7,7 +7,7 @@ ValidationError, register_pattern, ) -from dbt.contracts.graph.unparsed import AdditionalPropertiesAllowed +from dbt.contracts.graph.unparsed import AdditionalPropertiesAllowed, FreshnessThreshold, Quoting from dbt.exceptions import InternalException, CompilationException from dbt.contracts.util import Replaceable, list_str from dbt import hooks @@ -256,7 +256,7 @@ def same_contents(cls, unrendered: Dict[str, Any], other: Dict[str, Any]) -> boo # 'meta' moved here from node mergebehavior = { "append": ["pre-hook", "pre_hook", "post-hook", "post_hook", "tags"], - "update": ["quoting", "column_types", "meta"], + "update": ["quoting", "column_types", "meta", "freshness"], } @classmethod @@ -332,9 +332,49 @@ def replace(self, **kwargs): return self.from_dict(dct) +# TODO: determine what all the metadata should be? 
@dataclass class SourceConfig(BaseConfig): enabled: bool = True + # these fields are included in serialized output, but are not part of + # config comparison (they are part of database_representation) + # TODO: confirm these + loader: Optional[str] = field( + default=None, + metadata=CompareBehavior.Exclude.meta(), + ) + loaded_at_field: Optional[str] = field( + default=None, + metadata=CompareBehavior.Exclude.meta(), + ) + schema: Optional[str] = field( + default=None, + metadata=CompareBehavior.Exclude.meta(), + ) + database: Optional[str] = field( + default=None, + metadata=CompareBehavior.Exclude.meta(), + ) + identifer: Optional[str] = field( + default=None, + metadata=CompareBehavior.Exclude.meta(), + ) + tags: Union[List[str], str] = field( + default_factory=list_str, + metadata=metas(ShowBehavior.Hide, MergeBehavior.Append, CompareBehavior.Exclude), + ) + meta: Dict[str, Any] = field( # TODO: blank? + default_factory=dict, + metadata=MergeBehavior.Update.meta(), + ) + quoting: Quoting = field( # TODO: blank + default_factory=Quoting, + metadata=MergeBehavior.Update.meta(), + ) + freshness: FreshnessThreshold = field( # TODO: blank + default_factory=FreshnessThreshold, + metadata=MergeBehavior.Update.meta(), + ) @dataclass diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py index b3803cb9eda..a24b2f97c5c 100644 --- a/core/dbt/contracts/graph/parsed.py +++ b/core/dbt/contracts/graph/parsed.py @@ -688,7 +688,7 @@ def same_contents(self, old: Optional["ParsedSourceDefinition"]) -> bool: return ( self.same_database_representation(old) and self.same_fqn(old) - and self.same_config(old) + and self.same_config(old) # TODO: ct-201 what actually constitutes a change?
and self.same_quoting(old) and self.same_freshness(old) and self.same_external(old) diff --git a/core/dbt/contracts/graph/unparsed.py b/core/dbt/contracts/graph/unparsed.py index d1c0b4a7a5a..5c3074f41dc 100644 --- a/core/dbt/contracts/graph/unparsed.py +++ b/core/dbt/contracts/graph/unparsed.py @@ -241,7 +241,7 @@ class Quoting(dbtClassMixin, Mergeable): @dataclass -class UnparsedSourceTableDefinition(HasColumnTests, HasTests): +class UnparsedSourceTableDefinition(HasColumnTests, HasConfig, HasTests): loaded_at_field: Optional[str] = None identifier: Optional[str] = None quoting: Quoting = field(default_factory=Quoting) @@ -257,8 +257,8 @@ def __post_serialize__(self, dct): @dataclass -class UnparsedSourceDefinition(dbtClassMixin, Replaceable): - name: str +class UnparsedSourceDefinition(dbtClassMixin, Replaceable, HasConfig): + name: str = "" description: str = "" meta: Dict[str, Any] = field(default_factory=dict) database: Optional[str] = None @@ -269,7 +269,6 @@ class UnparsedSourceDefinition(dbtClassMixin, Replaceable): loaded_at_field: Optional[str] = None tables: List[UnparsedSourceTableDefinition] = field(default_factory=list) tags: List[str] = field(default_factory=list) - config: Dict[str, Any] = field(default_factory=dict) @property def yaml_key(self) -> "str": @@ -284,7 +283,7 @@ def __post_serialize__(self, dct): @dataclass class SourceTablePatch(dbtClassMixin): - name: str + name: str = "" description: Optional[str] = None meta: Optional[Dict[str, Any]] = None data_type: Optional[str] = None diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 24f3ce76700..e5d2ce80fa4 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -365,6 +365,7 @@ def load(self): self.parse_project( project, project_parser_files[project.project_name], parser_types ) + # breakpoint() # config as expected self._perf_info.parse_project_elapsed = time.perf_counter() - start_parse_projects @@ -373,6 +374,7 @@ def load(self): # in 
SourcePatcher start_patch = time.perf_counter() patcher = SourcePatcher(self.root_project, self.manifest) + # breakpoint() patcher.construct_sources() self.manifest.sources = patcher.sources self._perf_info.patch_sources_elapsed = time.perf_counter() - start_patch @@ -464,6 +466,7 @@ def parse_project( dct = block.file.pp_dict else: dct = block.file.dict_from_yaml + # breakpoint() parser.parse_file(block, dct=dct) else: parser.parse_file(block) diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index 18d2eb96b46..e1c7475bb72 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -670,6 +670,7 @@ def _target_from_dict(self, cls: Type[T], data: Dict[str, Any]) -> T: path = self.yaml.path.original_file_path try: cls.validate(data) + # breakpoint() return cls.from_dict(data) except (ValidationError, JSONValidationException) as exc: msg = error_context(path, self.key, data, exc) @@ -685,9 +686,25 @@ def parse(self) -> List[TestBlock]: for data in self.get_key_dicts(): data = self.project.credentials.translate_aliases(data, recurse=True) + # try: + # # target_type: UnparsedNodeUpdate, UnparsedAnalysisUpdate, + # # or UnparsedMacroUpdate + # self._target_type().validate(data) + # self.normalize_configs_properties(data, data["path"]) + # node = self._target_type().from_dict(data) + # except (ValidationError, JSONValidationException) as exc: + # msg = error_context(path, self.key, data, exc) + # raise ParsingException(msg) from exc + # else: + # yield node + + path = self.yaml.path.original_file_path + + self.normalize_configs_properties(data, path) is_override = "overrides" in data if is_override: - data["path"] = self.yaml.path.original_file_path + # breakpoint() + data["path"] = path patch = self._target_from_dict(SourcePatch, data) assert isinstance(self.yaml.file, SchemaSourceFile) source_file = self.yaml.file @@ -699,13 +716,32 @@ def parse(self) -> List[TestBlock]: source_file.source_patches.append(key) else: source = 
self._target_from_dict(UnparsedSourceDefinition, data) + # breakpoint() self.add_source_definitions(source) return [] + # We want to raise an error if configs and/or properties are in two places, and copy + # to/from toplevel to config if necessary + def normalize_configs_properties(self, data, path): + # TODO: ct201 - update this for all configs + if "meta" in data: + if "config" in data and "meta" in data["config"]: + raise ParsingException( + f""" + In {path}: found meta dictionary in 'config' dictionary and as top-level key. + Remove the top-level key and define it under 'config' dictionary only. + """.strip() + ) + else: + if "config" not in data: + data["config"] = {} + data["config"]["meta"] = data.pop("meta") + def add_source_definitions(self, source: UnparsedSourceDefinition) -> None: original_file_path = self.yaml.path.original_file_path fqn_path = self.yaml.path.relative_path for table in source.tables: + # breakpoint() unique_id = ".".join( [NodeType.Source, self.project.project_name, source.name, table.name] ) @@ -724,7 +760,9 @@ def add_source_definitions(self, source: UnparsedSourceDefinition) -> None: unique_id=unique_id, resource_type=NodeType.Source, fqn=fqn, + # config=config, ) + # breakpoint() self.manifest.add_source(self.yaml.file, source_def) diff --git a/core/dbt/parser/sources.py b/core/dbt/parser/sources.py index 44dcdb29b67..f3021b1daae 100644 --- a/core/dbt/parser/sources.py +++ b/core/dbt/parser/sources.py @@ -56,6 +56,7 @@ def __init__( # with SourcePatches to produce ParsedSourceDefinitions. 
def construct_sources(self) -> None: for unique_id, unpatched in self.manifest.sources.items(): + # breakpoint() schema_file = self.manifest.files[unpatched.file_id] if isinstance(unpatched, ParsedSourceDefinition): # In partial parsing, there will be ParsedSourceDefinitions @@ -81,6 +82,7 @@ def construct_sources(self) -> None: # Convert UnpatchedSourceDefinition to a ParsedSourceDefinition parsed = self.parse_source(patched) + # breakpoint() if parsed.config.enabled: self.sources[unique_id] = parsed else: @@ -93,6 +95,7 @@ def patch_source( unpatched: UnpatchedSourceDefinition, patch: Optional[SourcePatch], ) -> UnpatchedSourceDefinition: + # breakpoint() # This skips patching if no patch exists because of the # performance overhead of converting to and from dicts @@ -136,16 +139,20 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> ParsedSourceDefinit # make sure we don't do duplicate tags from source + table tags = sorted(set(itertools.chain(source.tags, table.tags))) + # breakpoint() config = self._generate_source_config( fqn=target.fqn, rendered=True, project_name=target.package_name, + config_call_dict=table.config, ) + # breakpoint() unrendered_config = self._generate_source_config( fqn=target.fqn, rendered=False, project_name=target.package_name, + config_call_dict=table.config, ) if not isinstance(config, SourceConfig): @@ -261,7 +268,9 @@ def parse_source_test( ) return node - def _generate_source_config(self, fqn: List[str], rendered: bool, project_name: str): + def _generate_source_config( + self, fqn: List[str], rendered: bool, project_name: str, config_call_dict: Dict[str, Any] + ): generator: BaseContextConfigGenerator if rendered: generator = ContextConfigGenerator(self.root_project) @@ -269,7 +278,7 @@ def _generate_source_config(self, fqn: List[str], rendered: bool, project_name: generator = UnrenderedConfigGenerator(self.root_project) return generator.calculate_node_config( - config_call_dict={}, + config_call_dict=config_call_dict, 
fqn=fqn, resource_type=NodeType.Source, project_name=project_name, diff --git a/tests/functional/sources/test_source_configs.py b/tests/functional/sources/test_source_configs.py index 1205dbfdf0a..bf14d89f144 100644 --- a/tests/functional/sources/test_source_configs.py +++ b/tests/functional/sources/test_source_configs.py @@ -12,18 +12,18 @@ def setUp(self): pytest.expected_config = SourceConfig( enabled=True, # TODO: uncomment all this once it's added to SourceConfig, throws error right now - # quoting = Quoting(database=False, schema=False, identifier=False, column=False) - # freshness = FreshnessThreshold( - # warn_after=Time(count=12, period=TimePeriod.hour), - # error_after=Time(count=24, period=TimePeriod.hour), - # filter=None - # ) - # loader = "a_loader" - # loaded_at_field = some_column - # database = custom_database - # schema = custom_schema - # meta = {'languages': ['python']} - # tags = ["important_tag"] + quoting=Quoting(database=False, schema=False, identifier=False, column=False), + freshness=FreshnessThreshold( + warn_after=Time(count=12, period=TimePeriod.hour), + error_after=Time(count=24, period=TimePeriod.hour), + filter=None, + ), + loader="a_loader", + loaded_at_field="some_column", + database="custom_database", + schema="custom_schema", + meta={"languages": ["python"]}, + tags=["important_tag"], ) @@ -126,7 +126,7 @@ def test_source_config_yaml_source_level(self, project): # Test enabled config at source table level in yaml file -# expect fail - not implemented +# expect pass - implemented class TestConfigYamlSourceTable(SourceConfigTests): @pytest.fixture(scope="class") def models(self): @@ -134,7 +134,6 @@ def models(self): "schema.yml": disabled_source_table__schema_yml, } - @pytest.mark.xfail def test_source_config_yaml_source_table(self, project): run_dbt(["parse"]) manifest = get_manifest(project.project_root) @@ -190,7 +189,7 @@ def test_source_all_configs_dbt_project(self, project): sources: - name: test_source config: - enabled: 
True + enabled: False quoting: database: False schema: False @@ -212,13 +211,13 @@ def test_source_all_configs_dbt_project(self, project): # Test configs other than enabled at sources level in yaml file -# **currently passes since enabled is all that ends up in the -# node.config since it's the only thing implemented +# expect fail - not implemented class TestAllConfigsSourceLevel(SourceConfigTests): @pytest.fixture(scope="class") def models(self): return {"schema.yml": configs_source_level__schema_yml} + @pytest.mark.xfail def test_source_all_configs_source_level(self, project): run_dbt(["parse"]) manifest = get_manifest(project.project_root) @@ -263,13 +262,13 @@ def test_source_all_configs_source_level(self, project): # Test configs other than enabled at source table level in yml file -# expect fail - not implemented +# only implemented feature so far! class TestSourceAllConfigsSourceTable(SourceConfigTests): @pytest.fixture(scope="class") def models(self): return {"schema.yml": configs_source_table__schema_yml} - @pytest.mark.xfail + # @pytest.mark.xfail def test_source_all_configs_source_table(self, project): run_dbt(["parse"]) manifest = get_manifest(project.project_root) @@ -524,13 +523,13 @@ def check_source_configs_and_properties(self, project): # Check backwards compatibility of setting configs as properties at top level -# expect pass since the properties don't get copied to the node.config yet so -# the values match since we haven't build the full SourceConfig yet +# expect fail - not implemented class TestPropertiesAsConfigs(SourceBackwardsCompatibility): @pytest.fixture(scope="class") def models(self): return {"schema.yml": properties_as_configs__schema_yml} + @pytest.mark.xfail def test_source_configs_as_properties(self, project): self.check_source_configs_and_properties(project) From 23fa2d622f6082ee9411a53f00bb5cd0c5440c54 Mon Sep 17 00:00:00 2001 From: Emily Rockman Date: Tue, 29 Mar 2022 17:06:14 -0500 Subject: [PATCH 2/3] remove excessive 
breakpoints --- core/dbt/parser/schemas.py | 19 +------------------ core/dbt/parser/sources.py | 5 ----- 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index e1c7475bb72..4ecdd52c7e3 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -670,7 +670,6 @@ def _target_from_dict(self, cls: Type[T], data: Dict[str, Any]) -> T: path = self.yaml.path.original_file_path try: cls.validate(data) - # breakpoint() return cls.from_dict(data) except (ValidationError, JSONValidationException) as exc: msg = error_context(path, self.key, data, exc) @@ -686,24 +685,11 @@ def parse(self) -> List[TestBlock]: for data in self.get_key_dicts(): data = self.project.credentials.translate_aliases(data, recurse=True) - # try: - # # target_type: UnparsedNodeUpdate, UnparsedAnalysisUpdate, - # # or UnparsedMacroUpdate - # self._target_type().validate(data) - # self.normalize_configs_properties(data, data["path"]) - # node = self._target_type().from_dict(data) - # except (ValidationError, JSONValidationException) as exc: - # msg = error_context(path, self.key, data, exc) - # raise ParsingException(msg) from exc - # else: - # yield node - path = self.yaml.path.original_file_path self.normalize_configs_properties(data, path) is_override = "overrides" in data if is_override: - # breakpoint() data["path"] = path patch = self._target_from_dict(SourcePatch, data) assert isinstance(self.yaml.file, SchemaSourceFile) @@ -716,7 +702,6 @@ def parse(self) -> List[TestBlock]: source_file.source_patches.append(key) else: source = self._target_from_dict(UnparsedSourceDefinition, data) - # breakpoint() self.add_source_definitions(source) return [] @@ -741,7 +726,6 @@ def add_source_definitions(self, source: UnparsedSourceDefinition) -> None: original_file_path = self.yaml.path.original_file_path fqn_path = self.yaml.path.relative_path for table in source.tables: - # breakpoint() unique_id = ".".join( [NodeType.Source, 
self.project.project_name, source.name, table.name] ) @@ -760,9 +744,8 @@ def add_source_definitions(self, source: UnparsedSourceDefinition) -> None: unique_id=unique_id, resource_type=NodeType.Source, fqn=fqn, - # config=config, + # config=config, #TODO ? ) - # breakpoint() self.manifest.add_source(self.yaml.file, source_def) diff --git a/core/dbt/parser/sources.py b/core/dbt/parser/sources.py index f3021b1daae..5066ac1fa05 100644 --- a/core/dbt/parser/sources.py +++ b/core/dbt/parser/sources.py @@ -56,7 +56,6 @@ def __init__( # with SourcePatches to produce ParsedSourceDefinitions. def construct_sources(self) -> None: for unique_id, unpatched in self.manifest.sources.items(): - # breakpoint() schema_file = self.manifest.files[unpatched.file_id] if isinstance(unpatched, ParsedSourceDefinition): # In partial parsing, there will be ParsedSourceDefinitions @@ -82,7 +81,6 @@ def construct_sources(self) -> None: # Convert UnpatchedSourceDefinition to a ParsedSourceDefinition parsed = self.parse_source(patched) - # breakpoint() if parsed.config.enabled: self.sources[unique_id] = parsed else: @@ -95,7 +93,6 @@ def patch_source( unpatched: UnpatchedSourceDefinition, patch: Optional[SourcePatch], ) -> UnpatchedSourceDefinition: - # breakpoint() # This skips patching if no patch exists because of the # performance overhead of converting to and from dicts @@ -139,14 +136,12 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> ParsedSourceDefinit # make sure we don't do duplicate tags from source + table tags = sorted(set(itertools.chain(source.tags, table.tags))) - # breakpoint() config = self._generate_source_config( fqn=target.fqn, rendered=True, project_name=target.package_name, config_call_dict=table.config, ) - # breakpoint() unrendered_config = self._generate_source_config( fqn=target.fqn, From 9a539901496ad4af409db9a41d98d12eb4627e4d Mon Sep 17 00:00:00 2001 From: Emily Rockman Date: Tue, 29 Mar 2022 17:16:17 -0500 Subject: [PATCH 3/3] missed breakpoints 
--- core/dbt/parser/manifest.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index e5d2ce80fa4..24f3ce76700 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -365,7 +365,6 @@ def load(self): self.parse_project( project, project_parser_files[project.project_name], parser_types ) - # breakpoint() # config as expected self._perf_info.parse_project_elapsed = time.perf_counter() - start_parse_projects @@ -374,7 +373,6 @@ def load(self): # in SourcePatcher start_patch = time.perf_counter() patcher = SourcePatcher(self.root_project, self.manifest) - # breakpoint() patcher.construct_sources() self.manifest.sources = patcher.sources self._perf_info.patch_sources_elapsed = time.perf_counter() - start_patch @@ -466,7 +464,6 @@ def parse_project( dct = block.file.pp_dict else: dct = block.file.dict_from_yaml - # breakpoint() parser.parse_file(block, dct=dct) else: parser.parse_file(block)