From 7eebef90191f94eea470122b800ca4ff0e762c3d Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Thu, 11 Apr 2024 10:35:05 -0600 Subject: [PATCH] feat(mappers): Added support for glob patterns in source stream names (#1888) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Don't rebuild stream_maps[stream_name] if stream is already known * Don't emit SCHEMA messages for skipped streams * Add support for glob patterns in source stream names * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix wildcard test * Fix types --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Edgar Ramírez-Mondragón Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> --- singer_sdk/mapper.py | 97 ++++++++++++++++++++++----------------- tests/core/test_mapper.py | 94 ++++++++++++++++++++++++++++++++++++- 2 files changed, 148 insertions(+), 43 deletions(-) diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index cd5faf93c..bc6224d7f 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -9,6 +9,7 @@ import ast import copy import datetime +import fnmatch import hashlib import importlib.util import logging @@ -714,11 +715,14 @@ def register_raw_stream_schema( # noqa: PLR0912, C901 if stream_name in self.stream_maps: primary_mapper = self.stream_maps[stream_name][0] if ( - primary_mapper.raw_schema != schema - or primary_mapper.raw_key_properties != key_properties + isinstance(primary_mapper, self.default_mapper_type) + and primary_mapper.raw_schema == schema + and primary_mapper.raw_key_properties == key_properties ): - # Unload/reset stream maps if schema or key properties have changed. - self.stream_maps.pop(stream_name) + return + + # Unload/reset stream maps if schema or key properties have changed. + self.stream_maps.pop(stream_name) if stream_name not in self.stream_maps: # The 0th mapper should be the same-named treatment. @@ -738,60 +742,69 @@ def register_raw_stream_schema( # noqa: PLR0912, C901 if isinstance(stream_map_val, dict) else stream_map_val ) - stream_alias: str = stream_map_key source_stream: str = stream_map_key - if isinstance(stream_def, str) and stream_def != NULL_STRING: - if stream_name == stream_map_key: - # TODO: Add any expected cases for str expressions (currently none) - pass + stream_alias: str = stream_map_key - msg = f"Option '{stream_map_key}:{stream_def}' is not expected." - raise StreamMapConfigError(msg) + is_source_stream_primary = True + if isinstance(stream_def, dict): + if MAPPER_SOURCE_OPTION in stream_def: + # : __source__: + source_stream = stream_def.pop(MAPPER_SOURCE_OPTION) + is_source_stream_primary = False + elif MAPPER_ALIAS_OPTION in stream_def: + # : __alias__: + stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION) + + if stream_name == source_stream: + # Exact match + pass + elif fnmatch.fnmatch(stream_name, source_stream): + # Wildcard match + if stream_alias == source_stream: + stream_alias = stream_name + source_stream = stream_name + else: + continue - if stream_def is None or stream_def == NULL_STRING: - if stream_name != stream_map_key: - continue + mapper: CustomStreamMap | RemoveRecordTransform - self.stream_maps[stream_map_key][0] = RemoveRecordTransform( - stream_alias=stream_map_key, + if isinstance(stream_def, dict): + mapper = CustomStreamMap( + stream_alias=stream_alias, + map_transform=stream_def, + map_config=self.map_config, + faker_config=self.faker_config, + raw_schema=schema, + key_properties=key_properties, + flattening_options=self.flattening_options, + ) + elif stream_def is None or ( + isinstance(stream_def, str) and stream_def == NULL_STRING + ): + mapper = RemoveRecordTransform( + stream_alias=stream_alias, raw_schema=schema, key_properties=None, flattening_options=self.flattening_options, ) - logging.info("Set null tansform as default for '%s'", stream_name) - continue + logging.info("Set null transform as default for '%s'", stream_name) + + elif isinstance(stream_def, str): + # Non-NULL string values are not currently supported + msg = f"Option '{stream_map_key}:{stream_def}' is not expected." + raise StreamMapConfigError(msg) - if not isinstance(stream_def, dict): + else: msg = ( f"Unexpected stream definition type. Expected str, dict, or None. " f"Got '{type(stream_def).__name__}'." ) raise StreamMapConfigError(msg) - if MAPPER_SOURCE_OPTION in stream_def: - source_stream = stream_def.pop(MAPPER_SOURCE_OPTION) - - if source_stream != stream_name: - # Not a match - continue - - if MAPPER_ALIAS_OPTION in stream_def: - stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION) - - mapper = CustomStreamMap( - stream_alias=stream_alias, - map_transform=stream_def, - map_config=self.map_config, - faker_config=self.faker_config, - raw_schema=schema, - key_properties=key_properties, - flattening_options=self.flattening_options, - ) - - if source_stream == stream_map_key: + if is_source_stream_primary: # Zero-th mapper should be the same-keyed mapper. # Override the default mapper with this custom map. - self.stream_maps[stream_name][0] = mapper + self.stream_maps[source_stream][0] = mapper else: # Additional mappers for aliasing and multi-projection: - self.stream_maps[stream_name].append(mapper) + self.stream_maps[source_stream].append(mapper) diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index 2f17684d6..058099f3a 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -22,6 +22,7 @@ from singer_sdk.typing import ( ArrayType, BooleanType, + CustomType, IntegerType, NumberType, ObjectType, @@ -51,12 +52,13 @@ def sample_catalog_dict() -> dict: Property("name", StringType), Property("owner_email", StringType), Property("description", StringType), - Property("description", StringType), + Property("create_date", StringType), ).to_dict() foobars_schema = PropertiesList( Property("the", StringType), Property("brown", StringType), ).to_dict() + singular_schema = PropertiesList(Property("foo", StringType)).to_dict() nested_jellybean_schema = PropertiesList( Property("id", IntegerType), Property( @@ -81,6 +83,11 @@ def sample_catalog_dict() -> dict: "tap_stream_id": "foobars", "schema": foobars_schema, }, + { + "stream": "singular", + "tap_stream_id": "singular", + "schema": singular_schema, + }, { "stream": "nested_jellybean", "tap_stream_id": "nested_jellybean", @@ -128,6 +135,9 @@ def sample_stream(): {"the": "quick"}, {"brown": "fox"}, ], + "singular": [ + {"foo": "bar"}, + ], "nested_jellybean": [ { "id": 123, @@ -240,6 +250,7 @@ def transformed_result(stream_map_config): {"the": "quick"}, {"brown": "fox"}, ], + "singular": [{"foo": "bar"}], # should be unchanged "nested_jellybean": [ { "id": 123, @@ -273,6 +284,9 @@ def transformed_schemas(): Property("the", StringType), Property("brown", StringType), ).to_dict(), + "singular": PropertiesList( + Property("foo", StringType), + ).to_dict(), "nested_jellybean": PropertiesList( Property("id", IntegerType), Property("custom_field_1", StringType), @@ -310,6 +324,7 @@ def cloned_and_aliased_schemas(): Property("name", StringType), Property("owner_email", StringType), Property("description", StringType), + Property("create_date", StringType), ).to_dict() return { "repositories_aliased": properties, @@ -356,6 +371,64 @@ def filtered_schemas(): return {"repositories": PropertiesList(Property("name", StringType)).to_dict()} +# Wildcard + + +@pytest.fixture +def wildcard_stream_maps(): + return { + "*s": { + "db_name": "'database'", + }, + } + + +@pytest.fixture +def wildcard_result(sample_stream): + return { + "repositories": [ + {**record, "db_name": "database"} + for record in sample_stream["repositories"] + ], + "foobars": [ + {**record, "db_name": "database"} for record in sample_stream["foobars"] + ], + "singular": sample_stream["singular"], + "nested_jellybean": sample_stream["nested_jellybean"], + } + + +@pytest.fixture +def wildcard_schemas(): + return { + "repositories": PropertiesList( + Property("name", StringType), + Property("owner_email", StringType), + Property("description", StringType), + Property("create_date", StringType), + Property("db_name", StringType), + ).to_dict(), + "foobars": PropertiesList( + Property("the", StringType), + Property("brown", StringType), + Property("db_name", StringType), # added + ).to_dict(), + "singular": PropertiesList(Property("foo", StringType)).to_dict(), # unchanged + "nested_jellybean": PropertiesList( # unchanged + Property("id", IntegerType), + Property( + "custom_fields", + ArrayType( + ObjectType( + Property("id", IntegerType), + Property("value", CustomType({})), + ), + ), + ), + ).to_dict(), + } + + def test_map_transforms( sample_stream, sample_catalog_obj, @@ -433,6 +506,25 @@ def test_filter_transforms_w_error( ) +def test_wildcard_transforms( + sample_stream, + sample_catalog_obj, + wildcard_stream_maps, + stream_map_config, + wildcard_result, + wildcard_schemas, +): + _test_transform( + "wildcard", + stream_maps=wildcard_stream_maps, + stream_map_config=stream_map_config, + expected_result=wildcard_result, + expected_schemas=wildcard_schemas, + sample_stream=sample_stream, + sample_catalog_obj=sample_catalog_obj, + ) + + def _run_transform( *, stream_maps,