Skip to content

Commit

Permalink
Merge pull request #23 from quantile-development/feature/catalog-repl…
Browse files Browse the repository at this point in the history
…ication-key

Feature/catalog replication key
  • Loading branch information
BernardWez authored Nov 21, 2023
2 parents 1010207 + de7ce2f commit 9a65d15
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 1 deletion.
1 change: 1 addition & 0 deletions elx/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class Stream(BaseModel):
stream: str = Field(alias="tap_stream_id")
table_name: Optional[str] = None
replication_method: Optional[str] = "FULL_TABLE"
replication_key: Optional[str] = None
key_properties: List[str]
stream_schema: dict = Field(alias="schema")
is_view: Optional[bool] = False
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from fixtures.singer import singer
from fixtures.tap import tap
from fixtures.tap import tap, tap_incremental
from fixtures.target import target
from fixtures.runner import runner
from fixtures.state import state_manager
12 changes: 12 additions & 0 deletions tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,15 @@ def tap() -> Generator[Tap, None, None]:
],
},
)


@pytest.fixture(scope="session")
def tap_incremental() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the executable with an incremental stream.
"""
yield Tap(
executable="tap-mock-incremental",
spec="git+https://github.com/quantile-taps/tap-mock-incremental.git",
config={},
)
95 changes: 95 additions & 0 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
{
"tap_stream_id": "animals",
"replication_method": "FULL_TABLE",
"replication_key": None,
"table_name": None,
"is_view": False,
"key_properties": [],
Expand Down Expand Up @@ -56,6 +57,54 @@
]
}

INCREMENTAL_CATALOG = {
"streams": [
{
"tap_stream_id": "users",
"replication_method": "INCREMENTAL",
"replication_key": "updated_at",
"table_name": None,
"is_view": False,
"key_properties": ["id"],
"schema": {
"properties": {
"id": {"type": ["string", "null"]},
"name": {"type": ["string", "null"]},
"updated_at": {
"format": "date-time",
"type": ["string", "null"],
},
},
"type": "object",
},
"metadata": [
{
"breadcrumb": ["properties", "id"],
"metadata": {"inclusion": "automatic"},
},
{
"breadcrumb": ["properties", "name"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "updated_at"],
"metadata": {"inclusion": "automatic"},
},
{
"breadcrumb": [],
"metadata": {
"inclusion": "available",
"selected": True,
"selected-by-default": True,
"table-key-properties": ["id"],
"valid-replication-keys": ["updated_at"],
},
},
],
}
]
}


def test_catalog(tap: Tap):
"""
Expand Down Expand Up @@ -103,3 +152,49 @@ def test_catalog_deselect_property(tap: Tap):
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False


def test_catalog_replication_method(tap_incremental: Tap):
"""If we have an incremental stream, the replication_method in the catalog should be `INCREMENTAL`."""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)

assert (
catalog_dict["streams"][0]["replication_method"]
== INCREMENTAL_CATALOG["streams"][0]["replication_method"]
)


def test_catalog_replication_key(tap_incremental: Tap):
"""If we have an incremental stream, the catalog should have a `replication_key`."""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)

assert (
catalog_dict["streams"][0]["replication_key"]
== INCREMENTAL_CATALOG["streams"][0]["replication_key"]
)


def test_catalog_valid_replication_keys(tap_incremental: Tap):
"""
If we have an incremental stream, the catalog should have a metadata breadcrumb for the incremental
stream containing the key: `valid-replication-keys`.
This key should be associated with a list containing the fields that could be used as replication keys.
For example, the metadata breadcrumb of the stream should look as follows if `updated_at` is its replication_key.
"metadata": {
"inclusion": "available",
"selected": True,
"selected-by-default": True,
"table-key-properties": ["id"],
"valid-replication-keys": ["updated_at"],
}
"""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)

replication_keys = catalog_dict["streams"][0]["metadata"][-1]["metadata"].get(
"valid-replication-keys", None
)

# Checks that value of `valid-replication-keys` equals to the replication-key
assert replication_keys == [INCREMENTAL_CATALOG["streams"][0]["replication_key"]]

0 comments on commit 9a65d15

Please sign in to comment.