diff --git a/elx/catalog.py b/elx/catalog.py index d09d7ce..d4e1caa 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -7,6 +7,7 @@ class Stream(BaseModel): stream: str = Field(alias="tap_stream_id") table_name: Optional[str] = None replication_method: Optional[str] = "FULL_TABLE" + replication_key: Optional[str] = None key_properties: List[str] stream_schema: dict = Field(alias="schema") is_view: Optional[bool] = False diff --git a/tests/conftest.py b/tests/conftest.py index 746ec38..067a7c7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,5 @@ from fixtures.singer import singer -from fixtures.tap import tap +from fixtures.tap import tap, tap_incremental from fixtures.target import target from fixtures.runner import runner from fixtures.state import state_manager diff --git a/tests/fixtures/tap.py b/tests/fixtures/tap.py index c16b870..5780457 100644 --- a/tests/fixtures/tap.py +++ b/tests/fixtures/tap.py @@ -20,3 +20,15 @@ def tap() -> Generator[Tap, None, None]: ], }, ) + + +@pytest.fixture(scope="session") +def tap_incremental() -> Generator[Tap, None, None]: + """ + Return a Tap instance for the executable with an incremental stream. + """ + yield Tap( + executable="tap-mock-incremental", + spec="git+https://github.com/quantile-taps/tap-mock-incremental.git", + config={}, + ) diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index b5f2d1c..ab4b026 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -7,6 +7,7 @@ { "tap_stream_id": "animals", "replication_method": "FULL_TABLE", + "replication_key": None, "table_name": None, "is_view": False, "key_properties": [], @@ -56,6 +57,54 @@ ] } +INCREMENTAL_CATALOG = { + "streams": [ + { + "tap_stream_id": "users", + "replication_method": "INCREMENTAL", + "replication_key": "updated_at", + "table_name": None, + "is_view": False, + "key_properties": ["id"], + "schema": { + "properties": { + "id": {"type": ["string", "null"]}, + "name": {"type": ["string", "null"]}, + "updated_at": { + "format": "date-time", + "type": ["string", "null"], + }, + }, + "type": "object", + }, + "metadata": [ + { + "breadcrumb": ["properties", "id"], + "metadata": {"inclusion": "automatic"}, + }, + { + "breadcrumb": ["properties", "name"], + "metadata": {"inclusion": "available"}, + }, + { + "breadcrumb": ["properties", "updated_at"], + "metadata": {"inclusion": "automatic"}, + }, + { + "breadcrumb": [], + "metadata": { + "inclusion": "available", + "selected": True, + "selected-by-default": True, + "table-key-properties": ["id"], + "valid-replication-keys": ["updated_at"], + }, + }, + ], + } + ] +} + def test_catalog(tap: Tap): """ @@ -103,3 +152,49 @@ def test_catalog_deselect_property(tap: Tap): catalog_dict = catalog.dict(by_alias=True) assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False + + +def test_catalog_replication_method(tap_incremental: Tap): + """If we have an incremental stream, the replication_method in the catalog should be `INCREMENTAL`.""" + catalog_dict = tap_incremental.catalog.dict(by_alias=True) + + assert ( + catalog_dict["streams"][0]["replication_method"] + == INCREMENTAL_CATALOG["streams"][0]["replication_method"] + ) + + +def test_catalog_replication_key(tap_incremental: Tap): + """If we have an incremental stream, the catalog should have a `replication_key`.""" + catalog_dict = tap_incremental.catalog.dict(by_alias=True) + + assert ( + catalog_dict["streams"][0]["replication_key"] + == INCREMENTAL_CATALOG["streams"][0]["replication_key"] + ) + + +def test_catalog_valid_replication_keys(tap_incremental: Tap): + """ + If we have an incremental stream, the catalog should have a metadata breadcrumb for the incremental + stream containing the key: `valid-replication-keys`. + + This key should be associated with a list containing the fields that could be used as replication keys. + For example, the metadata breadcrumb of the stream should look as follows if `updated_at` is its replication_key. + + "metadata": { + "inclusion": "available", + "selected": True, + "selected-by-default": True, + "table-key-properties": ["id"], + "valid-replication-keys": ["updated_at"], + } + """ + catalog_dict = tap_incremental.catalog.dict(by_alias=True) + + replication_keys = catalog_dict["streams"][0]["metadata"][-1]["metadata"].get( + "valid-replication-keys", None + ) + + # Checks that value of `valid-replication-keys` equals to the replication-key + assert replication_keys == [INCREMENTAL_CATALOG["streams"][0]["replication_key"]]