From 86fac6039d8d0f001f3916c430be1d42f446f659 Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Mon, 31 Jul 2023 23:52:40 +0000 Subject: [PATCH] Add support for glob patterns in source stream names --- singer_sdk/mapper.py | 82 +++++++++++++++++++++------------------ tests/core/test_mapper.py | 80 +++++++++++++++++++++++++++++++++++++- 2 files changed, 123 insertions(+), 39 deletions(-) diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index f9bf3017bc..e608d8175c 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -11,6 +11,7 @@ import hashlib import logging import typing as t +import fnmatch from singer_sdk.exceptions import MapExpressionError, StreamMapConfigError from singer_sdk.helpers import _simpleeval as simpleeval @@ -691,59 +692,64 @@ def register_raw_stream_schema( # noqa: PLR0912, C901 if isinstance(stream_map_val, dict) else stream_map_val ) - stream_alias: str = stream_map_key source_stream: str = stream_map_key - if isinstance(stream_def, str) and stream_def != NULL_STRING: - if stream_name == stream_map_key: - # TODO: Add any expected cases for str expressions (currently none) - pass - - msg = f"Option '{stream_map_key}:{stream_def}' is not expected." - raise StreamMapConfigError(msg) + stream_alias: str = stream_map_key - if stream_def is None or stream_def == NULL_STRING: - if stream_name != stream_map_key: - continue + is_source_stream_primary = True + if isinstance(stream_def, dict): + if MAPPER_SOURCE_OPTION in stream_def: + # : __source__: + source_stream = stream_def.pop(MAPPER_SOURCE_OPTION) + is_source_stream_primary = False + elif MAPPER_ALIAS_OPTION in stream_def: + # : __alias__: + stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION) + + if stream_name == source_stream: + # Exact match + pass + elif fnmatch.fnmatch(stream_name, source_stream): + # Wildcard match + if stream_alias == source_stream: + stream_alias = stream_name + source_stream = stream_name + else: + continue - self.stream_maps[stream_map_key][0] = RemoveRecordTransform( - stream_alias=stream_map_key, + if isinstance(stream_def, dict): + mapper = CustomStreamMap( + stream_alias=stream_alias, + map_transform=stream_def, + map_config=self.map_config, + raw_schema=schema, + key_properties=key_properties, + flattening_options=self.flattening_options, + ) + elif stream_def is None or (isinstance(stream_def, str) and stream_def == NULL_STRING): + mapper = RemoveRecordTransform( + stream_alias=stream_alias, raw_schema=schema, key_properties=None, flattening_options=self.flattening_options, ) - logging.info("Set null tansform as default for '%s'", stream_name) - continue + logging.info(f"Set null transform as default for '{stream_name}'") + + elif isinstance(stream_def, str): + # Non-NULL string values are not currently supported + msg = f"Option '{stream_map_key}:{stream_def}' is not expected." + raise StreamMapConfigError(msg) - if not isinstance(stream_def, dict): + else: msg = ( f"Unexpected stream definition type. Expected str, dict, or None. " f"Got '{type(stream_def).__name__}'." ) raise StreamMapConfigError(msg) - if MAPPER_SOURCE_OPTION in stream_def: - source_stream = stream_def.pop(MAPPER_SOURCE_OPTION) - - if source_stream != stream_name: - # Not a match - continue - - if MAPPER_ALIAS_OPTION in stream_def: - stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION) - - mapper = CustomStreamMap( - stream_alias=stream_alias, - map_transform=stream_def, - map_config=self.map_config, - raw_schema=schema, - key_properties=key_properties, - flattening_options=self.flattening_options, - ) - - if source_stream == stream_map_key: + if is_source_stream_primary: # Zero-th mapper should be the same-keyed mapper. # Override the default mapper with this custom map. - self.stream_maps[stream_name][0] = mapper + self.stream_maps[source_stream][0] = mapper else: # Additional mappers for aliasing and multi-projection: - self.stream_maps[stream_name].append(mapper) + self.stream_maps[source_stream].append(mapper) diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index 036d7586ae..32e300c23a 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -46,12 +46,13 @@ def sample_catalog_dict() -> dict: Property("name", StringType), Property("owner_email", StringType), Property("description", StringType), - Property("description", StringType), + Property("create_date", StringType), ).to_dict() foobars_schema = PropertiesList( Property("the", StringType), Property("brown", StringType), ).to_dict() + singular_schema = PropertiesList(Property("foo", StringType)).to_dict() return { "streams": [ { @@ -64,6 +65,11 @@ def sample_catalog_dict() -> dict: "tap_stream_id": "foobars", "schema": foobars_schema, }, + { + "stream": "singular", + "tap_stream_id": "singular", + "schema": singular_schema, + }, ], } @@ -106,6 +112,9 @@ def sample_stream(): {"the": "quick"}, {"brown": "fox"}, ], + "singular": [ + {"foo": "bar"}, + ], } @@ -181,6 +190,7 @@ def transformed_result(stream_map_config): {"the": "quick"}, {"brown": "fox"}, ], + "singular": [{"foo": "bar"}], # should be unchanged } @@ -200,6 +210,9 @@ def transformed_schemas(): Property("the", StringType), Property("brown", StringType), ).to_dict(), + "singular": PropertiesList( + Property("foo", StringType), + ).to_dict(), } @@ -231,6 +244,7 @@ def cloned_and_aliased_schemas(): Property("name", StringType), Property("owner_email", StringType), Property("description", StringType), + Property("create_date", StringType), ).to_dict() return { "repositories_aliased": properties, @@ -277,6 +291,51 @@ def filtered_schemas(): return {"repositories": PropertiesList(Property("name", StringType)).to_dict()} +# Wildcard + + +@pytest.fixture +def wildcard_stream_maps(): + return { + "*s": { + "db_name": "'database'", + }, + } + + +@pytest.fixture +def wildcard_result(sample_stream): + return { + "repositories": [ + {**record, "db_name": "database"} + for record in sample_stream["repositories"] + ], + "foobars": [ + {**record, "db_name": "database"} for record in sample_stream["foobars"] + ], + "singular": sample_stream["singular"], + } + + +@pytest.fixture +def wildcard_schemas(sample_catalog_dict): + return { + "repositories": PropertiesList( + Property("name", StringType), + Property("owner_email", StringType), + Property("description", StringType), + Property("create_date", StringType), + Property("db_name", StringType), + ).to_dict(), + "foobars": PropertiesList( + Property("the", StringType), + Property("brown", StringType), + Property("db_name", StringType), # added + ).to_dict(), + "singular": PropertiesList(Property("foo", StringType)).to_dict(), # unchanged + } + + def test_map_transforms( sample_stream, sample_catalog_obj, @@ -354,6 +413,25 @@ def test_filter_transforms_w_error( ) +def test_wildcard_transforms( + sample_stream, + sample_catalog_obj, + wildcard_stream_maps, + stream_map_config, + wildcard_result, + wildcard_schemas, +): + _test_transform( + "wildcard", + stream_maps=wildcard_stream_maps, + stream_map_config=stream_map_config, + expected_result=wildcard_result, + expected_schemas=wildcard_schemas, + sample_stream=sample_stream, + sample_catalog_obj=sample_catalog_obj, + ) + + def _test_transform( test_name: str, *,