[RFC] Dagster Pipes (previously ext) #16319
Replies: 18 comments 32 replies
-
I have a question: In the example where logic and subsequent logging calls are added, do you foresee that the script could remain runnable without dagster (as in whatever fashion it was runnable originally, like …
-
My team is very interested in this. We have a huge graph of assets that are produced via Scala Spark code in a bespoke workflow manager. It would be amazing to be able to extend the bespoke workflow manager to produce Dagster events. We'd be happy to collaborate on something of this nature. As of right now these are run on top of Databricks, so it seems that you're already planning on developing a runner (not sure what the appropriate terminology is yet in this context) for that in the near-term, but it seems that there would need to be a bit of Scala code to do the serialization/deserialization - my team would be happy to contribute to this, we'd just need some guidance on the protocol as it becomes more standardized. Overall really excited about this, it opens up a whole new world of possibilities. The ability to write pipelines in a language like Scala or Rust and still interact with Dagster could be really huge, both for type-safety and performance.
-
This seems really sweet. We are creating a dataproc step launcher, but I guess it might make more sense to wait for this feature first. I do have some questions. One of the benefits of the step launcher is that you can write code inside of the dagster codebase and run that code remotely. Here, there would be an expectation that the code is already inside of the spark cluster, right? We would have to create some sort of 'dataproc_ext_client' to be able to run these jobs, right? What would the work of creating one of these clients entail compared to the amount of work of creating a step launcher (which is non-trivial)? Also, we have a bunch of Node.js jobs that we currently run using a subprocess; what would change in the way that we run these scripts? Is there a future where we can declare jobs directly inside of Node.js?
-
Thanks for this proposal and discussion - we really enjoy watching how Dagster is progressing.
-
Sounds pretty interesting. How do you imagine credentials might work? For instance, if …
-
Already discussed this offline with Nick, but we're mega excited for this as we're just about to roll out Dagster and the library should enable us to onboard a product team much more quickly into the system! Specifically looking to orchestrate Spark computations running in our own Kubernetes cluster in Azure. Only question is, how are we expected to provide the …
-
I'm curious if you've imagined dagster-ext being used to simplify / complement step launchers? It seems that this framework could be used in tandem with some more generalized tooling for moving step code to a remote location for external execution, to give a more solid framework to back step launchers. I totally see the utility of not having to go through step launchers, but they're also a pretty awesome way to give a more unified coding experience to devs where they can define all their pipelines in one place and not have to think about making sure code changes get deployed to the various different platforms on which it needs to be executed. This is particularly useful for local iteration, where a dev can be working in their IDE making changes and launching jobs through a local Dagster deployment. They don't have to remember to sync code to somewhere that Databricks can refer to every time they make a change, the step launcher does that at runtime for them. Sometimes if our devs are using a pre-configured step launcher or working on someone else's pipeline they don't even realize their code is executing on Databricks, it's that transparent.
-
I notice that in your k8s example the command is `["python", "the_existing_script.py"]`. If I have a team trying to run some bespoke CLI utility written in an arbitrary non-Python language, will they be able to at least read the context information via pod-mounted env variables or similar?
-
Our team currently materializes a few assets in non-standard third party platforms and the way we've dealt with those has been through polling their API to check for asset readiness. We can run our code in those platforms, so I'm wondering if we would be able not only to share logs, but to push information telling dagster that the asset is materialized. I would love to drop this polling architecture...
-
How will I/O for assets executed by an external process be handled? For writing/persisting the asset, presumably, the external process now has to do the work of writing the output to disk/S3/data warehouse? If so, won't this represent a significant regression compared to the abstraction an `IOManager` provides? For reading/loading the asset in downstream assets that are not executed in an external process, I assume you still need to define an `IOManager`?
-
I'm really interested in this since at our site we have a mixed Cloud/HPC environment where we have dagster OSS on the cloud side and where we launch heavy steps on the HPC side with a custom StepLauncher. We are currently releasing the first pipelines in production and we have adopted the main dagster abstractions: Software Defined Assets, Resources and IOManager.
My main concern in using this proposed protocol over a StepLauncher is that it seems like it will opt you out not only from the IOManager, but also from the Resource abstraction. Moreover, the resulting Software Defined Asset is opaque, since the code that is being executed and that actually materializes the asset is not even hosted in the same repository. I can see the value of this protocol in onboarding existing codebases, but it would be really nice to use ext also within fresh pipelines natively written using the dagster framework and abstractions. It would probably be nice to have an @external_asset for such use cases; do you plan to add something similar in the future?
When I originally got into dagster, the IOManager was a key and distinguishing component of it. It was advertised as a good abstraction to use also with spark, not only for scalars and small enough datasets. This new protocol, together with the deps argument and the latest docs, seems to go in a different direction. While at the beginning dagster was requiring you to refactor the code to use its abstractions, now the underlying message I read is: "structure your code as you please and use the tools that you are most familiar with. We will provide a simple api to describe your pipeline in dagster and to stream metadata to it so that it can orchestrate and visualize the pipeline for you".
-
I was on a call today with Dagster Sales, and when describing my use case they pointed me toward this discussion on Dagster Ext. However, after reading this page, I am not sure it fits my use case. Let me explain what I am trying to do to see if it can be solved using Dagster Ext.
I am in the process of evaluating Dagster to replace SnapLogic for a large number of integrations (1200). These integrations run nightly to extract the last 24 hours of the customer's data and send it to our application's data warehouse. To accomplish this, we run an on-prem local agent (created by SnapLogic) that connects to the customer's SQL database in the customer's environment, and the agent then ships the data up to the SnapLogic server. This is orchestrated from SnapLogic, with of course some setup on the customer's Windows server to install and configure the remote agent.
Reading through this discussion, it seems as though we could sort of accomplish this, but we would still have to write our own "agent" code, deploy that code to the customer's server, and then have that code read from the DB and send up the data of interest. The benefit of using Dagster Ext would be that the logs would show up in Dagster Cloud, and we could configure some of the input parameters that are sent to the "agent" from Dagster Cloud. The problem here is that if we want to deploy new code to the customer's server, we have to do that manually; we can't leverage Dagster to update the "agent". Am I understanding this correctly? Is there any other way to accomplish this with Dagster?
-
I'm coming from a thread on Slack and wanted to verify whether this addresses the thing I'm searching for or whether it's something different. My question was:
I guess the basic question would be: if I orchestrate the job scheduling and running of dagster code locations myself, can I still send the metadata/job runs back to a central dagster instance which would visualize the job results (failures, logs, history, ...)? Not sure if the following use case is what I'm looking for?
-
Hey all. Quick update here. We decided to change the name from "ext" to "Pipes". We think that is a lot clearer and provides an analogy that people can latch onto. It goes out this week!
-
First let me say that I love dagster, and you guys are doing amazing work. I have never been happier. Dagit (or is it dagster-webserver now?) is such a huge gift for being able to see what's going on with the data, and for showing it to others. I had a much easier time convincing people that we should use dagster once they saw it.
I absolutely love the idea of being able to leverage the beautiful single pane of glass from other languages, but I think I would love it much more if it were a deeper/more stringent integration. I know this is only a first step, and I believe/hope that you have plans to do deeper integration with other languages/ecosystems, but this step worries me because I can see it being a local maximum for a lot of people/teams. If someone can take their random Jupyter notebook, Lambda, AWS Step Function, or whatever and pretend it works well with dagster, then I feel like I lose leverage to push them for more rigor. Dagster as a framework has allowed and helped me to push for people to do better: to more accurately model their pipelines, to separate io/resources, to test things, to make them able to run locally and not just in production.
I also feel like it makes dagit less useful, and much more like airflow. I no longer necessarily know what's going in or out of a given asset/op if it's using this, or what resources it needs/uses. Since it's sold as just adding a few lines of code, I know I'm going to have trouble buying time to refactor/improve or even really understand the existing code. I know this is probably the better choice from a company perspective for dagster labs (RIP elementl), and that it's not dagster's responsibility to fix other companies' engineering/organizational issues, but I worry that it detracts from the principled stance I've seen dagster taking up until now and the state of the art.
I would be much more excited if the concepts of registering resources/assets/ops/graphs etc. were ported to other languages/ecosystems first so that we could have that deep integration with dagit and engineering rigor, though I recognize that would likely be much more difficult and I haven't thought about this nearly as much as you all have. Anyway, that's just my two cents. Thank you all again so much for all that you've done so far and continue to do!
-
I was wondering if pipes would fit our requirements. But it would seem not? 🤔 Primary motivation:
Things we tried:
I was hoping the agent may be made available as part of the OSS offering (it seems to fit our problem statement well), and I'm aware that we can write a custom run launcher, but I do not want to write and maintain this type of distributed custom code long term, which is hard to test and debug and may be brittle over time.
-
Pretty far-out vision: It appears that this will allow the use of Dagster's UI with not just external data …
-
Hi all! We are now live. Wanted to thank everyone in this discussion for such thoughtful commentary and feedback. It was an invaluable part of the process. Please go kick the tires and see if the reality matches the promise!
-
Introduction
Dagster’s 1.5 release will contain a new protocol designed to enhance integrations with external execution environments. We call it Pipes, short for "Protocol for Inter-Process Execution with Streaming logs and metadata".
Pipes has a few goals:
Context
Dagster has traditionally integrated business logic and orchestration. Our tutorial focuses on this approach, where business logic is structured within dagster's definition objects and requires importing the full dagster library. For simple data pipelines where data fits in memory and is directly processed within the orchestrator, this works well.
However this approach falls flat in a number of important contexts:
The “way out” of this is for the body of an asset to invoke external environments directly via Python clients. However, users that do this are left with little support in Dagster:
Not repeating Airflow’s mistakes
Airflow has gone through a similar journey, and many users––especially ones that operate at-scale data platforms––use operators that separate execution and orchestration, such as the K8sPodOperator (see docs). This is a fantastic article that details this approach and its operational advantages.
However there are also large costs associated with this approach. In particular users are forced to write per-task, bespoke CLI applications in order to invoke compute using K8sPodOperator. One of our users memorably described this: "In Airflow you have to choose between dependency hell (meaning a single, shared Python environment) and CLI hell."
Path Forward: A Protocol
What we propose is a protocol between the orchestration environment and external execution environments, and a toolkit for building implementations of that protocol. In order for an external process to participate in a first-class way it must:
The transport layer varies depending on your operating context. For the subprocess case, the default transport layer is a combination of environment variables and temp files for injecting context and parameters, and a temp file for streaming structured metadata. For a case like Databricks, parameters and context information are passed as parameters to the REST API, and logs and messages are streamed to dbfs.
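To make the subprocess case concrete, here is a rough sketch of the shape of that exchange. The environment variable names and the message format below are hypothetical, purely for illustration; the actual protocol defines its own names and encoding.

```python
import json
import os
import tempfile

# Hypothetical names and message shapes, for illustration only; the real
# protocol defines its own env var names and payload encoding.
with tempfile.TemporaryDirectory() as tmpdir:
    context_path = os.path.join(tmpdir, "context.json")
    messages_path = os.path.join(tmpdir, "messages.jsonl")

    # Orchestrator side: write context/parameters to a temp file and hand its
    # location to the external process via environment variables, e.g.
    #   subprocess.run(["python", "the_existing_script.py"],
    #                  env={**os.environ,
    #                       "EXT_CONTEXT_PATH": context_path,
    #                       "EXT_MESSAGES_PATH": messages_path})
    with open(context_path, "w") as f:
        json.dump({"asset_key": "my_asset", "partition_key": None}, f)

    # External-process side: read the injected context, then stream structured
    # messages back by appending JSON lines to the messages file.
    with open(context_path) as f:
        ext_context = json.load(f)
    with open(messages_path, "a") as f:
        f.write(json.dumps({
            "method": "report_asset_materialization",
            "params": {"metadata": {"num_rows": 42}},
        }) + "\n")

    # Orchestrator side: read the streamed messages and convert each one into
    # a Dagster event (materialization, log line, etc.).
    with open(messages_path) as f:
        events = [json.loads(line) for line in f]
```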
We will provide out-of-the-box implementations of streaming logs and structured messages for major integrations and object stores. This is the most complex component of the protocol, and these are generally accessible from all cloud-based services. Customizing the "launch" behavior is more straightforward, and, with appropriate support and guardrails in the toolkit, we are confident that the community and users can implement that as required.
What does the code look like
Most users writing business logic will not have to understand or care that there is a "protocol." What they will experience are much-improved integrations with environments like Kubernetes, Lambda, Databricks, arbitrary subprocesses, and other hosted runtimes. Let's work through an example to see what it looks like in practice.
The scenario here is that you have been tasked with orchestrating an existing Python script that produces an asset, making its logs viewable in Dagster’s UI, and then altering it to emit some metadata back in Dagster (e.g. number of rows). However it is large, complex, untested, not authored by you, and incomprehensible. You do not fully understand it, nor do you have any desire to. You want to invoke this script as an external process, rather than bring that code into the Dagster process.
You write the following asset that invokes the external script. You use the PipedSubprocess resource, which implements the orchestration side of the protocol.
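The code block from the original post is not reproduced here; as a rough sketch, using the `PipesSubprocessClient` name under which this resource ultimately shipped and a hypothetical asset name, it looks along these lines:

```python
from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset


@asset
def telemetry_report(  # hypothetical asset name, for illustration
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    # Launch the existing script as a subprocess; Pipes injects the context and
    # relays any logs/metadata the script reports back to Dagster.
    return pipes_subprocess_client.run(
        command=["python", "the_existing_script.py"],
        context=context,
    ).get_materialize_result()


defs = Definitions(
    assets=[telemetry_report],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
```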
This works on the existing script without any modifications to that script. However, now you want code in that script to emit metadata back to Dagster. Previously the way to do that would have been to write code within the asset function in the Dagster process, but you want to log metadata that is only available in the script. Dagster Pipes allows you to do that with just a few lines of code:
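The script-side snippet from the post is likewise not reproduced; a sketch using the `dagster-pipes` package as it later shipped (the business-logic function here is a placeholder) might look like this:

```python
# the_existing_script.py
from dagster_pipes import open_dagster_pipes


def compute_report():
    # Stand-in for the existing, untouched business logic.
    return [{"id": i} for i in range(100)]


if __name__ == "__main__":
    # One line to initialize Dagster Pipes...
    with open_dagster_pipes() as pipes:
        rows = compute_report()
        pipes.log.info(f"computed {len(rows)} rows")
        # ...and one line to report a materialization with metadata.
        pipes.report_asset_materialization(metadata={"num_rows": len(rows)})
```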
dagster-pipes has no dependencies. It is easy to install and also easy to vendor (it is a single Python file), if necessary. You have to add a single line of code to initialize Dagster Pipes, and then a single line to report a materialization with metadata.
With a few lines of code in the script, it is now a first-class Dagster asset. When it is run, its logs appear streaming in the Dagster UI, and its asset catalog entry collects metadata.
Kubernetes
Now imagine you wanted to move this script to Kubernetes. Here you'll see the power of standardization, as we can shift the code to execute in a different environment.
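Again, the original code block isn't reproduced; a sketch using the `PipesK8sClient` that later shipped in `dagster-k8s` (image name hypothetical) could look roughly like this, with only the client swapped relative to the subprocess version:

```python
from dagster import AssetExecutionContext, Definitions, asset
from dagster_k8s import PipesK8sClient


@asset
def telemetry_report(  # same hypothetical asset as before
    context: AssetExecutionContext, pipes_k8s_client: PipesK8sClient
):
    # The same script, now baked into an image the cluster can pull.
    return pipes_k8s_client.run(
        context=context,
        image="acme/the-existing-script:latest",  # hypothetical image
        command=["python", "the_existing_script.py"],
    ).get_materialize_result()


defs = Definitions(
    assets=[telemetry_report],
    resources={"pipes_k8s_client": PipesK8sClient()},
)
```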
There are no modifications necessary in the other process. You can use the same code unmodified, as long as it is accessible inside of the container.
This structure is a big step forward for data engineering teams that want to incorporate stakeholder teams into a unified Dagster deployment. The stakeholder teams can, with minimal modifications, get their assets into the asset graph, use Dagster as their system of record for metadata, and use Dagster's UI for improved operations and observability. Contrast that with today's world, where stakeholder teams have to substantially restructure their business logic code to fit into Dagster definitions and bring the full dagster library into their Python environment in order to get those benefits.
What about Step Launchers?
Users who have grappled with the issues––especially in Spark––may be asking "what about step launchers?"
Historically we have tried to make the integration of business logic and orchestration work in external runtimes such as Spark with framework-level abstractions.
The step launcher was the framework-level abstraction designed to support ergonomic remote execution. However this has usability problems and is also inherently untenable for some users to adopt:
For users who do not want to structure their Spark business logic in Dagster definitions, we think that Pipes is the right path forward.
Multi-language future
The library to implement this on the external side is lightweight. In the case where this IPC is implemented using temp files and environment variables (for example, our subprocess and Kubernetes integrations work like this), no external dependencies are required and it is a small amount of Python code.
As a result, this protocol is fairly straightforward to implement in other programming languages. They just have to deserialize and serialize standardized objects to a filesystem or an object store. This will enable a future where practitioners in any programming language and any hosted execution environment can participate in Dagster in a first-class way, which is an exciting future. As we mature the system, we'll formalize this protocol in a spec and provide implementations in other programming languages.
Call-to-action
We have two asks:
First, please provide feedback on this idea. And if you see yourself using this, please let us know your concrete use case! It's always helpful to know all the different ways people can envision using a tool.
Second, we are looking for design partners. This feature is under active development in our repo, and is in our public releases (but not in our top-level exports). We're looking for folks who want to use these capabilities immediately. If that is of interest to you, please reach out! Our subprocess and Kubernetes integrations are ready for use by active design partners. We've created a channel in the Dagster Slack, #dagster-pipes, for those who want to follow along.
We are targeting the following external environments (focusing on AWS).
We can prioritize development based on demand for these (and other) environments, so please speak up! While this enables a multi-language future, we are only targeting Python in the near-term.
Please comment here with questions and feedback. And join the #dagster-pipes Slack channel! Thank you!