Skip to content

Commit

Permalink
Merge pull request #25 from quantile-development/feature/replication-…
Browse files Browse the repository at this point in the history
…keys

Feature/replication keys
  • Loading branch information
BernardWez authored Nov 24, 2023
2 parents eaec5ac + 02b1497 commit 887da30
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 25 deletions.
35 changes: 35 additions & 0 deletions elx/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,38 @@ def select(self, streams: Optional[List[str]]) -> "Catalog":
)

return catalog

def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog":
"""
Set the replication key for streams and updates the catalog.
Args:
keys (Optional[dict]): Dictionary stream replication_key value-pairs.
E.g. {"stream_one": "updated_at", "stream_two": "modified_at"}
Returns:
Catalog: A new catalog with updated replication settings.
"""
# Make a copy of the existing catalog.
catalog = self.copy(deep=True)

# Loop over the streams
for stream in catalog.streams:
# If the stream is specified in `replication_keys` dictionary
if stream.tap_stream_id in replication_keys:
# Set the replication method to INCREMENTAL
stream.replication_method = "INCREMENTAL"

# The replication key for current stream
replication_key = replication_keys[stream.tap_stream_id]

# Update the replication key value
stream.replication_key = replication_key

# Set inclusion of replication property metadata to `automatic`
stream.upsert_metadata(
breadcrumb=["properties", replication_key],
metadata={"inclusion": "automatic"},
)

return catalog
8 changes: 7 additions & 1 deletion elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ def __init__(
executable: str | None = None,
config: dict = {},
deselected: List[str] = None,
replication_keys: dict = {},
):
super().__init__(spec, executable, config)
self.deselected = deselected
self.replication_keys = replication_keys

def discover(self, config_path: Path) -> dict:
"""
Expand All @@ -45,7 +47,11 @@ def catalog(self) -> Catalog:
with json_temp_file(self.config) as config_path:
catalog = self.discover(config_path)
catalog = Catalog(**catalog)
return catalog.deselect(patterns=self.deselected)
catalog = catalog.deselect(patterns=self.deselected)
catalog = catalog.set_replication_keys(
replication_keys=self.replication_keys
)
return catalog

@contextlib.asynccontextmanager
@require_install
Expand Down
1 change: 1 addition & 0 deletions tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ def tap() -> Generator[Tap, None, None]:
executable="tap-mock-fixture",
spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
config={},
replication_keys={"users": "updated_at"},
)
29 changes: 5 additions & 24 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
"selected": True,
"selected-by-default": True,
"table-key-properties": ["id"],
"valid-replication-keys": ["updated_at"],
},
},
],
Expand Down Expand Up @@ -154,27 +153,9 @@ def test_catalog_replication_key(tap: Tap):
)


def test_catalog_valid_replication_keys(tap: Tap):
"""
If we have an incremental stream, the catalog should have a metadata breadcrumb for the incremental
stream containing the key: `valid-replication-keys`.
This key should be associated with a list containing the fields that could be used as replication keys.
For example, the metadata breadcrumb of the stream should look as follows if `updated_at` is its replication_key.
"metadata": {
"inclusion": "available",
"selected": True,
"selected-by-default": True,
"table-key-properties": ["id"],
"valid-replication-keys": ["updated_at"],
}
"""
catalog_dict = tap.catalog.dict(by_alias=True)

replication_keys = catalog_dict["streams"][1]["metadata"][-1]["metadata"].get(
"valid-replication-keys", None
)
def test_catalog_set_stream_replication_key(tap: Tap):
"""If we define a replication key, the catalog should be updated."""
catalog = tap.catalog

# Checks that value of `valid-replication-keys` equals to the replication-key
assert replication_keys == [DEFAULT_CATALOG["streams"][1]["replication_key"]]
assert catalog.streams[1].replication_method == "INCREMENTAL"
assert catalog.streams[1].replication_key == "updated_at"

0 comments on commit 887da30

Please sign in to comment.