diff --git a/elx/catalog.py b/elx/catalog.py index 44b488c..67fb1db 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -1,5 +1,6 @@ from typing import List, Optional, Tuple from pydantic import BaseModel, Field +import logging class Stream(BaseModel): @@ -193,3 +194,39 @@ def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog": ) return catalog + + def add_properties_to_schema(self, custom_schema: Optional[dict]) -> "Catalog": + """ + Adds custom properties to stream schema and metadata. + """ + # Make a copy of the existing catalog. + catalog = self.copy(deep=True) + + # Loop over the streams referenced in the `custom_schema` + for stream_name in custom_schema.keys(): + # Find the stream + stream = catalog.find_stream(stream_name) + + # If the stream is not found, skip it + if stream is None: + # Log warning about an invalid stream name in the schema configuration + logging.warning( + f"Found stream `{stream_name}` in the `schema` definition that does not exist in the catalog." + ) + continue + + # Get the custom properties for the current stream + custom_properties = custom_schema[stream.tap_stream_id] + + # Loop over each custom property + for property_name, property_typing in custom_properties.items(): + # Add property to the stream schema with + stream.stream_schema["properties"][property_name] = property_typing + + # Add property to metadata and mark as selected + stream.upsert_metadata( + breadcrumb=["properties", property_name], + metadata={"selected": True}, + ) + + return catalog diff --git a/elx/tap.py b/elx/tap.py index 417e775..050cf34 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -18,10 +18,12 @@ def __init__( config: dict = {}, deselected: List[str] = None, replication_keys: dict = {}, + schema: dict = {}, ): super().__init__(spec, executable, config) self.deselected = deselected self.replication_keys = replication_keys + self.schema = schema def discover(self, config_path: Path) -> dict: """ @@ -51,6 +53,7 @@ def catalog(self) -> Catalog: catalog = catalog.set_replication_keys( replication_keys=self.replication_keys ) + catalog = catalog.add_properties_to_schema(custom_schema=self.schema) return catalog @contextlib.asynccontextmanager diff --git a/tests/fixtures/tap.py b/tests/fixtures/tap.py index b7981b0..66b906c 100644 --- a/tests/fixtures/tap.py +++ b/tests/fixtures/tap.py @@ -3,7 +3,7 @@ from elx import Tap -@pytest.fixture(scope="session") +@pytest.fixture() def tap() -> Generator[Tap, None, None]: """ Return a Tap instance for the executable with an incremental stream. diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index 2db0d64..9682635 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -159,3 +159,56 @@ def test_catalog_set_stream_replication_key(tap: Tap): assert catalog.streams[1].replication_method == "INCREMENTAL" assert catalog.streams[1].replication_key == "updated_at" + + +def test_catalog_add_custom_property(tap: Tap): + """If we add a custom property, the catalog should be updated.""" + custom_properties = { + "users": { + "custom_property": { + "type": "string", + }, + } + } + + # Add a custom property to the tap schema + tap.schema = custom_properties + + # Verify that the custom property is in the metadata of the catalog + assert ( + tap.catalog.streams[1].find_metadata_by_breadcrumb( + ["properties", "custom_property"] + ) + != None + ) + + # Verify that the custom property is in the schema of the catalog + assert "custom_property" in tap.catalog.streams[1].stream_schema["properties"] + + +def test_catalog_add_nested_custom_property(tap: Tap): + """If we add a custom property, the catalog should be updated.""" + custom_properties = { + "users": { + "items": { + "properties": { + "custom_property": { + "type": "string", + }, + }, + "type": "object", + } + } + } + + # Add a custom property to the tap schema + tap.schema = custom_properties + + # Verify that the custom property is in the metadata of the catalog + assert ( + tap.catalog.streams[1].find_metadata_by_breadcrumb(["properties", "items"]) + != None + ) + + # Verify that the custom property is in the schema of the catalog + assert "items" in tap.catalog.streams[1].stream_schema["properties"]