Add dagster-elt #16681
Can you confirm that this library works on Serverless?
I want to surface the concern I have (which @sryza also shares and flagged yesterday) that the I fully support having |
Req'ing changes for q mgmt
Let's call it |
python_modules/libraries/dagster-embedded-elt/dagster_embbeded_elt/sling/asset_defs.py
Outdated
python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_defs.py
python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_defs.py
python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py
python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py
Outdated
python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/types.py
Outdated
python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/types.py
Outdated
i am not appropriate to be the sole accept, but the api design and docs lgtm, barring a few small notes
python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test.csv
Outdated
python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_defs.py
Outdated
Co-authored-by: Ben Pankow <[email protected]>
    update_key = [update_key]

@multi_asset(
    compute_kind="sling", specs=[asset_spec], required_resource_keys={sling_resource_key}
yay
This looks nice and well-documented. A few comments:
- This is a very large PR. We generally like to work in more incremental, digestible pieces. This leads to higher quality, more efficient code review and better outcomes.
- There are some complicated components (esp. regexes) that are worthy of independent unit tests, so I'm a little concerned about test coverage here.
However:
Take my comments as things to note for the future and for follow-ups.
@staticmethod
def _exec_sling_cmd(cmd, stdin=None, stdout=PIPE, stderr=STDOUT) -> Generator[str, None, None]:
    ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
this regex is complex enough that it is worthy of its own comment and independent unit tests
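A minimal sketch of what a commented, independently tested version of that pattern could look like. The regex is copied from the diff; the `strip_ansi` helper, the test names, and the sample strings are hypothetical and not part of this PR.

```python
import re

# ESC (0x1B) followed by either:
#   - a single byte in the ranges @-Z or \-_ (two-character escape), or
#   - a CSI sequence: "[", then parameter bytes (0-?), then
#     intermediate bytes (space through /), then one final byte (@-~).
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


def strip_ansi(text: str) -> str:
    """Remove ANSI escape sequences from text (hypothetical helper)."""
    return ANSI_ESCAPE.sub("", text)


def test_strips_color_codes():
    assert strip_ansi("\x1b[32mINF\x1b[0m done") == "INF done"


def test_strips_cursor_control():
    assert strip_ansi("\x1b[2K\x1b[1Gprogress") == "progress"


def test_leaves_plain_text_untouched():
    assert strip_ansi("42 rows") == "42 rows"
```

Pulling the pattern out to a module-level constant also avoids recompiling it on every call.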
assert proc.stdout

for line in proc.stdout:
    fmt_line = str(line, "utf-8")
is this another way of going `line.decode("utf-8")`?
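For reference, a quick check of the two spellings on a bytes object. The sample line is made up for illustration.

```python
line = b"1000 rows\n"

# With an explicit encoding, str() decodes the bytes, same as .decode().
assert str(line, "utf-8") == line.decode("utf-8") == "1000 rows\n"

# Without an encoding, str() returns the repr of the bytes object instead,
# which is an easy mistake to make with the str(...) spelling.
assert str(line) == "b'1000 rows\\n'"
```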
def _exec_sling_cmd(cmd, stdin=None, stdout=PIPE, stderr=STDOUT) -> Generator[str, None, None]:
    ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
    with Popen(cmd, shell=True, stdin=stdin, stdout=stdout, stderr=stderr) as proc:
        assert proc.stdout

        for line in proc.stdout:
            fmt_line = str(line, "utf-8")
            clean_line = ansi_escape.sub("", fmt_line).replace("INF", "")
            yield clean_line

        proc.wait()
        if proc.returncode != 0:
            raise Exception("Sling command failed with error code %s", proc.returncode)
this whole code path is def a bit scary and I wish it had its own test suite
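A rough sketch of how a streaming subprocess helper like this could be tested without Sling installed, by driving it with the current Python interpreter. `stream_command` is a hypothetical standalone variant, not the method in the diff: it takes an argument list instead of `shell=True` and formats the exit code into the message with an f-string. As written in the diff, `Exception("... %s", proc.returncode)` passes the return code as a second argument, so the `%s` is never interpolated.

```python
import subprocess
import sys
from typing import Iterator, Sequence


def stream_command(cmd: Sequence[str]) -> Iterator[str]:
    """Yield decoded output lines from cmd; raise if it exits non-zero."""
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
    ) as proc:
        assert proc.stdout is not None
        for line in proc.stdout:
            yield line.decode("utf-8")
        proc.wait()
        if proc.returncode != 0:
            raise RuntimeError(f"command failed with exit code {proc.returncode}")


def test_yields_lines_in_order():
    cmd = [sys.executable, "-c", "print('a'); print('b')"]
    assert [line.strip() for line in stream_command(cmd)] == ["a", "b"]


def test_raises_after_output_on_failure():
    cmd = [sys.executable, "-c", "import sys; print('x'); sys.exit(3)"]
    seen = []
    try:
        for line in stream_command(cmd):
            seen.append(line.strip())
    except RuntimeError as err:
        assert "3" in str(err)
    else:
        raise AssertionError("expected RuntimeError")
    # Output produced before the failure is still delivered to the caller.
    assert seen == ["x"]
```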
if self.source_connection.type == "file" and not source_stream.startswith("file://"):
    source_stream = "file://" + source_stream

if self.target_connection.type == "file" and not target_object.startswith("file://"):
    target_object = "file://" + target_object
This seems both very magical and very undocumented
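One way to make this less magical might be to pull the prefixing into a small, documented helper. This is only a sketch: `ensure_file_scheme` and its parameters are hypothetical names, and the behavior simply mirrors the two `if` statements in the diff.

```python
def ensure_file_scheme(path: str, connection_type: str) -> str:
    """Return path with a "file://" prefix when the connection is a file.

    Paths that already start with "file://" and paths belonging to
    non-file connections are returned unchanged.
    """
    if connection_type == "file" and not path.startswith("file://"):
        return "file://" + path
    return path


# Illustrative values only; the connection type strings other than "file"
# are assumptions.
assert ensure_file_scheme("/tmp/test.csv", "file") == "file:///tmp/test.csv"
assert ensure_file_scheme("file:///tmp/test.csv", "file") == "file:///tmp/test.csv"
assert ensure_file_scheme("public.users", "postgres") == "public.users"
```

A single helper also gives the docstring one obvious home and makes the rule easy to unit test.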
primary_key (Optional[Union[str, List[str]]], optional): The optional primary key to use when syncing.
update_key (Optional[Union[str, List[str]]], optional): The optional update key to use when syncing.
It would be nice to clarify that this is not an asset key. Threw me for a second.
    compute_kind="sling", specs=[asset_spec], required_resource_keys={sling_resource_key}
)
def sync(context: AssetExecutionContext) -> MaterializeResult:
    sling: SlingResource = getattr(context.resources, sling_resource_key)
we should really have a better API for this where you can just get a dict of resources
match = re.search(r"(\d+) rows", stdout_line)
if match:
    last_row_count_observed = int(match.group(1))
context.log.info(stdout_line)
info or debug?
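The row-count parsing in this hunk is another piece that could be tested in isolation. A sketch, assuming the regex from the diff; `last_row_count` is a hypothetical helper and the sample lines are invented, not real Sling output.

```python
import re
from typing import Iterable, Optional

# Same pattern as in the diff: one or more digits followed by " rows".
ROW_COUNT = re.compile(r"(\d+) rows")


def last_row_count(lines: Iterable[str]) -> Optional[int]:
    """Return the last row count seen in lines, or None if none matched."""
    last = None
    for line in lines:
        match = ROW_COUNT.search(line)
        if match:
            last = int(match.group(1))
    return last


assert last_row_count(["wrote 10 rows", "noise", "wrote 25 rows"]) == 25
assert last_row_count(["nothing to report"]) is None
```

One caveat worth a test if the tool ever prints thousands separators: on a line like `1,000 rows` this pattern would match only the digits after the comma.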
## Summary & Motivation

This adds the `dagster-elt` integration, which aims to be a set of embedded ELT/ETL tooling to help data engineers with common ingestion/transform tasks such as ingesting data to and from databases, warehouses, and files.

This first iteration is a wrapper around the [sling](https://slingdata.io) package. Sling is a lightweight CLI tool that allows users to sync data from a source/destination without the extra burden of heavyweight tooling that other solutions require.

## How I Tested These Changes

Unit tests, local testing.

---------

Co-authored-by: Ben Pankow <[email protected]>