Skip to content

Commit

Permalink
Improved Dagster logging (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
JulesHuisman authored Aug 17, 2023
1 parent b1efe8b commit 7b2dc4d
Show file tree
Hide file tree
Showing 6 changed files with 465 additions and 394 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
__pycache__/
.pytest_cache/
.env
tap-*.json
tap-*.json
tmp*
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"**/Thumbs.db": true,
"**/.venv": true,
"**/.mypy_cache": true,
"**/.pytest_cache": true
"**/.pytest_cache": true,
"**/tmp*": true
}
}
90 changes: 74 additions & 16 deletions elx/extensions/dagster/assets.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,83 @@
from typing import List
from elx import Runner, Tap, Target
from elx.catalog import Stream
from dagster import AssetsDefinition, asset
from typing import Generator, List
from elx import Runner
from dagster import (
AssetsDefinition,
Nothing,
OpExecutionContext,
Output,
multi_asset,
AssetOut,
get_dagster_logger,
)
from elx.extensions.dagster.utils import dagster_safe_name, generate_description

logger = get_dagster_logger()


def load_assets(runner: Runner) -> List[AssetsDefinition]:
def run_factory(runner: Runner, stream: Stream):
def run(context):
runner.run(stream=stream.name)
return dagster_safe_name(stream.name)
"""
Load the assets for a runner, each asset represents one tap target combination.
Args:
runner (Runner): The runner to extract from.
Returns:
List[AssetsDefinition]: The assets.
"""

def run_factory(runner: Runner) -> callable:
"""
Create a run function for a runner.
Args:
runner (Runner): The runner to create a run function for.
Returns:
callable: The run function that gets executed by Dagster.
"""

def run(context: OpExecutionContext) -> Generator[Output, None, None]:
"""
Run a tap target combination.
Args:
context (OpExecutionContext): The context to run in.
Yields:
Generator[Output, None, None]: The names of the selected outputs.
"""
# Execute the runner and yield the selected outputs.
runner.run(
streams=list(context.selected_output_names),
logger=logger,
)

for context_output_name in context.selected_output_names:
yield Output(
value=Nothing,
output_name=context_output_name,
metadata={
"state_path": f"{runner.state_manager.base_path}/{runner.state_file_name}",
"state": runner.load_state(),
},
)

return run

return [
asset(
name=dagster_safe_name(stream.name),
key_prefix=dagster_safe_name(runner.tap.executable),
description=generate_description(runner=runner, stream=stream),
multi_asset(
name=f"run_{dagster_safe_name(runner.tap.executable)}_{dagster_safe_name(runner.target.executable)}",
outs={
dagster_safe_name(stream.name): AssetOut(
is_required=False,
description=generate_description(runner=runner, stream=stream),
key_prefix=dagster_safe_name(runner.tap.executable),
code_version=runner.tap.hash_key,
)
for stream in runner.tap.streams
},
can_subset=True,
group_name=dagster_safe_name(runner.tap.executable),
compute_kind="python",
)(run_factory(runner, stream))
for stream
in runner.tap.streams
]
)(run_factory(runner))
]
12 changes: 9 additions & 3 deletions elx/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
import subprocess
from typing import Optional
from typing import List, Optional

from functools import cached_property
from elx.tap import Tap
Expand Down Expand Up @@ -56,10 +56,14 @@ def interpolation_values(self) -> dict:
"TARGET_NAME": self.target.executable.replace("-", "_"),
}

def run(self, stream: Optional[str] = None) -> None:
def run(
self,
streams: Optional[List[str]] = None,
logger: logging.Logger = None,
) -> None:
state = self.load_state()

with self.tap.process(state=state, stream=stream) as tap_process:
with self.tap.process(state=state, streams=streams) as tap_process:
with self.target.process(tap_process=tap_process) as target_process:

def log_lines():
Expand All @@ -68,6 +72,8 @@ def log_lines():

for line in log_lines():
print(line.decode("utf-8"))
if logger:
logger.info(line.decode("utf-8"))

tap_process.stdout.close()
stdout, stderr = target_process.communicate()
Expand Down
50 changes: 24 additions & 26 deletions elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,41 +36,39 @@ def catalog(self) -> dict:
return catalog

def filtered_catalog(
self, catalog: dict, selected_stream: Optional[str] = None
self, catalog: dict, streams: Optional[List[str]] = None
) -> dict:
"""
Filter the catalog.
Args:
catalog (dict): The catalog.
stream (Optional[str], optional): The stream to filter on. Defaults to None.
streams (Optional[List[str]], optional): The streams to filter on. Defaults to None.
Returns:
dict: The filtered catalog.
"""
# TODO: Abstract this away.
if selected_stream:
return {
"streams": [
{
**stream,
"selected": stream["tap_stream_id"] == selected_stream,
"metadata": stream["metadata"]
+ [
{
"metadata": {
"selected": stream["tap_stream_id"]
== selected_stream
},
"breadcrumb": [],
}
],
}
for stream in catalog["streams"]
]
}
if not streams:
return catalog

return catalog
return {
"streams": [
{
**stream,
"selected": stream["tap_stream_id"] in streams,
"metadata": stream["metadata"]
+ [
{
"metadata": {
"selected": stream["tap_stream_id"] in streams,
},
"breadcrumb": [],
}
],
}
for stream in catalog["streams"]
]
}

@property
def streams(self) -> list[Stream]:
Expand All @@ -87,15 +85,15 @@ def streams(self) -> list[Stream]:
def process(
self,
state: dict = {},
stream: Optional[str] = None,
streams: Optional[List[str]] = None,
) -> Generator[Popen, None, None]:
"""
Run the tap process.
Returns:
Popen: The tap process.
"""
catalog = self.filtered_catalog(self.catalog, selected_stream=stream)
catalog = self.filtered_catalog(catalog=self.catalog, streams=streams)

with json_temp_file(self.config) as config_path:
with json_temp_file(catalog) as catalog_path:
Expand Down
Loading

0 comments on commit 7b2dc4d

Please sign in to comment.