diff --git a/elx/catalog.py b/elx/catalog.py
index 20109be..44b488c 100644
--- a/elx/catalog.py
+++ b/elx/catalog.py
@@ -158,3 +158,43 @@ def select(self, streams: Optional[List[str]]) -> "Catalog":
             )
 
         return catalog
+
+    def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog":
+        """
+        Set the replication key for the given streams and return an updated catalog.
+
+        Args:
+            replication_keys (Optional[dict]): Replication key per `tap_stream_id`.
+                E.g. {"stream_one": "updated_at", "stream_two": "modified_at"}
+                If `None` or empty, no stream is changed.
+
+        Returns:
+            Catalog: A new catalog with updated replication settings.
+        """
+        # Make a copy of the existing catalog.
+        catalog = self.copy(deep=True)
+
+        # Nothing to update when no replication keys are given.
+        if not replication_keys:
+            return catalog
+
+        # Loop over the streams
+        for stream in catalog.streams:
+            # If the stream is specified in `replication_keys` dictionary
+            if stream.tap_stream_id in replication_keys:
+                # Set the replication method to INCREMENTAL
+                stream.replication_method = "INCREMENTAL"
+
+                # The replication key for current stream
+                replication_key = replication_keys[stream.tap_stream_id]
+
+                # Update the replication key value
+                stream.replication_key = replication_key
+
+                # Set inclusion of replication property metadata to `automatic`
+                stream.upsert_metadata(
+                    breadcrumb=["properties", replication_key],
+                    metadata={"inclusion": "automatic"},
+                )
+
+        return catalog
diff --git a/elx/tap.py b/elx/tap.py
index da352fc..af4bc11 100644
--- a/elx/tap.py
+++ b/elx/tap.py
@@ -17,9 +17,11 @@ def __init__(
         executable: str | None = None,
         config: dict = {},
         deselected: List[str] = None,
+        replication_keys: dict | None = None,
     ):
         super().__init__(spec, executable, config)
         self.deselected = deselected
+        self.replication_keys = {} if replication_keys is None else replication_keys
 
     def discover(self, config_path: Path) -> dict:
         """
@@ -45,7 +47,11 @@ def catalog(self) -> Catalog:
         with json_temp_file(self.config) as config_path:
             catalog = self.discover(config_path)
             catalog = Catalog(**catalog)
-            return catalog.deselect(patterns=self.deselected)
+            catalog = catalog.deselect(patterns=self.deselected)
+            catalog = catalog.set_replication_keys(
+                replication_keys=self.replication_keys
+            )
+            return catalog
 
     @contextlib.asynccontextmanager
     @require_install
diff --git a/tests/fixtures/tap.py b/tests/fixtures/tap.py
index b334d9b..b7981b0 100644
--- a/tests/fixtures/tap.py
+++ b/tests/fixtures/tap.py
@@ -12,4 +12,5 @@ def tap() -> Generator[Tap, None, None]:
         executable="tap-mock-fixture",
         spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
         config={},
+        replication_keys={"users": "updated_at"},
     )
diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py
index 90e3792..2db0d64 100644
--- a/tests/test_elx/test_catalog.py
+++ b/tests/test_elx/test_catalog.py
@@ -78,7 +78,6 @@
                     "selected": True,
                     "selected-by-default": True,
                     "table-key-properties": ["id"],
-                    "valid-replication-keys": ["updated_at"],
                 },
             },
         ],
@@ -154,27 +153,9 @@ def test_catalog_replication_key(tap: Tap):
     )
 
 
-def test_catalog_valid_replication_keys(tap: Tap):
-    """
-    If we have an incremental stream, the catalog should have a metadata breadcrumb for the incremental
-    stream containing the key: `valid-replication-keys`.
-
-    This key should be associated with a list containing the fields that could be used as replication keys.
-    For example, the metadata breadcrumb of the stream should look as follows if `updated_at` is its replication_key.
-
-    "metadata": {
-        "inclusion": "available",
-        "selected": True,
-        "selected-by-default": True,
-        "table-key-properties": ["id"],
-        "valid-replication-keys": ["updated_at"],
-    }
-    """
-    catalog_dict = tap.catalog.dict(by_alias=True)
-
-    replication_keys = catalog_dict["streams"][1]["metadata"][-1]["metadata"].get(
-        "valid-replication-keys", None
-    )
+def test_catalog_set_stream_replication_key(tap: Tap):
+    """If we define a replication key, the catalog should be updated."""
+    catalog = tap.catalog
 
-    # Checks that value of `valid-replication-keys` equals to the replication-key
-    assert replication_keys == [DEFAULT_CATALOG["streams"][1]["replication_key"]]
+    assert catalog.streams[1].replication_method == "INCREMENTAL"
+    assert catalog.streams[1].replication_key == "updated_at"