From fdf6ff75c2d2f6a04b085ec204a626343a3a05ad Mon Sep 17 00:00:00 2001 From: BernardWez Date: Thu, 23 Nov 2023 17:24:34 +0100 Subject: [PATCH 1/7] Add replication_keys to Tap config --- elx/tap.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/elx/tap.py b/elx/tap.py index da352fc..af4bc11 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -17,9 +17,11 @@ def __init__( executable: str | None = None, config: dict = {}, deselected: List[str] = None, + replication_keys: dict = {}, ): super().__init__(spec, executable, config) self.deselected = deselected + self.replication_keys = replication_keys def discover(self, config_path: Path) -> dict: """ @@ -45,7 +47,11 @@ def catalog(self) -> Catalog: with json_temp_file(self.config) as config_path: catalog = self.discover(config_path) catalog = Catalog(**catalog) - return catalog.deselect(patterns=self.deselected) + catalog = catalog.deselect(patterns=self.deselected) + catalog = catalog.set_replication_keys( + replication_keys=self.replication_keys + ) + return catalog @contextlib.asynccontextmanager @require_install From 615dfa1a0715d0d446c867cd30a0f2edba7e36a8 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Thu, 23 Nov 2023 17:25:04 +0100 Subject: [PATCH 2/7] Add Catalog method for setting replication_keys --- elx/catalog.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/elx/catalog.py b/elx/catalog.py index 20109be..3706eb3 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -158,3 +158,45 @@ def select(self, streams: Optional[List[str]]) -> "Catalog": ) return catalog + + def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog": + """ + Set the replication key for streams and updates the catalog. + + Args: + keys (Optional[dict]): Dictionary stream replication_key value-pairs. + E.g. {"stream_one": "updated_at", "stream_two": "modified_at"} + + Returns: + Catalog: A new catalog with updated replication settings. + """ + # Make a copy of the existing catalog. + catalog = self.copy(deep=True) + + # Loop over the streams + for stream in catalog.streams: + # If the stream is specified in `replication_keys` dictionary + if stream.tap_stream_id in replication_keys: + # Set the replication method to INCREMENTAL + stream.replication_method = "INCREMENTAL" + + # Update the replication key value + stream.replication_key = replication_keys[stream.tap_stream_id] + + # Add the replication key to stream metadata + stream.upsert_metadata( + breadcrumb=[], + metadata={ + "valid-replication-keys": [ + replication_keys[stream.tap_stream_id] + ] + }, + ) + + # Set inclusion of replication property metadata to `automatic` + stream.upsert_metadata( + breadcrumb=["properties", replication_keys[stream.tap_stream_id]], + metadata={"inclusion": "automatic"}, + ) + + return catalog From e7668c0fc7e9ba28ee494f0e1af708faf69da46a Mon Sep 17 00:00:00 2001 From: BernardWez Date: Thu, 23 Nov 2023 17:25:15 +0100 Subject: [PATCH 3/7] Update tap fixture --- tests/fixtures/tap.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/fixtures/tap.py b/tests/fixtures/tap.py index b334d9b..b7981b0 100644 --- a/tests/fixtures/tap.py +++ b/tests/fixtures/tap.py @@ -12,4 +12,5 @@ def tap() -> Generator[Tap, None, None]: executable="tap-mock-fixture", spec="git+https://github.com/quantile-taps/tap-mock-fixture.git", config={}, + replication_keys={"users": "updated_at"}, ) From 5c41aa88095a12a81a96acbc2b09f9a90ba699a0 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Thu, 23 Nov 2023 17:25:44 +0100 Subject: [PATCH 4/7] Add test for setting replication key --- tests/test_elx/test_catalog.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index 90e3792..a0ef4d0 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -178,3 +178,11 @@ def test_catalog_valid_replication_keys(tap: Tap): # Checks that value of `valid-replication-keys` equals to the replication-key assert replication_keys == [DEFAULT_CATALOG["streams"][1]["replication_key"]] + + +def test_catalog_set_stream_replication_key(tap: Tap): + """If we define a replication key, the catalog should be updated.""" + catalog = tap.catalog + + assert catalog.streams[1].replication_method == "INCREMENTAL" + assert catalog.streams[1].replication_key == "updated_at" From f3c1fe511000393270184d22006d90db73ff6861 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Fri, 24 Nov 2023 09:50:38 +0100 Subject: [PATCH 5/7] Small refactor `set_replication_keys` method --- elx/catalog.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/elx/catalog.py b/elx/catalog.py index 3706eb3..b1be63b 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -180,22 +180,21 @@ def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog": # Set the replication method to INCREMENTAL stream.replication_method = "INCREMENTAL" + # The replication key for current stream + replication_key = replication_keys[stream.tap_stream_id] + # Update the replication key value - stream.replication_key = replication_keys[stream.tap_stream_id] + stream.replication_key = replication_key # Add the replication key to stream metadata stream.upsert_metadata( breadcrumb=[], - metadata={ - "valid-replication-keys": [ - replication_keys[stream.tap_stream_id] - ] - }, + metadata={"valid-replication-keys": [replication_key]}, ) # Set inclusion of replication property metadata to `automatic` stream.upsert_metadata( - breadcrumb=["properties", replication_keys[stream.tap_stream_id]], + breadcrumb=["properties", replication_key], metadata={"inclusion": "automatic"}, ) From bd524ef92d438b5e2f716aadd1c2297ce94190a1 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Fri, 24 Nov 2023 09:53:04 +0100 Subject: [PATCH 6/7] Remove metadata upsert for valid-replication-keys --- elx/catalog.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/elx/catalog.py b/elx/catalog.py index b1be63b..44b488c 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -186,12 +186,6 @@ def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog": # Update the replication key value stream.replication_key = replication_key - # Add the replication key to stream metadata - stream.upsert_metadata( - breadcrumb=[], - metadata={"valid-replication-keys": [replication_key]}, - ) - # Set inclusion of replication property metadata to `automatic` stream.upsert_metadata( breadcrumb=["properties", replication_key], From 02b149705655438c00bb85fa9e2d7ed534479a65 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Fri, 24 Nov 2023 10:01:34 +0100 Subject: [PATCH 7/7] Update & remove test relating to valid-replication-keys --- tests/test_elx/test_catalog.py | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index a0ef4d0..2db0d64 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -78,7 +78,6 @@ "selected": True, "selected-by-default": True, "table-key-properties": ["id"], - "valid-replication-keys": ["updated_at"], }, }, ], @@ -154,32 +153,6 @@ def test_catalog_replication_key(tap: Tap): ) -def test_catalog_valid_replication_keys(tap: Tap): - """ - If we have an incremental stream, the catalog should have a metadata breadcrumb for the incremental - stream containing the key: `valid-replication-keys`. - - This key should be associated with a list containing the fields that could be used as replication keys. - For example, the metadata breadcrumb of the stream should look as follows if `updated_at` is its replication_key. - - "metadata": { - "inclusion": "available", - "selected": True, - "selected-by-default": True, - "table-key-properties": ["id"], - "valid-replication-keys": ["updated_at"], - } - """ - catalog_dict = tap.catalog.dict(by_alias=True) - - replication_keys = catalog_dict["streams"][1]["metadata"][-1]["metadata"].get( - "valid-replication-keys", None - ) - - # Checks that value of `valid-replication-keys` equals to the replication-key - assert replication_keys == [DEFAULT_CATALOG["streams"][1]["replication_key"]] - - def test_catalog_set_stream_replication_key(tap: Tap): """If we define a replication key, the catalog should be updated.""" catalog = tap.catalog