Skip to content

Commit

Permalink
Feature/selection (#19)
Browse files Browse the repository at this point in the history
* First catalog selection

* Update elx package and tests

* Refactor select method in Catalog and Tap classes

* Fix runner.tap.streams to runner.tap.catalog.streams in load_assets function

* Update metadata selection logic in Catalog and CLI

* Update version number in pyproject.toml
  • Loading branch information
JulesHuisman authored Nov 9, 2023
1 parent 7e20d57 commit 0f2866e
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 118 deletions.
1 change: 1 addition & 0 deletions elx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from elx.tap import Tap
from elx.target import Target
from elx.runner import Runner
from elx.catalog import Catalog

logger = logging.getLogger("pipx")
logger.setLevel(logging.CRITICAL)
61 changes: 54 additions & 7 deletions elx/catalog.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from typing import List, Optional
from typing import List, Optional, Tuple
from pydantic import BaseModel, Field


class Schema(BaseModel):
properties: dict


class Stream(BaseModel):
tap_stream_id: str
replication_method: Optional[str] = "FULL_REFRESH"
stream: str = Field(alias="tap_stream_id")
table_name: Optional[str] = None
replication_method: Optional[str] = "FULL_TABLE"
key_properties: List[str]
stream_schema: Schema = Field(alias="schema")
stream_schema: dict = Field(alias="schema")
is_view: Optional[bool] = False
metadata: List[dict] = Field(default_factory=list)

@property
def name(self) -> str:
Expand All @@ -19,3 +19,50 @@ def name(self) -> str:
@property
def safe_name(self) -> str:
return self.name.replace("-", "_")

def find_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:
    """
    Look up the first metadata entry whose breadcrumb equals the given one.

    Args:
        breadcrumb (List[str]): The breadcrumb to compare against each
            entry's "breadcrumb" value.

    Returns:
        Optional[dict]: The matching entry from self.metadata (the same
            object, not a copy), or None when no entry matches.
    """
    # Lazily scan the entries in order; `next` stops at the first hit.
    matches = (entry for entry in self.metadata if entry["breadcrumb"] == breadcrumb)
    return next(matches, None)


class Catalog(BaseModel):
    # Container model for the list of streams; defaults to an empty list.
    streams: List[Stream] = Field(default_factory=list)

    def select(self, streams: Optional[List[str]]) -> "Catalog":
        """
        Return a deep copy of this catalog with stream-level selection applied.

        The original catalog is never modified. A stream counts as selected
        when either its `tap_stream_id` or its `safe_name` appears in
        `streams`.

        Args:
            streams (Optional[List[str]]): Names of the streams to select.
                When None, the copy is returned without touching metadata.

        Returns:
            Catalog: The copied catalog, with the "selected" flag set on the
                metadata entry whose breadcrumb is `[]` for every stream.
        """
        selection = self.copy(deep=True)

        # No explicit selection requested: hand back the untouched copy.
        if streams is None:
            return selection

        for entry in selection.streams:
            # The entry with an empty breadcrumb holds the stream-level metadata.
            root = entry.find_by_breadcrumb([])
            is_selected = (entry.tap_stream_id in streams) or (
                entry.safe_name in streams
            )

            if not root:
                # No stream-level entry yet, so add one carrying the flag.
                entry.metadata.append(
                    {
                        "breadcrumb": [],
                        "metadata": {"selected": is_selected},
                    }
                )
                continue

            root["metadata"]["selected"] = is_selected

        return selection
2 changes: 1 addition & 1 deletion elx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def invoke(
inquirer.List(
"stream",
message="Which stream do you want to invoke?",
choices=["all", *[stream.name for stream in tap.streams]],
choices=["all", *[stream.name for stream in tap.catalog.streams]],
default="all",
carousel=True,
),
Expand Down
2 changes: 1 addition & 1 deletion elx/extensions/dagster/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]:
key_prefix=dagster_safe_name(runner.tap.executable),
code_version=runner.tap.hash_key,
)
for stream in runner.tap.streams
for stream in runner.tap.catalog.streams
},
can_subset=True,
group_name=dagster_safe_name(runner.tap.executable),
Expand Down
70 changes: 18 additions & 52 deletions elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@
from pathlib import Path
from typing import Generator, List, Optional
from elx.singer import Singer, require_install
from elx.catalog import Stream
from elx.catalog import Stream, Catalog
from elx.json_temp_file import json_temp_file
from subprocess import Popen, PIPE


