Skip to content

Commit

Permalink
Refactor add_properties_to_schema method
Browse files: browse the repository at this point in the history
  • Loading branch information
BernardWez committed Feb 13, 2024
1 parent 8d364ba commit bb514b3
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
46 changes: 28 additions & 18 deletions elx/catalog.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Optional, Tuple
from pydantic import BaseModel, Field
import logging


class Stream(BaseModel):
Expand Down Expand Up @@ -194,29 +195,38 @@ def set_replication_keys(self, replication_keys: Optional[dict]) -> "Catalog":

return catalog

def add_properties_to_schema(self, custom_schema: Optional[dict]) -> "Catalog":
    """
    Adds custom properties to stream schema and metadata.

    Args:
        custom_schema: Mapping of stream name to a mapping of property name
            to property typing. `None` or an empty mapping adds nothing.

    Returns:
        A copy of this catalog in which every custom property has been
        written to the matching stream's schema and marked as selected in
        its metadata. Stream names that are not found in the catalog are
        skipped with a warning.
    """
    # Make a copy of the existing catalog.
    catalog = self.copy(deep=True)

    # The argument is optional: without a custom schema there is nothing to
    # add, so hand back the copy instead of failing on `None`.
    if not custom_schema:
        return catalog

    # Loop over the streams referenced in the `custom_schema`. Iterating the
    # items keeps the properties paired with the name they were configured
    # under, so no second lookup by the stream's own id is needed.
    for stream_name, custom_properties in custom_schema.items():
        # Find the stream
        stream = catalog.find_stream(stream_name)

        # If the stream is not found, skip it
        if stream is None:
            # Log warning about an invalid stream name in the schema configuration
            logging.warning(
                f"Found stream `{stream_name}` in the `schema` definition that does not exist in the catalog."
            )
            continue

        # Loop over each custom property
        for property_name, property_typing in custom_properties.items():
            # Add property to the stream schema with its configured typing
            stream.stream_schema["properties"][property_name] = property_typing

            # Add property to metadata and mark as selected
            stream.upsert_metadata(
                breadcrumb=["properties", property_name],
                metadata={"selected": True},
            )

    return catalog
2 changes: 1 addition & 1 deletion elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def catalog(self) -> Catalog:
catalog = catalog.set_replication_keys(
replication_keys=self.replication_keys
)
catalog = catalog.add_properties_to_schema(properties=self.schema)
catalog = catalog.add_properties_to_schema(custom_schema=self.schema)
return catalog

@contextlib.asynccontextmanager
Expand Down

0 comments on commit bb514b3

Please sign in to comment.