Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Feature/catalog custom properties #32

Merged
merged 12 commits into from
Mar 5, 2024
37 changes: 37 additions & 0 deletions elx/catalog.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Optional, Tuple
from pydantic import BaseModel, Field
import logging


class Stream(BaseModel):
Expand Down Expand Up @@ -193,3 +194,39 @@ def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog":
)

return catalog

def add_properties_to_schema(self, custom_schema: Optional[dict]) -> "Catalog":
    """
    Adds custom properties to stream schema and metadata.

    Args:
        custom_schema: Mapping of stream identifier to a mapping of
            property name -> property typing (e.g. ``{"type": "string"}``).
            ``None`` or an empty mapping leaves the catalog unchanged.

    Returns:
        A deep copy of this catalog with the custom properties added to the
        matching streams; the catalog this is called on is not modified.
    """
    # Make a copy of the existing catalog.
    catalog = self.copy(deep=True)

    # The parameter is Optional: with nothing configured there is nothing to add.
    if not custom_schema:
        return catalog

    # Loop over the streams referenced in the `custom_schema`
    for stream_name, custom_properties in custom_schema.items():
        # Find the stream
        stream = catalog.find_stream(stream_name)

        # If the stream is not found, skip it
        if stream is None:
            # Log warning about an invalid stream name in the schema configuration
            logging.warning(
                f"Found stream `{stream_name}` in the `schema` definition that does not exist in the catalog."
            )
            continue

        # Use the properties keyed by the name we iterated on, rather than
        # re-indexing `custom_schema` by `stream.tap_stream_id`, so a stream
        # matched under a different identifier cannot raise a KeyError.
        # A stream listed with no properties is treated as empty.
        for property_name, property_typing in (custom_properties or {}).items():
            # Add property to the stream schema, creating the `properties`
            # mapping if the schema does not define one yet.
            stream.stream_schema.setdefault("properties", {})[
                property_name
            ] = property_typing

            # Add property to metadata and mark as selected
            stream.upsert_metadata(
                breadcrumb=["properties", property_name],
                metadata={"selected": True},
            )

    return catalog
3 changes: 3 additions & 0 deletions elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ def __init__(
config: dict = {},
deselected: List[str] = None,
replication_keys: dict = {},
schema: dict = {},
):
super().__init__(spec, executable, config)
self.deselected = deselected
self.replication_keys = replication_keys
self.schema = schema

def discover(self, config_path: Path) -> dict:
"""
Expand Down Expand Up @@ -51,6 +53,7 @@ def catalog(self) -> Catalog:
catalog = catalog.set_replication_keys(
replication_keys=self.replication_keys
)
catalog = catalog.add_properties_to_schema(custom_schema=self.schema)
return catalog

@contextlib.asynccontextmanager
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from elx import Tap


@pytest.fixture(scope="session")
@pytest.fixture()
def tap() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the executable with an incremental stream.
Expand Down
53 changes: 53 additions & 0 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,56 @@ def test_catalog_set_stream_replication_key(tap: Tap):

assert catalog.streams[1].replication_method == "INCREMENTAL"
assert catalog.streams[1].replication_key == "updated_at"


def test_catalog_add_custom_property(tap: Tap):
    """If we add a custom property, the catalog should be updated."""
    custom_properties = {
        "users": {
            "custom_property": {
                "type": "string",
            },
        }
    }

    # Add a custom property to the tap schema
    tap.schema = custom_properties

    # Read the catalog once so both assertions inspect the same stream.
    # NOTE(review): index 1 is presumably the `users` stream of the fixture tap - confirm.
    stream = tap.catalog.streams[1]

    # Verify that the custom property is in the metadata of the catalog
    assert (
        stream.find_metadata_by_breadcrumb(["properties", "custom_property"])
        is not None
    )

    # Verify that the custom property is in the schema of the catalog
    assert "custom_property" in stream.stream_schema["properties"]


def test_catalog_add_nested_custom_property(tap: Tap):
    """If we add a custom property with a nested object type, the catalog should be updated."""
    custom_properties = {
        "users": {
            "items": {
                "properties": {
                    "custom_property": {
                        "type": "string",
                    },
                },
                "type": "object",
            }
        }
    }

    # Add a custom property to the tap schema
    tap.schema = custom_properties

    # Read the catalog once so both assertions inspect the same stream.
    # NOTE(review): index 1 is presumably the `users` stream of the fixture tap - confirm.
    stream = tap.catalog.streams[1]

    # Verify that the top-level custom property (`items`) is in the metadata of the catalog
    assert stream.find_metadata_by_breadcrumb(["properties", "items"]) is not None

    # Verify that the top-level custom property (`items`) is in the schema of the catalog
    assert "items" in stream.stream_schema["properties"]
Loading