From 626dd80a4188d4f356c68404b2db8e93c1183845 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Mon, 29 Jan 2024 15:47:43 +0100 Subject: [PATCH 01/11] Add test for adding custom property to catalog --- tests/test_elx/test_catalog.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index 2db0d64..5c2bf17 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -159,3 +159,23 @@ def test_catalog_set_stream_replication_key(tap: Tap): assert catalog.streams[1].replication_method == "INCREMENTAL" assert catalog.streams[1].replication_key == "updated_at" + + +def test_catalog_add_custom_property(tap: Tap): + """If we add a custom property, the catalog should be updated.""" + schema = { + "users": { + "custom_property": { + "type": "string", + }, + } + } + + # Add a custom property to the tap schema + tap.schema = schema + + # Verify that the custom property is in the catalog + assert ( + tap.catalog.streams[1].find_metadata_by_breadcrumb(["properties", "users"]) + != None + ) From e60d9cad9172fe1b2b8d4c5f6d9da5cd2029039d Mon Sep 17 00:00:00 2001 From: BernardWez Date: Mon, 29 Jan 2024 16:29:53 +0100 Subject: [PATCH 02/11] Add method to add custom properties to stream schema and metadata --- elx/catalog.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/elx/catalog.py b/elx/catalog.py index 44b488c..b7112b6 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -193,3 +193,30 @@ def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog": ) return catalog + + def add_custom_properties(self, properties: Optional[dict]) -> "Catalog": + """ + Adds custom properties to stream schema and metadata. + """ + # Make a copy of the existing catalog. + catalog = self.copy(deep=True) + + # Loop over the streams + for stream in catalog.streams: + # If the stream is specified in `properties` dictionary + if stream.tap_stream_id in properties: + # Get the custom properties for the current stream + custom_properties = properties[stream.tap_stream_id] + + # Loop over each custom property + for property_name, property_typing in custom_properties.items(): + # Add property to the stream schema with + stream.stream_schema["properties"][property_name] = property_typing + + # Add property to metadata and mark as selected + stream.upsert_metadata( + breadcrumb=["properties", property_name], + metadata={"selected": True}, + ) + + return catalog From cae52c16e2a0d72efe922518ba45065c6364bea4 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Mon, 29 Jan 2024 16:30:20 +0100 Subject: [PATCH 03/11] Add custom properties to catalog in Tap class --- elx/tap.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/elx/tap.py b/elx/tap.py index af4bc11..2c09514 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -18,10 +18,12 @@ def __init__( config: dict = {}, deselected: List[str] = None, replication_keys: dict = {}, + custom_properties: dict = {}, ): super().__init__(spec, executable, config) self.deselected = deselected self.replication_keys = replication_keys + self.custom_properties = custom_properties def discover(self, config_path: Path) -> dict: """ @@ -51,6 +53,7 @@ def catalog(self) -> Catalog: catalog = catalog.set_replication_keys( replication_keys=self.replication_keys ) + catalog = catalog.add_custom_properties(properties=self.custom_properties) return catalog @contextlib.asynccontextmanager From 9b5f25fbf90eee70629ed5dcb5246163d12762e8 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Mon, 29 Jan 2024 16:31:09 +0100 Subject: [PATCH 04/11] Update test for adding custom properties to catalog --- tests/test_elx/test_catalog.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index 5c2bf17..3b55471 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -163,7 +163,7 @@ def test_catalog_set_stream_replication_key(tap: Tap): def test_catalog_add_custom_property(tap: Tap): """If we add a custom property, the catalog should be updated.""" - schema = { + custom_properties = { "users": { "custom_property": { "type": "string", @@ -172,10 +172,15 @@ def test_catalog_add_custom_property(tap: Tap): } # Add a custom property to the tap schema - tap.schema = schema + tap.custom_properties = custom_properties - # Verify that the custom property is in the catalog + # Verify that the custom property is in the metadata of the catalog assert ( - tap.catalog.streams[1].find_metadata_by_breadcrumb(["properties", "users"]) + tap.catalog.streams[1].find_metadata_by_breadcrumb( + ["properties", "custom_property"] + ) != None ) + + # Verify that the custom property is in the schema of the catalog + assert "custom_property" in tap.catalog.streams[1].stream_schema["properties"] From 004a1e98bf08e28b72a8a28f63bbe5dec44271e6 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Tue, 30 Jan 2024 12:23:52 +0100 Subject: [PATCH 05/11] Add debug print statements to Tap class --- elx/tap.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/elx/tap.py b/elx/tap.py index 2c09514..7ce32c0 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -104,6 +104,13 @@ def invoke( catalog = self.catalog.select(streams=streams) + for stream in catalog.streams: + print(stream.name) + print("metadata") + print(stream.metadata) + print("schema") + print(stream.stream_schema) + with json_temp_file(self.config) as config_path: with json_temp_file(catalog.dict(by_alias=True)) as catalog_path: with json_temp_file({}) as state_path: From 0732e57550a4aa534d67be397b50e47faa727983 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Tue, 30 Jan 2024 12:46:34 +0100 Subject: [PATCH 06/11] Refactor method names and variable names in catalog.py and tap.py --- elx/catalog.py | 2 +- elx/tap.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/elx/catalog.py b/elx/catalog.py index b7112b6..908aab7 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -194,7 +194,7 @@ def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog": return catalog - def add_custom_properties(self, properties: Optional[dict]) -> "Catalog": + def add_properties_to_schema(self, properties: Optional[dict]) -> "Catalog": """ Adds custom properties to stream schema and metadata. """ diff --git a/elx/tap.py b/elx/tap.py index 7ce32c0..75990a9 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -18,12 +18,12 @@ def __init__( config: dict = {}, deselected: List[str] = None, replication_keys: dict = {}, - custom_properties: dict = {}, + schema: dict = {}, ): super().__init__(spec, executable, config) self.deselected = deselected self.replication_keys = replication_keys - self.custom_properties = custom_properties + self.schema = schema def discover(self, config_path: Path) -> dict: """ @@ -53,7 +53,7 @@ def catalog(self) -> Catalog: catalog = catalog.set_replication_keys( replication_keys=self.replication_keys ) - catalog = catalog.add_custom_properties(properties=self.custom_properties) + catalog = catalog.add_properties_to_schema(properties=self.schema) return catalog @contextlib.asynccontextmanager From 16f5a9e0347bd1cc25231c57e8a555f98689e781 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Tue, 30 Jan 2024 13:03:15 +0100 Subject: [PATCH 07/11] Add logging --- elx/tap.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/elx/tap.py b/elx/tap.py index 75990a9..91572db 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -106,11 +106,14 @@ def invoke( for stream in catalog.streams: print(stream.name) + print() print("metadata") print(stream.metadata) print("schema") print(stream.stream_schema) + print(catalog.dict(by_alias=True)) + with json_temp_file(self.config) as config_path: with json_temp_file(catalog.dict(by_alias=True)) as catalog_path: with json_temp_file({}) as state_path: From 5c805294dbec0cbc4a3aad6544b81cbb8341f9a3 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Tue, 13 Feb 2024 15:37:59 +0100 Subject: [PATCH 08/11] Pytest tap fixture to function scope --- tests/fixtures/tap.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fixtures/tap.py b/tests/fixtures/tap.py index b7981b0..66b906c 100644 --- a/tests/fixtures/tap.py +++ b/tests/fixtures/tap.py @@ -3,7 +3,7 @@ from elx import Tap -@pytest.fixture(scope="session") +@pytest.fixture() def tap() -> Generator[Tap, None, None]: """ Return a Tap instance for the executable with an incremental stream. From 005c0ba899eab93da29f8a7c03bf5eb01d5f8088 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Tue, 13 Feb 2024 15:38:35 +0100 Subject: [PATCH 09/11] Add catalog tests for adding custom properties to schema --- tests/test_elx/test_catalog.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index 3b55471..9682635 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -172,7 +172,7 @@ def test_catalog_add_custom_property(tap: Tap): } # Add a custom property to the tap schema - tap.custom_properties = custom_properties + tap.schema = custom_properties # Verify that the custom property is in the metadata of the catalog assert ( @@ -184,3 +184,31 @@ def test_catalog_add_custom_property(tap: Tap): # Verify that the custom property is in the schema of the catalog assert "custom_property" in tap.catalog.streams[1].stream_schema["properties"] + + +def test_catalog_add_nested_custom_property(tap: Tap): + """If we add a custom property, the catalog should be updated.""" + custom_properties = { + "users": { + "items": { + "properties": { + "custom_property": { + "type": "string", + }, + }, + "type": "object", + } + } + } + + # Add a custom property to the tap schema + tap.schema = custom_properties + + # Verify that the custom property is in the metadata of the catalog + assert ( + tap.catalog.streams[1].find_metadata_by_breadcrumb(["properties", "items"]) + != None + ) + + # Verify that the custom property is in the schema of the catalog + assert "items" in tap.catalog.streams[1].stream_schema["properties"] From 8d364ba8ede6f5c490118c5541cff6a68a7b7b36 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Tue, 13 Feb 2024 15:48:10 +0100 Subject: [PATCH 10/11] Remove print statements for debugging --- elx/tap.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/elx/tap.py b/elx/tap.py index 323ad5b..b74ebb9 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -105,16 +105,6 @@ def invoke( catalog = self.catalog.select(streams=streams) - for stream in catalog.streams: - print(stream.name) - print() - print("metadata") - print(stream.metadata) - print("schema") - print(stream.stream_schema) - - print(catalog.dict(by_alias=True)) - with json_temp_file(self.config) as config_path: with json_temp_file(catalog.dict(by_alias=True)) as catalog_path: with json_temp_file({}) as state_path: From bb514b3dbc72a2cd6b91a57f9e0b175d90ddf7d5 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Tue, 13 Feb 2024 16:00:25 +0100 Subject: [PATCH 11/11] Refactor `add_properties_to_schema` method --- elx/catalog.py | 46 ++++++++++++++++++++++++++++------------------ elx/tap.py | 2 +- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/elx/catalog.py b/elx/catalog.py index 908aab7..67fb1db 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -1,5 +1,6 @@ from typing import List, Optional, Tuple from pydantic import BaseModel, Field +import logging class Stream(BaseModel): @@ -194,29 +195,38 @@ def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog": return catalog - def add_properties_to_schema(self, properties: Optional[dict]) -> "Catalog": + def add_properties_to_schema(self, custom_schema: Optional[dict]) -> "Catalog": """ Adds custom properties to stream schema and metadata. """ # Make a copy of the existing catalog. catalog = self.copy(deep=True) - # Loop over the streams - for stream in catalog.streams: - # If the stream is specified in `properties` dictionary - if stream.tap_stream_id in properties: - # Get the custom properties for the current stream - custom_properties = properties[stream.tap_stream_id] - - # Loop over each custom property - for property_name, property_typing in custom_properties.items(): - # Add property to the stream schema with - stream.stream_schema["properties"][property_name] = property_typing - - # Add property to metadata and mark as selected - stream.upsert_metadata( - breadcrumb=["properties", property_name], - metadata={"selected": True}, - ) + # Loop over the streams referenced in the `custom_schema` + for stream_name in custom_schema.keys(): + # Find the stream + stream = catalog.find_stream(stream_name) + + # If the stream is not found, skip it + if stream is None: + # Log warning about an invalid stream name in the schema configuration + logging.warning( + f"Found stream `{stream_name}` in the `schema` definition that does not exist in the catalog." + ) + continue + + # Get the custom properties for the current stream + custom_properties = custom_schema[stream.tap_stream_id] + + # Loop over each custom property + for property_name, property_typing in custom_properties.items(): + # Add property to the stream schema with + stream.stream_schema["properties"][property_name] = property_typing + + # Add property to metadata and mark as selected + stream.upsert_metadata( + breadcrumb=["properties", property_name], + metadata={"selected": True}, + ) return catalog diff --git a/elx/tap.py b/elx/tap.py index b74ebb9..050cf34 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -53,7 +53,7 @@ def catalog(self) -> Catalog: catalog = catalog.set_replication_keys( replication_keys=self.replication_keys ) - catalog = catalog.add_properties_to_schema(properties=self.schema) + catalog = catalog.add_properties_to_schema(custom_schema=self.schema) return catalog @contextlib.asynccontextmanager