Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/catalog replication key #23

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions elx/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class Stream(BaseModel):
stream: str = Field(alias="tap_stream_id")
table_name: Optional[str] = None
replication_method: Optional[str] = "FULL_TABLE"
replication_key: Optional[str] = None
key_properties: List[str]
stream_schema: dict = Field(alias="schema")
is_view: Optional[bool] = False
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from fixtures.singer import singer
from fixtures.tap import tap
from fixtures.tap import tap, tap_incremental
from fixtures.target import target
from fixtures.runner import runner
from fixtures.state import state_manager
12 changes: 12 additions & 0 deletions tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,15 @@ def tap() -> Generator[Tap, None, None]:
],
},
)


@pytest.fixture(scope="session")
def tap_incremental() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the executable with an incremental stream.
"""
yield Tap(
executable="tap-mock-incremental",
spec="git+https://github.com/quantile-taps/tap-mock-incremental.git",
config={},
)
96 changes: 96 additions & 0 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
{
"tap_stream_id": "animals",
"replication_method": "FULL_TABLE",
"replication_key": None,
"table_name": None,
"is_view": False,
"key_properties": [],
Expand Down Expand Up @@ -56,6 +57,54 @@
]
}

INCREMENTAL_CATALOG = {
"streams": [
{
"tap_stream_id": "users",
"replication_method": "INCREMENTAL",
"replication_key": "updated_at",
"table_name": None,
"is_view": False,
"key_properties": ["id"],
"schema": {
"properties": {
"id": {"type": ["string", "null"]},
"name": {"type": ["string", "null"]},
"updated_at": {
"format": "date-time",
"type": ["string", "null"],
},
},
"type": "object",
},
"metadata": [
{
"breadcrumb": ["properties", "id"],
"metadata": {"inclusion": "automatic"},
},
{
"breadcrumb": ["properties", "name"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "updated_at"],
"metadata": {"inclusion": "automatic"},
},
{
"breadcrumb": [],
"metadata": {
"inclusion": "available",
"selected": True,
"selected-by-default": True,
"table-key-properties": ["id"],
"valid-replication-keys": ["updated_at"],
},
},
],
}
]
}


def test_catalog(tap: Tap):
"""
Expand Down Expand Up @@ -103,3 +152,50 @@ def test_catalog_deselect_property(tap: Tap):
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False


def test_catalog_replication_method(tap_incremental: Tap):
"""If we have an incremental stream, the replication_method in the catalog should be `INCREMENTAL`."""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)

replication_method = INCREMENTAL_CATALOG["streams"][0]["replication_method"]
assert replication_method == "INCREMENTAL"
assert catalog_dict["streams"][0]["replication_method"] == "INCREMENTAL"
BernardWez marked this conversation as resolved.
Show resolved Hide resolved


def test_catalog_replication_key(tap_incremental: Tap):
"""If we have an incremental stream, the catalog should have a `replication_key`."""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)

replication_key = INCREMENTAL_CATALOG["streams"][0]["replication_key"]
assert replication_key == "updated_at"
assert catalog_dict["streams"][0]["replication_key"] == replication_key
BernardWez marked this conversation as resolved.
Show resolved Hide resolved


def test_catalog_valid_replication_keys(tap_incremental: Tap):
"""
If we have an incremental stream, the catalog should have a metadata breadcrumb for the incremental
stream containing the key: `valid-replication-keys`.

This key should be associated with a list containing the fields that could be used as replication keys.
For example, the metadata breadcrumb of the stream should look as follows if `updated_at` is its replication_key.

"metadata": {
"inclusion": "available",
"selected": True,
"selected-by-default": True,
"table-key-properties": ["id"],
"valid-replication-keys": ["updated_at"],
}
"""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)

replication_keys = catalog_dict["streams"][0]["metadata"][-1]["metadata"].get(
"valid-replication-keys", None
)

# Checks that the `valid-replication-keys` key exists
assert replication_keys != None

# Checks that value of `valid-replication-keys` equals to the replication-key
assert replication_keys == ["updated_at"]
BernardWez marked this conversation as resolved.
Show resolved Hide resolved
Loading