Skip to content

Commit

Permalink
Merge pull request #32 from quantile-development/feature/catalog-cust…
Browse files Browse the repository at this point in the history
…om-properties

Feature/catalog custom properties
  • Loading branch information
BernardWez authored Mar 5, 2024
2 parents f62da5d + bb514b3 commit 5731013
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 1 deletion.
37 changes: 37 additions & 0 deletions elx/catalog.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Optional, Tuple
from pydantic import BaseModel, Field
import logging


class Stream(BaseModel):
Expand Down Expand Up @@ -193,3 +194,39 @@ def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog":
)

return catalog

def add_properties_to_schema(self, custom_schema: Optional[dict]) -> "Catalog":
"""
Adds custom properties to stream schema and metadata.
"""
# Make a copy of the existing catalog.
catalog = self.copy(deep=True)

# Loop over the streams referenced in the `custom_schema`
for stream_name in custom_schema.keys():
# Find the stream
stream = catalog.find_stream(stream_name)

# If the stream is not found, skip it
if stream is None:
# Log warning about an invalid stream name in the schema configuration
logging.warning(
f"Found stream `{stream_name}` in the `schema` definition that does not exist in the catalog."
)
continue

# Get the custom properties for the current stream
custom_properties = custom_schema[stream.tap_stream_id]

# Loop over each custom property
for property_name, property_typing in custom_properties.items():
# Add property to the stream schema with
stream.stream_schema["properties"][property_name] = property_typing

# Add property to metadata and mark as selected
stream.upsert_metadata(
breadcrumb=["properties", property_name],
metadata={"selected": True},
)

return catalog
3 changes: 3 additions & 0 deletions elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ def __init__(
config: dict = {},
deselected: List[str] = None,
replication_keys: dict = {},
schema: dict = {},
):
super().__init__(spec, executable, config)
self.deselected = deselected
self.replication_keys = replication_keys
self.schema = schema

def discover(self, config_path: Path) -> dict:
"""
Expand Down Expand Up @@ -51,6 +53,7 @@ def catalog(self) -> Catalog:
catalog = catalog.set_replication_keys(
replication_keys=self.replication_keys
)
catalog = catalog.add_properties_to_schema(custom_schema=self.schema)
return catalog

@contextlib.asynccontextmanager
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from elx import Tap


@pytest.fixture(scope="session")
@pytest.fixture()
def tap() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the executable with an incremental stream.
Expand Down
53 changes: 53 additions & 0 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,56 @@ def test_catalog_set_stream_replication_key(tap: Tap):

assert catalog.streams[1].replication_method == "INCREMENTAL"
assert catalog.streams[1].replication_key == "updated_at"


def test_catalog_add_custom_property(tap: Tap):
"""If we add a custom property, the catalog should be updated."""
custom_properties = {
"users": {
"custom_property": {
"type": "string",
},
}
}

# Add a custom property to the tap schema
tap.schema = custom_properties

# Verify that the custom property is in the metadata of the catalog
assert (
tap.catalog.streams[1].find_metadata_by_breadcrumb(
["properties", "custom_property"]
)
!= None
)

# Verify that the custom property is in the schema of the catalog
assert "custom_property" in tap.catalog.streams[1].stream_schema["properties"]


def test_catalog_add_nested_custom_property(tap: Tap):
"""If we add a custom property, the catalog should be updated."""
custom_properties = {
"users": {
"items": {
"properties": {
"custom_property": {
"type": "string",
},
},
"type": "object",
}
}
}

# Add a custom property to the tap schema
tap.schema = custom_properties

# Verify that the custom property is in the metadata of the catalog
assert (
tap.catalog.streams[1].find_metadata_by_breadcrumb(["properties", "items"])
!= None
)

# Verify that the custom property is in the schema of the catalog
assert "items" in tap.catalog.streams[1].stream_schema["properties"]

0 comments on commit 5731013

Please sign in to comment.