class Tap(Singer):
def __init__(
    self,
    spec: str,
    executable: str | None = None,
    config: dict | None = None,
    selected: Optional[List[str]] = None,
):
    """
    Initialize the tap and remember which streams are selected.

    Args:
        spec (str): Passed unchanged to the base class initializer.
        executable (str | None): Passed unchanged to the base class
            initializer. Defaults to None.
        config (dict | None): The tap configuration. When omitted (or None)
            a fresh empty dict is used for this instance.
        selected (Optional[List[str]]): Stream names to select, stored on
            `self.selected`. Defaults to None.
    """
    # A literal `{}` default is created once at definition time and would be
    # shared by every Tap constructed without a config; build one per instance.
    super().__init__(spec, executable, {} if config is None else config)
    self.selected = selected

def discover(self, config_path: Path) -> dict:
"""
Run the tap in discovery mode.
Expand All @@ -25,7 +35,7 @@ def discover(self, config_path: Path) -> dict:
return self.run(["--config", str(config_path), "--discover"])

@cached_property
def catalog(self) -> dict:
def catalog(self) -> Catalog:
"""
Discover the catalog.
Expand All @@ -34,52 +44,8 @@ def catalog(self) -> dict:
"""
with json_temp_file(self.config) as config_path:
catalog = self.discover(config_path)
return catalog

def filtered_catalog(
    self, catalog: dict, streams: Optional[List[str]] = None
) -> dict:
    """
    Mark each stream of the catalog as selected or deselected.

    Args:
        catalog (dict): The catalog; its "streams" entries must provide
            "tap_stream_id" and "metadata".
        streams (Optional[List[str]], optional): The stream ids to select.
            Defaults to None.

    Returns:
        dict: The very same `catalog` object when `streams` is empty or
            None; otherwise a new catalog dict whose stream entries carry a
            "selected" flag and one extra metadata entry with an empty
            breadcrumb.
    """
    # Nothing to filter on: return the input untouched.
    if not streams:
        return catalog

    filtered = []
    for entry in catalog["streams"]:
        is_selected = entry["tap_stream_id"] in streams

        # Shallow copy of the entry so the input catalog is left unmodified.
        updated = {**entry, "selected": is_selected}
        # Concatenation builds a new list; the original metadata list is kept as is.
        updated["metadata"] = entry["metadata"] + [
            {
                "metadata": {
                    "selected": is_selected,
                },
                "breadcrumb": [],
            }
        ]
        filtered.append(updated)

    return {"streams": filtered}

@property
def streams(self) -> list[Stream]:
    """
    Build a Stream model for every entry under the catalog's "streams" key.

    Returns:
        list[Stream]: One Stream per catalog entry, in catalog order.
    """
    raw_streams = self.catalog["streams"]
    return [Stream(**raw) for raw in raw_streams]
catalog = Catalog(**catalog)
return catalog.select(streams=self.selected)

@contextlib.asynccontextmanager
@require_install
Expand All @@ -94,10 +60,10 @@ async def process(
Returns:
Popen: The tap process.
"""
catalog = self.filtered_catalog(catalog=self.catalog, streams=streams)
catalog = self.catalog.select(streams=streams)

with json_temp_file(self.config) as config_path:
with json_temp_file(catalog) as catalog_path:
with json_temp_file(catalog.dict(by_alias=True)) as catalog_path:
with json_temp_file(state) as state_path:
yield await asyncio.create_subprocess_exec(
*[
Expand Down Expand Up @@ -126,10 +92,10 @@ def invoke(
"""
# TODO: Make use of the process context manager.

catalog = self.filtered_catalog(catalog=self.catalog, streams=streams)
catalog = self.catalog.select(streams=streams)

with json_temp_file(self.config) as config_path:
with json_temp_file(catalog) as catalog_path:
with json_temp_file(catalog.dict(by_alias=True)) as catalog_path:
with json_temp_file({}) as state_path:
process = Popen(
[
Expand Down
63 changes: 37 additions & 26 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "elx"
version = "0.1.0a7"
version = "0.1.0"
description = "A lightweight Python interface for extracting and loading using the Singer.io spec."
authors = ["Jules Huisman <[email protected]>"]
readme = "README.md"
Expand All @@ -18,8 +18,8 @@ boto3 = {version = "^1.0.0", optional = true}
google-cloud-storage = {version = "^2.6.0", optional = true}
requests = {version = "^2.0.0", optional = true}
paramiko = {version = "^2.0.0", optional = true}
dagster = {version = "^1.3.14", optional = true}
dagster-webserver = {version = "^1.3.14", optional = true}
dagster = {version = "^1.5.6", optional = true}
dagster-webserver = {version = "^1.5.6", optional = true}
typer = {extras = ["all"], version = "^0.9.0"}
inquirer = "^3.1.3"

Expand Down
Loading

0 comments on commit 0f2866e

Please sign in to comment.