RFC: Community Input for the Dagster Embedded ELT #17300
-
I really like this. I think built-in stateful data quality checks (anomaly detection) would be really cool.
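For illustration, here is a minimal sketch of what a stateful check could look like using Dagster's asset checks; the asset name, the stand-in row count, and the in-process state are all assumptions, not anything shipped in the integration:

```python
from dagster import AssetCheckResult, asset_check

_previous_count: int | None = None  # toy in-process state; a real check would persist this


@asset_check(asset="synced_table")  # "synced_table" is a hypothetical asset
def row_count_anomaly(context) -> AssetCheckResult:
    global _previous_count
    current = 1_000  # stand-in for a real row-count query against the target
    # Flag an anomaly if the row count moved more than 50% since the last run.
    jumped = _previous_count is not None and abs(current - _previous_count) > 0.5 * _previous_count
    _previous_count = current
    return AssetCheckResult(passed=not jumped, metadata={"row_count": current})
```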
-
I think dlt would be a great next integration 😄
-
Proposing something related to the greater topic of integrations: if you look at the source code for our data ingestion integrations, they often follow the same implementation patterns. One option is to build builder-pattern utilities that standardize how to do a task in Dagster, and to provide a spec/abstract class for each tool describing how to plug into that standard. This way, each new tool only needs to implement the spec.
Here's a short conceptual example of what this could look like for ingesting data with dlt (the `EmbeddedEltResource` base class and `build_ingestion_assets` helper are hypothetical):

```python
from typing import override  # Python 3.12+

from dagster import Definitions

# Hypothetical API, sketched for this proposal
from dagster_embedded_elt import EmbeddedEltResource, build_ingestion_assets


class DltResource(EmbeddedEltResource):
    source: str
    destination: str

    @override
    def start_ingestion(self):
        # defines how to start dlt
        ...
        pipeline.run(...)

    @override
    def poll_ingestion(self):
        # defines how to evaluate the sync progress (in-progress vs. done)
        # also sends logs back to Dagster
        pass

    @override
    def get_asset_keys(self):
        # defines how to figure out what assets are being made at compile time
        pass

    @override
    def get_materialization_metadata(self):
        # defines how metadata is made for a materialization
        pass


mongo_to_bigquery_resource = DltResource(source="mongodb", destination="bigquery")
dlt_ingested_assets = build_ingestion_assets(mongo_to_bigquery_resource)

defs = Definitions(assets=dlt_ingested_assets)
```

This can eventually span any other category of tools in the stack. Embedded ELT can standardize what it means to start an ingestion, poll its progress, resolve asset keys, and emit materialization metadata.
And we can package common representations into the library, allowing people to work with their Slings, Meltanos, dlts, etc. out of the box, but also be able to quickly add a new integration for the next incoming ingestion tool or a proprietary in-house service.
-
Love the idea of embedded ELT; well spotted that big data is not necessarily that big and that many tasks can be solved with lightweight tools. Now that I have tested it out, I have a few comments: in this situation, if SlingResource has just one source+target pair, it would be easier for source_connection/target_connection to default to it.
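A minimal sketch of that defaulting behavior, with `resolve_connections` as a hypothetical helper rather than anything in the actual API:

```python
from typing import Optional


def resolve_connections(
    resource_source: str,
    resource_target: str,
    source: Optional[str] = None,
    target: Optional[str] = None,
) -> tuple[str, str]:
    # When the resource holds exactly one source/target pair, fall back to it
    # instead of requiring every asset to repeat the same connections.
    return source or resource_source, target or resource_target


# resolve_connections("MY_POSTGRES", "MY_DUCKDB") -> ("MY_POSTGRES", "MY_DUCKDB")
```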
-
Fantastic addition! This is exactly as simple as basic data movement should be. One use case that'd be nice to support would be dynamic Sling asset creation: for example, if I only want to move certain tables from one db to another with a full refresh, it'd be ideal to just provide a list of the table names instead of manually writing out a lot of boilerplate. Thanks for listening and for all you're doing! Love the direction this is going!
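A sketch of that table-list factory using the early `build_sling_asset` API; the table names, schemas, and asset keys here are illustrative:

```python
from dagster import AssetSpec, Definitions
from dagster_embedded_elt.sling import SlingMode, build_sling_asset

TABLES = ["accounts", "users", "transactions"]  # illustrative table list

# One full-refresh asset per table, instead of hand-writing each definition.
assets = [
    build_sling_asset(
        asset_spec=AssetSpec(key=["target_db", table]),
        source_stream=f"public.{table}",
        target_object=f"analytics.{table}",
        mode=SlingMode.FULL_REFRESH,
    )
    for table in TABLES
]

defs = Definitions(assets=assets)
```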
-
I really like this integration in general. It seems to solve a lot of the frustration I had when setting up dlt, and I'm enjoying exploring Sling. I'd be curious to hear what others think of this idea: what I like about the dbt integration (dbt, not dlt) is that you can operate dbt as a CLI tool, Dagster can ingest that configuration, and we can orchestrate dbt commands through Dagster. As is, I can have a YAML config I use with the Sling CLI, but I have to recreate that config in Dagster/Python if I want to use the current integration, and it would be annoying and difficult to keep the two in sync. The Sling CLI is nice to use and convenient for co-workers who might want to do occasional ad-hoc runs without having to get Dagster involved.
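A minimal sketch of the reuse being asked for, assuming the integration accepted a parsed config dict (the later replication_config API ended up supporting exactly this kind of sharing):

```python
import yaml  # PyYAML

# One replication.yaml as the single source of truth: the CLI runs it with
# `sling run -r replication.yaml`, and Dagster loads the same file.
with open("replication.yaml") as f:
    replication_config = yaml.safe_load(f)

# ...pass `replication_config` to the Dagster integration instead of
# re-declaring the same streams in Python.
```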
-
@PedramNavid Is there anywhere to follow the roadmap for this integration?
-
Finally have some bandwidth for this, and I'm excited by the prospect of being able to insource whatever we're still using Fivetran and Airbyte for. I learned about dlt after seeing it mentioned in the video explaining Embedded ELT, and that would take a solid bite out of our managed services.
Like the comment above, I'd love to see a roadmap too. Our fiscal year starts in July, so it would be great if we could have a (mostly) reliable integration sometime in the next 6 months.
-
Hey all, I wanted to give a quick preview of where we've been headed since this RFC was posted. I know it's been a while, but we have been working on making sure the next iteration addresses all of the feedback here.

Here's an example asset that uses a new asset decorator, rather than the existing asset builder methods. As you can see, you can include multiple connections in a single resource. It uses the new Sling replication API: https://docs.slingdata.io/sling-cli/run/configuration#replication-config

```python
from dagster import EnvVar
from dagster_embedded_elt.sling import (
    DagsterSlingTranslator,
    SlingConnectionResource,
    SlingResource,
    sling_assets,
)

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_URL")
        ),
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
    ]
)

replication_config = "replication.yaml"  # the replication config shown below


@sling_assets(replication_config=replication_config)  # can also add partitions_def, backfill_policy and op_tags here
def my_assets(context, sling: SlingResource):
    for lines in sling.replicate(
        replication_config=replication_config,
        dagster_sling_translator=DagsterSlingTranslator(),
        debug=True,
    ):
        context.log.info(lines)
```

An example replication config:

```yaml
source: MY_POSTGRES
target: MY_DUCKDB

defaults:
  mode: full-refresh
  object: '{stream_schema}_{stream_table}'

streams:
  public.accounts:
  public.users:
    disabled: true
  public.finance_departments_old:
    object: 'departments' # overwrite default object
    source_options:
      empty_as_null: false
    meta:
      dagster_source: boo
  public."Transactions":
    mode: incremental # overwrite default mode
    primary_key: id
    update_key: last_updated_at
  public.all_users:
    sql: |
      select all_user_id, name
      from public."all_Users"
    object: public.all_users # need to add 'object' key for custom SQL

env:
  SLING_LOADED_AT_COLUMN: true # adds the _sling_loaded_at timestamp column
  SLING_STREAM_URL_COLUMN: true # if source is file, adds a _sling_stream_url column with file path / URL
```

With the above code and config, you'll get an asset graph with one asset per configured stream. The implementation is now working, so I just wanted to share this as I work to wrap up docs, examples and tests.

Thank you to @nixent for the feedback on resources and partition defs, to @hello-world-bfree for the suggestions on making stdout available to process, and to @cdchan for the suggestion of using a shared Sling replication config. Please keep the feedback coming, we do read it and appreciate it.
-
PyAirbyte just entered public beta. Maybe this is something to consider on the roadmap, since they have more connectors than other packages? 😄
-
Hi all! Just wanted to let you all know the latest version of dagster-embedded-elt just shipped. Check out our docs for the latest: https://docs.dagster.io/integrations/embedded-elt and you can find example code there as well. This release makes it even easier to sync data using multiple connections and to define multiple tables using Sling's replication YAML. We've tested it on millions of rows and it's worked really well, but as always, we'd love your feedback! Much of the feedback you all provided has gone into making this latest release possible.
-
👋 Hey @PedramNavid! Not sure how deep y'all are into the dlt integration, but I wanted to provide some hopefully helpful feedback after integrating it myself. I actually found integrating the two to be a fairly painless process after getting up to speed on how dlt works under the hood.

I'm sure you guys will make working with dlt in Dagster more ergonomic, but I wanted to suggest that the coupling be kept as light and loose as possible. I'm tempted to go as far as saying an explicit integration may not even be necessary. Some helper functions and education, maybe an opinionated best-practices guide, might do the trick on their own. Showing folks how to do it, and how easy it is, may be enough. dlt is so flexible that I fear any explicit integration may lead to unnecessary restrictions and limitations.

With that said, I'm loving the pairing of Dagster and dlt! It was the last piece of the puzzle to allow my org to completely cut ties with Fivetran and Airbyte. Appreciate you bringing the idea of embedded ELT to the forefront!
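To illustrate the "helper functions and education may be enough" point, here is a minimal sketch of running dlt inside a plain Dagster asset with no dedicated integration; the pipeline names and inline data are placeholders:

```python
import dlt
from dagster import asset


@asset
def github_issues():
    # A plain dlt pipeline, orchestrated by nothing more than an ordinary asset.
    pipeline = dlt.pipeline(
        pipeline_name="github_issues",
        destination="duckdb",
        dataset_name="github",
    )
    # Placeholder payload; a real pipeline would run a dlt source/resource.
    load_info = pipeline.run([{"id": 1, "title": "example"}], table_name="issues")
    return load_info
```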
-
Hey everyone! I'm excited to share that the dltHub integration has landed in version 1.7 of Dagster. You can find the relevant documentation and the introductory blog post here:
Thanks so much to all of the community members who requested this integration, and for the early feedback. I look forward to further collaboration and to making continued improvements to the integration. Cheers!
-
The dlt enhancement is great, thanks @cmpadden. Now that I'm using it fully, I have some observations and questions:
-
I know I'm late to the party, but I missed this before. I had previously been building my own assets on top of the replication config using an earlier version of the Sling integration, but now I'm trying to switch to the new decorator. However, it seems that it makes a multi-asset with can_subset explicitly set to False, which disallows running a single stream. I was wondering if there's a reason for that, as it's really helpful when you have just one table or stream that needs to be replicated at a given time. I think this also disallows making jobs that replicate certain subsets of the data more frequently, though that's less important for my immediate use case, which is just running/re-running failed assets. Replicating a subset of streams is supported by Sling with the --streams CLI option: https://docs.slingdata.io/sling-cli/run/configuration/cli-flags.
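For illustration only, a sketch of what stream-level subsetting might look like if the decorator allowed it; `can_subset` is not currently exposed by `@sling_assets`, and the mapping from selected asset keys to stream names is an assumption:

```python
from dagster_embedded_elt.sling import SlingResource, sling_assets

replication_config = "replication.yaml"  # same config as above


@sling_assets(replication_config=replication_config)  # imagine can_subset=True here
def my_assets(context, sling: SlingResource):
    # Hypothetical: map the selected Dagster asset keys back to Sling stream
    # names, analogous to `sling run -r replication.yaml --streams <names>`.
    selected = [key.path[-1] for key in context.selected_asset_keys]
    context.log.info(f"would replicate only: {selected}")
```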
-
I've only tried the dlt integration briefly, so I might be missing some things, but I have a couple of questions about making the pipeline in the asset decorator:
-
Extract telemetry, i.e. the number of rows imported from ELT/Sling: is there already some work on retrieving the affected record count after a sync via Sling into the asset metadata?
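Until the integration surfaces this, here's a minimal sketch of attaching a row count to a materialization by hand; the sync trigger and the count value are placeholders, not part of any Sling API:

```python
from dagster import MaterializeResult, asset


@asset
def synced_table() -> MaterializeResult:
    # run_the_sync()  # placeholder: trigger the Sling sync here
    row_count = 12_345  # placeholder: e.g. a COUNT(*) against the target table
    # "dagster/row_count" is the standard metadata key recognized by Dagster's UI.
    return MaterializeResult(metadata={"dagster/row_count": row_count})
```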
-
I find it very troublesome to define upstream dependencies and auto-materialization for individual mapped dlt assets. In dbt, I can define auto-materialization and dependencies in the model schema.yml for each individual model, and I don't need to write a specific translator. But I don't find a way to define them for each dlt resource: if I define them in a translator, it means the same auto-materialization policy and the same dependencies for the whole batch of dlt resources. Unless I write a dlt_assets definition for every single dlt resource, I can't control auto-materialization and dependencies at a per-resource grain. But then I don't see the point of using this integration library anymore; I could just write generic Dagster asset defs.
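One possible workaround, sketched under the assumption that the installed DagsterDltTranslator exposes per-resource hooks like get_auto_materialize_policy and get_deps_asset_keys (check your version's API): a single translator can branch on the resource name, so one translator need not mean one policy for the whole batch.

```python
from dagster import AssetKey, AutoMaterializePolicy
from dagster_embedded_elt.dlt import DagsterDltTranslator

# Illustrative per-resource configuration, keyed by dlt resource name.
POLICIES = {"users": AutoMaterializePolicy.eager()}
DEPS = {"users": [AssetKey("raw_users_source")]}


class PerResourceTranslator(DagsterDltTranslator):
    def get_auto_materialize_policy(self, resource):
        # Different policy per dlt resource; None means no policy.
        return POLICIES.get(resource.name)

    def get_deps_asset_keys(self, resource):
        # Different upstream deps per dlt resource, falling back to the default.
        return DEPS.get(resource.name, super().get_deps_asset_keys(resource))
```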
-
Hello!
With the great reception we've seen of our initial launch of dagster-embedded-elt, we'd love to get feedback from the community about our philosophy and approach. We've captured these thoughts in more detail in our blog post, but briefly: we believe that smaller embedded libraries work really well when paired with a powerful orchestrator. We've shipped our initial version using Sling, and the [docs] cover the API and code examples.
By not having to reinvent an orchestrator, these libraries can focus on what makes ingestion hard, while Dagster fills in the orchestration gaps, such as state management, scheduling, logging, and so on.
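As a small illustration of Dagster filling one of those gaps (scheduling) around embedded ELT assets; the group name and cron string are placeholders:

```python
from dagster import AssetSelection, Definitions, ScheduleDefinition, define_asset_job

# Run all assets in the (illustrative) "ingestion" group nightly at 2am.
sync_job = define_asset_job("nightly_sync", selection=AssetSelection.groups("ingestion"))

defs = Definitions(
    jobs=[sync_job],
    schedules=[ScheduleDefinition(job=sync_job, cron_schedule="0 2 * * *")],
)
```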
Are there particular integrations that you would like us to focus on next? What do you think about this overall approach? Do you think there are other parts of the data lifecycle that would benefit from this as well?
Appreciate it!