diff --git a/docs/docs-beta/docs/getting-started/quickstart.md b/docs/docs-beta/docs/getting-started/quickstart.md index afb15e582ed45..c2d31c604e737 100644 --- a/docs/docs-beta/docs/getting-started/quickstart.md +++ b/docs/docs-beta/docs/getting-started/quickstart.md @@ -153,5 +153,5 @@ id,name,age,city,age_group Congratulations! You've just built and run your first pipeline with Dagster. Next, you can: -- Continue with the [ETL pipeline tutorial](/tutorial/tutorial-etl) to learn how to build a more complex ETL pipeline +- Continue with the [ETL pipeline tutorial](/tutorial/etl-tutorial-introduction) to learn how to build a more complex ETL pipeline - Learn how to [Think in assets](/guides/build/assets-concepts/index.md) diff --git a/docs/docs-beta/docs/tutorial/01-etl-tutorial-introduction.md b/docs/docs-beta/docs/tutorial/01-etl-tutorial-introduction.md new file mode 100644 index 0000000000000..9d719ff89e077 --- /dev/null +++ b/docs/docs-beta/docs/tutorial/01-etl-tutorial-introduction.md @@ -0,0 +1,136 @@ +--- +title: Build an ETL Pipeline +description: Learn how to build an ETL pipeline with Dagster +last_update: + author: Alex Noonan +--- + +# Build your first ETL pipeline + +In this tutorial, you'll build an ETL pipeline with Dagster that: + +- Imports sales data to DuckDB +- Transforms data into reports +- Runs scheduled reports automatically +- Generates one-time reports on demand + +## You will learn to: + +- Set up a Dagster project with the recommended project structure +- Create Assets with metadata +- Connect Dagster to external systems with Resources +- Build dependencies between assets +- Run a pipeline by materializing assets +- Add schedules, sensors, and partitions to your assets +- Refactor project when it becomes more complex + +## Prerequisites + +
+ Prerequisites + +To follow the steps in this guide, you'll need: + +- Basic Python knowledge +- Python 3.9+ installed on your system. Refer to the [Installation guide](/getting-started/installation) for information. +- Familiarity with SQL and Python data manipulation libraries, such as Pandas. +- Understanding of data pipelines and the extract, transform, and load process. +
+ + +## Step 1: Set up your Dagster environment + +First, set up a new Dagster project. + +1. Open your terminal and create a new directory for your project: + + ```bash + mkdir dagster-etl-tutorial + cd dagster-etl-tutorial + ``` + +2. Create and activate a virtual environment: + + + + ```bash + python -m venv dagster_tutorial + source dagster_tutorial/bin/activate + ``` + + + ```bash + python -m venv dagster_tutorial + dagster_tutorial\Scripts\activate + ``` + + + +3. Install Dagster and the required dependencies: + + ```bash + pip install dagster dagster-webserver pandas dagster-duckdb + ``` + +## Step 2: Create the Dagster project structure + +Run the following command to create the project directories and files for this tutorial: + + ```bash + dagster project from-example --example getting_started_etl_tutorial + ``` + +Your project should have this structure: +{/* vale off */} +``` +dagster-etl-tutorial/ +├── data/ +│ └── products.csv +│ └── sales_data.csv +│ └── sales_reps.csv +│ └── sample_request/ +│ └── request.json +├── etl_tutorial/ +│ └── definitions.py +├── pyproject.toml +├── setup.cfg +├── setup.py +``` +{/* vale on */} + +:::info +Dagster has several example projects you can install depending on your use case. To see the full list, run `dagster project list-examples`. For more information on the `dagster project` command, see the [API documentation](https://docs-preview.dagster.io/api/cli#dagster-project). +::: + +### Dagster Project Structure + +#### dagster-etl-tutorial root directory + +In the `dagster-etl-tutorial` root directory, there are three configuration files that are common in Python package management. These files manage dependencies and identify the Dagster modules in the project. +| File | Purpose | +|------|---------| +| pyproject.toml | This file is used to specify build system requirements and package metadata for Python projects. It is part of the Python packaging ecosystem. | +| setup.cfg | This file is used for configuration of your Python package. It can include metadata about the package, dependencies, and other configuration options. | +| setup.py | This script is used to build and distribute your Python package. It is a standard file in Python projects for specifying package details. | + +#### etl_tutorial directory + +This is the main directory where you will define your assets, jobs, schedules, sensors, and resources. +| File | Purpose | +|------|---------| +| definitions.py | This file is typically used to define jobs, schedules, and sensors. It organizes the various components of your Dagster project. This allows Dagster to load the definitions in a module. | + +#### data directory + +The data directory contains the raw data files for the project. We will reference these files in our software-defined assets in the next step of the tutorial. 
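One note on how these configuration files are used: when you run Dagster commands from the project root, the `[tool.dagster]` section of `pyproject.toml` tells Dagster which module to load your definitions from. A minimal sketch of what that section might look like is shown below — the exact contents generated for this example project may differ, so treat the names here as illustrative.

```toml
# Hypothetical sketch -- check the generated pyproject.toml in your project,
# which may differ from this.
[tool.dagster]
module_name = "etl_tutorial.definitions"  # module that contains the Definitions object
code_location_name = "etl_tutorial"
```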
## Step 3: Launch the Dagster webserver

To make sure Dagster and its dependencies were installed correctly, navigate to the project root directory and start the Dagster webserver by running `dagster dev` in your terminal.


## Next steps

- Continue this tutorial by [creating and materializing assets](/tutorial/create-and-materialize-assets)
diff --git a/docs/docs-beta/docs/tutorial/02-create-and-materialize-assets.md b/docs/docs-beta/docs/tutorial/02-create-and-materialize-assets.md
new file mode 100644
index 0000000000000..7735d0d72a77b
--- /dev/null
+++ b/docs/docs-beta/docs/tutorial/02-create-and-materialize-assets.md
@@ -0,0 +1,107 @@
---
title: Create and materialize assets
description: Load project data and create and materialize assets
last_update:
  author: Alex Noonan
---

In the first step of the tutorial, you created your Dagster project with the raw data files. In this step, you will:

- Create your initial Definitions object
- Add a DuckDB resource
- Build software-defined assets
- Materialize your assets

## 1. Create a Definitions object

In Dagster, the [Definitions](/todo) object is where you define and organize the various components of your project, such as assets and resources.

Open the `definitions.py` file in the `etl_tutorial` directory and copy the following code into it:

   ```python
   import json
   import os

   from dagster_duckdb import DuckDBResource

   import dagster as dg

   defs = dg.Definitions(
       assets=[],
       resources={},
   )
   ```

## 2. Define the DuckDB resource

In Dagster, [resources](/todo) are the external services, tools, and storage backends you need to do your job. For the storage backend in this project, we'll use [DuckDB](https://duckdb.org/), a fast, in-process SQL database that runs inside your application. We'll define it once in the Definitions object, making it available to all assets and objects that need it.

   ```python
   defs = dg.Definitions(
       assets=[],
       resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
   )
   ```

## 3. Create assets

Software-defined [assets](/todo) are the main building blocks in Dagster. An asset is composed of three components:

1. An asset key, or unique identifier.
2. An op, the function that is invoked to produce the asset.
3. The upstream dependencies that the asset depends on.

You can read more about our philosophy behind the [asset-centric approach](https://dagster.io/blog/software-defined-assets).

### Products asset

First, we will create an asset that creates a DuckDB table to hold data from the products CSV. This asset takes the `duckdb` resource defined earlier and returns a `MaterializeResult` object.
Additionally, this asset contains metadata in the `@dg.asset` decorator parameters to help categorize the asset, and in the `return` block to give us a preview of the asset in the Dagster UI.

To create this asset, open the `definitions.py` file and copy the following code into it:



### Sales reps asset

The code for the sales reps asset is similar to the products asset code. In the `definitions.py` file, add the following code below the products asset code:



### Sales data asset

To add the sales data asset, copy the following code into your `definitions.py` file below the sales reps asset:



## 4. Add assets to the Definitions object

Now we can pull these assets into our Definitions object. Adding them to the Definitions object makes them available to the Dagster project.
Add them to the empty list in the `assets` parameter.

   ```python
   defs = dg.Definitions(
       assets=[products,
           sales_reps,
           sales_data,
       ],
       resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
   )
   ```

## 5. Materialize assets

To materialize your assets:

1. In a browser, navigate to the URL of the Dagster server that you started earlier.
2. Navigate to **Deployment**.
3. Click **Reload definitions**.
4. Click **Assets**, then click "View global asset lineage" to see all of your assets.

   ![2048 resolution](/images/tutorial/etl-tutorial/etl-tutorial-first-asset-lineage.png)

5. Click **Materialize all**.
6. Navigate to the **Runs** tab and select the most recent run. Here you can see the logs from the run.

   ![2048 resolution](/images/tutorial/etl-tutorial/first-asset-run.png)


## Next steps

- Continue this tutorial with [asset dependencies](/tutorial/create-and-materialize-a-downstream-asset)
\ No newline at end of file
diff --git a/docs/docs-beta/docs/tutorial/03-create-and-materialize-a-downstream-asset.md b/docs/docs-beta/docs/tutorial/03-create-and-materialize-a-downstream-asset.md
new file mode 100644
index 0000000000000..e4774338759d2
--- /dev/null
+++ b/docs/docs-beta/docs/tutorial/03-create-and-materialize-a-downstream-asset.md
@@ -0,0 +1,44 @@
---
title: Create and materialize a downstream asset
description: Reference Assets as dependencies to other assets
last_update:
  author: Alex Noonan
---

Now that we have the raw data loaded into DuckDB, we need to create a [downstream asset](/guides/build/assets-concepts/asset-dependencies) that combines the upstream assets. In this step, you will:

- Create a downstream asset
- Materialize that asset

## 1. Creating a downstream asset

Now that we have all of our raw data loaded into DuckDB, our next step is to merge it together in a view composed of data from all three source tables.

To accomplish this in SQL, we will bring in our `sales_data` table and then left join it with `sales_reps` and `products` on their respective id columns. Additionally, we will keep this view concise and only include the columns relevant for analysis.

This asset looks a lot like our previous ones, with a few small changes: we put this asset into a different group, and to make it dependent on the raw tables, we add their asset keys to the `deps` parameter in the asset definition.



## 2. Materialize the asset

1. Add the asset we just made to the Definitions object.

Your Definitions object should now look like this:

   ```python
   defs = dg.Definitions(
       assets=[products,
           sales_reps,
           sales_data,
           joined_data,
       ],
       resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
   )
   ```

2. In the Dagster UI, reload definitions and materialize the `joined_data` asset.

## Next steps

- Continue this tutorial by [ensuring data quality with asset checks](/tutorial/ensuring-data-quality-with-asset-checks)
\ No newline at end of file
diff --git a/docs/docs-beta/docs/tutorial/04-ensuring-data-quality-with-asset-checks.md b/docs/docs-beta/docs/tutorial/04-ensuring-data-quality-with-asset-checks.md
new file mode 100644
index 0000000000000..bbef9192b3319
--- /dev/null
+++ b/docs/docs-beta/docs/tutorial/04-ensuring-data-quality-with-asset-checks.md
@@ -0,0 +1,51 @@
---
title: Ensuring data quality with asset checks
description: Ensure assets are correct with asset checks
last_update:
  author: Alex Noonan
---

Data quality is critical in data pipelines. Much like in a factory producing cars, inspecting parts after they complete certain steps ensures that defects are caught before the car is completely assembled.

In Dagster, you define [asset checks](/guides/test/asset-checks) in a similar way to how you define an asset. In this step you will:

- Define an asset check
- Execute that asset check in the UI

## 1. Define the asset check

In this case, we want to create a check that identifies any rows in the `joined_data` table that are missing a product or sales rep.

Paste the following code beneath the `joined_data` asset.



## 2. Run the asset check

Before the asset check can be run, it needs to be added to the Definitions object. Asset checks are added to their own list, like assets.

Your Definitions object should now look like this:

```python
defs = dg.Definitions(
    assets=[products,
        sales_reps,
        sales_data,
        joined_data,
    ],
    asset_checks=[missing_dimension_check],
    resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
)
```

Asset checks run when an asset is materialized, but they can also be executed manually in the UI:

1. Reload definitions.
2. Navigate to the Asset Details page for the `joined_data` asset.
3. Select the **Checks** tab.
4. Click **Execute** for `missing_dimension_check`.

   ![2048 resolution](/images/tutorial/etl-tutorial/asset-check.png)

## Next steps

- Continue this tutorial by [creating and materializing partitioned assets](/tutorial/create-and-materialize-partitioned-asset)
\ No newline at end of file
diff --git a/docs/docs-beta/docs/tutorial/05-create-and-materialize-partitioned-asset.md b/docs/docs-beta/docs/tutorial/05-create-and-materialize-partitioned-asset.md
new file mode 100644
index 0000000000000..4e4158c4843e6
--- /dev/null
+++ b/docs/docs-beta/docs/tutorial/05-create-and-materialize-partitioned-asset.md
@@ -0,0 +1,178 @@
---
title: Create and Materialize Partitioned Assets
description: Partitioning Assets by datetime and categories
last_update:
  date: 2024-11-25
  author: Alex Noonan
---

[Partitions](/guides/build/create-a-pipeline/partitioning) are a core abstraction in Dagster; they are how you manage large datasets, process incremental updates, and improve pipeline performance. In Dagster you can partition assets in the following ways:

1. Time-based: Split data by time periods (e.g., daily, monthly)
2. Category-based: Divide by known categories (e.g., country, product type)
3. Two-dimensional: Combine two partition types (e.g., country + date)
4. Dynamic: Create partitions based on runtime conditions

In this step you will:

- Create an asset that is partitioned by month
- Create an asset that is partitioned by defined categories

## 1.
Create a time based partitioned asset + +Partitioning by datetime groups is supported in Dagster natively. We want to create an asset that calculates the monthly performance for each sales rep. To create the monthly partition copy the following code below the `missing_dimension_check` asset check. + + + +Partition data are accessed within an asset by context. We want to create an asset that does this calculation for a given month from the partition and deletes any previous value for that month. Paste the following asset under the `monthly_partition` we just created + + ```python + @dg.asset( + partitions_def=monthly_partition, + compute_kind="duckdb", + group_name="analysis", + deps=[joined_data], + ) + def monthly_sales_performance( + context: dg.AssetExecutionContext, duckdb: DuckDBResource + ): + partition_date_str = context.partition_key + month_to_fetch = partition_date_str[:-3] + + with duckdb.get_connection() as conn: + conn.execute( + f""" + create table if not exists monthly_sales_performance ( + partition_date varchar, + rep_name varchar, + product varchar, + total_dollar_amount double + ); + + delete from monthly_sales_performance where partition_date = '{month_to_fetch}'; + + insert into monthly_sales_performance + select + '{month_to_fetch}' as partition_date, + rep_name, + product_name, + sum(dollar_amount) as total_dollar_amount + from joined_data where strftime(date, '%Y-%m') = '{month_to_fetch}' + group by '{month_to_fetch}', rep_name, product_name; + """ + ) + + preview_query = f"select * from monthly_sales_performance where partition_date = '{month_to_fetch}';" + preview_df = conn.execute(preview_query).fetchdf() + row_count = conn.execute( + f""" + select count(*) + from monthly_sales_performance + where partition_date = '{month_to_fetch}' + """ + ).fetchone() + count = row_count[0] if row_count else 0 + + return dg.MaterializeResult( + metadata={ + "row_count": dg.MetadataValue.int(count), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), + } + ) + ``` + +## 2. Create a statically partitioned asset + +Using known defined partitions is a simple way to break up your dataset when you know the different groups you want to subset it by. In our pipeline we want to create a an asset that is the performance of each product category. + +1. To create the static defined partition for the product category paste this code beneath the `monthly_sales_performance` asset: + + + +2. Now that the partition has been defined we can use that in an asset that calculates the product category performance. 
+ +```python +@dg.asset( + deps=[joined_data], + partitions_def=product_category_partition, + group_name="analysis", + compute_kind="duckdb", +) +def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource): + product_category_str = context.partition_key + + with duckdb.get_connection() as conn: + conn.execute( + f""" + create table if not exists product_performance ( + product_category varchar, + product_name varchar, + total_dollar_amount double, + total_units_sold double + ); + + delete from product_performance where product_category = '{product_category_str}'; + + insert into product_performance + select + '{product_category_str}' as product_category, + product_name, + sum(dollar_amount) as total_dollar_amount, + sum(quantity) as total_units_sold + from joined_data + where category = '{product_category_str}' + group by '{product_category_str}', product_name; + """ + ) + preview_query = f"select * from product_performance where product_category = '{product_category_str}';" + preview_df = conn.execute(preview_query).fetchdf() + row_count = conn.execute( + f""" + SELECT COUNT(*) + FROM product_performance + WHERE product_category = '{product_category_str}'; + """ + ).fetchone() + count = row_count[0] if row_count else 0 + + return dg.MaterializeResult( + metadata={ + "row_count": dg.MetadataValue.int(count), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), + } + ) +``` + + + +## 4. Materialize partitioned assets + +Now that we have our partitioned assets lets add them to our definitions object. + +Your definitions object should look like this: + +```python +defs = dg.Definitions( + assets=[products, + sales_reps, + sales_data, + joined_data, + monthly_sales_performance, + product_performance, + ], + asset_checks=[missing_dimension_check], + resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}, +) +``` + +To materialize these assets : +1. Navigate to the assets page. +2. Reload definitions. +3. Select the `monthly_performance` asset then Materialize selected. +4. Ensure all partitions are selected, then launch backfill. +5. Select the `product_performance` asset then Materialize selected. +6. Ensure all partitions are selected, then launch backfill. + +## Next Steps + +Now that we have the main assets in our ETL pipeline, its time to add [automation to our pipeline](/tutorial/automating-your-pipeline) diff --git a/docs/docs-beta/docs/tutorial/06-automating-your-pipeline.md b/docs/docs-beta/docs/tutorial/06-automating-your-pipeline.md new file mode 100644 index 0000000000000..4e1e6882811f9 --- /dev/null +++ b/docs/docs-beta/docs/tutorial/06-automating-your-pipeline.md @@ -0,0 +1,55 @@ +--- +title: Automating your pipeline +description: Set schedules and utilize asset based automation +last_update: + author: Alex Noonan +--- + +There are several ways to automate pipelines and assets [in Dagster](/guides/automate). + +In this step you will: + +- Add automation to assets to run when upstream assets are materialized +- Create a schedule to run a set of assets on a cron schedule + +## 1. Automating asset materialization + +Ideally, the reporting assets created in the last step should refresh whenever the upstream data is updated. This can be done simply using [declarative automation](/guides/automate/declarative-automation) and adding an automation condition to the asset definition. 
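As a minimal sketch of the pattern (assuming the current `dg.AutomationCondition` API; the asset name here is illustrative and the tutorial's actual snippets below are the source of truth), an automation condition is passed directly to the `@dg.asset` decorator, and `AutomationCondition.eager()` requests a materialization whenever the asset's upstream dependencies are updated:

```python
import dagster as dg


@dg.asset(
    deps=["joined_data"],  # upstream asset this report depends on
    automation_condition=dg.AutomationCondition.eager(),  # refresh when upstreams materialize
)
def example_report() -> None:
    # Placeholder body -- the tutorial's real reporting assets write results to DuckDB.
    ...
```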
Update the `monthly_sales_performance` asset to include the automation condition in the decorator:



Do the same thing for `product_performance`:



## 2. Scheduled jobs

Cron-based schedules are common in data orchestration. For our pipeline, assume that updated CSVs are dropped into a file location every week at a specified time by an external process. We want a job that runs the pipeline on that cadence and materializes the assets. Since we already defined the performance assets to materialize using the eager condition, the entire pipeline will refresh when the upstream data is updated.

Copy the following code underneath the `product_performance` asset:



## 3. Running the entire pipeline

With the automations defined, the final step is to turn them on in the UI.

To accomplish this:

1. Navigate to the Automation page.
2. Select all the automations.
3. Using actions, start all automations.
4. Select the `analysis_update_job`.
5. Select **Test Schedule** and evaluate for any time in the dropdown.
6. Select **Open in Launchpad**.

The job is now executing.

Additionally, if you navigate to the **Runs** tab, you will see that materializations for `monthly_sales_performance` and `product_performance` have run as well.

   ![2048 resolution](/images/tutorial/etl-tutorial/automation-final.png)

## Next steps

- Continue this tutorial by adding a [sensor-based asset](/tutorial/creating-a-sensor-asset)
\ No newline at end of file
diff --git a/docs/docs-beta/docs/tutorial/07-creating-a-sensor-asset.md b/docs/docs-beta/docs/tutorial/07-creating-a-sensor-asset.md
new file mode 100644
index 0000000000000..279c6d0edf2d3
--- /dev/null
+++ b/docs/docs-beta/docs/tutorial/07-creating-a-sensor-asset.md
@@ -0,0 +1,71 @@
---
title: Event Driven Assets
description: Use sensors to create event driven pipelines
last_update:
  author: Alex Noonan
---

[Sensors](/guides/automate/sensors) in Dagster are a powerful tool for automating workflows based on external events or conditions. They allow you to trigger jobs when specific criteria are met, making them essential for event-driven automation.

Event-driven automation supports situations where jobs occur at irregular cadences or in rapid succession; sensors are the building block Dagster provides for this.

Consider using sensors in the following situations:

- **Event-Driven Workflows**: When your workflow depends on external events, such as the arrival of a new data file or a change in an API response.
- **Conditional Execution**: When you want to execute jobs only if certain conditions are met, reducing unnecessary computations.
- **Real-Time Processing**: When you need to process data as soon as it becomes available, rather than waiting for a scheduled time.

In this step you will:

- Create an asset that runs based on an event-driven workflow
- Create a sensor that listens for the conditions to materialize the asset

## 1. Event-driven asset

For our pipeline, we want to model a situation where an executive wants a pivot table report of sales results by department and product. They want it generated in real time from their request, and it isn't a high enough priority to build an always-on, continuously refreshing report.

For this asset, we need to define the structure of the request that it expects in the materialization context.

Other than that, defining this asset is the same as our previous assets. Copy the following code beneath `product_performance`:

## 2. Build the sensor

To define a sensor in Dagster, you use the `@sensor` decorator. This decorator is applied to a function that evaluates whether the conditions for triggering a job are met.

Sensors include the following elements:

- **Job**: The job that the sensor will trigger when the conditions are met.
- **RunRequest**: An object that specifies the configuration for the job run. It includes a `run_key` to ensure idempotency and a `run_config` for job-specific settings.




## 3. Materialize the sensor asset

1. Update the Definitions object to the following:



2. Reload definitions.

3. Navigate to the **Automation** page.

4. Turn on the `adhoc_request_sensor`.

5. Click on the `adhoc_request_sensor` details.

   ![2048 resolution](/images/tutorial/etl-tutorial/sensor-evaluation.png)

6. Add `request.json` from the `sample_request` folder to the `requests` folder.

7. Click on the green tick to see the run for this request.

   ![2048 resolution](/images/tutorial/etl-tutorial/sensor-asset-run.png)


## Next steps

Now that we have a complete project, the next step is to refactor it into a more manageable structure so we can add to it as needed.

Finish the tutorial by [refactoring the project](/tutorial/refactoring-the-project)
\ No newline at end of file
diff --git a/docs/docs-beta/docs/tutorial/08-refactoring-the-project.md b/docs/docs-beta/docs/tutorial/08-refactoring-the-project.md
new file mode 100644
index 0000000000000..96bccc6508de7
--- /dev/null
+++ b/docs/docs-beta/docs/tutorial/08-refactoring-the-project.md
@@ -0,0 +1,85 @@
---
title: Refactoring the Project
description: Refactor the completed project into a structure that is more organized and scalable.
last_update:
  author: Alex Noonan
---

Many engineers leave something alone once it's working as expected. But the first implementation of a use case is rarely the best one, and all projects benefit from incremental improvements.

## Splitting up project structure

Right now, the project is contained within one definitions file. This file has become unwieldy, and adding more to the project would only make it more disorganized. To fix that, we're going to create a separate file for each of the Dagster core concepts:

- Assets
- Schedules
- Sensors
- Partitions

The final project structure should look like this:

```
dagster-etl-tutorial/
├── data/
│   └── products.csv
│   └── sales_data.csv
│   └── sales_reps.csv
│   └── sample_request/
│       └── request.json
├── etl_tutorial/
│   └── assets.py
│   └── definitions.py
│   └── partitions.py
│   └── schedules.py
│   └── sensors.py
├── pyproject.toml
├── setup.cfg
├── setup.py
```

### Assets

Assets make up the majority of our project, so this will be our largest file.



### Schedules

The schedules file will only contain the `weekly_update_schedule`.



### Sensors

The sensors file will have the job and sensor for the `adhoc_request` asset.



## Adjusting the Definitions object

Now that we have separate files, we need to adjust how the different elements are added to the Definitions object.

1. Imports

The Dagster project runs from the root directory, so whenever you reference files you need to use that as the starting point.

Additionally, Dagster provides functions to load all assets (`load_assets_from_modules`) and asset checks (`load_asset_checks_from_modules`) from a module.

2.
Definitions + +To bring our project together copy the following code into your `definitions.py` file: + + + +## Quick Validation + +If you want to validate that your definitions file loads and validates you can run the `dagster definitions validate` in the same directory that you would run `dagster dev`. This command is useful for CI/CD pipelines and allows you to check that your project loads correctly without starting the webserver. + +## Thats it! + +Congratulations! You have completed your first project with Dagster and have an example of how to use the building blocks to build your own data pipelines. + +## Recommended next steps + +- Join our [Slack community](https://dagster.io/slack). +- Continue learning with [Dagster University](https://courses.dagster.io/) courses. +- Start a [free trial of Dagster+](https://dagster.cloud/signup) for your own project. \ No newline at end of file diff --git a/docs/docs-beta/docs/tutorial/tutorial-etl.md b/docs/docs-beta/docs/tutorial/tutorial-etl.md deleted file mode 100644 index 44e430d6c0b76..0000000000000 --- a/docs/docs-beta/docs/tutorial/tutorial-etl.md +++ /dev/null @@ -1,62 +0,0 @@ ---- -title: Build an ETL Pipeline -description: Learn how to build an ETL pipeline with Dagster -last_update: - date: 2024-08-10 - author: Pedram Navid ---- - -# Build your first ETL pipeline - -Welcome to this hands-on tutorial where you'll learn how to build an ETL pipeline with Dagster while exploring key parts of Dagster. -If you haven't already, complete the [Quick Start](/getting-started/quickstart) tutorial to get familiar with Dagster. - -## What you'll learn - -- Setting up a Dagster project with the recommended project structure -- Creating Assets and using Resources to connect to external systems -- Adding metadata to your assets -- Building dependencies between assets -- Running a pipeline by materializing assets -- Adding schedules, sensors, and partitions to your assets - -## Step 1: Set up your Dagster environment - -First, set up a new Dagster project. - -1. Open your terminal and create a new directory for your project: - - ```bash title="Create a new directory" - mkdir dagster-etl-tutorial - cd dagster-etl-tutorial - ``` - -2. Create a virtual environment and activate it: - - ```bash title="Create a virtual environment" - python -m venv venv - source venv/bin/activate - # On Windows, use `venv\Scripts\activate` - ``` - -3. Install Dagster and the required dependencies: - - ```bash title="Install Dagster and dependencies" - pip install dagster dagster-webserver pandas - ``` - -## What you've learned - -Congratulations! You've just built and run your first ETL pipeline with Dagster. 
You've learned how to: - -- Set up a Dagster project -- Define Software-Defined Assets for each step of your ETL process -- Use Dagster's UI to run and monitor your pipeline - -## Next steps - -To expand on this tutorial, you could: - -- Add more complex transformations -- Implement error handling and retries -- Create a schedule to run your pipeline periodically diff --git a/docs/docs-beta/sidebars.ts b/docs/docs-beta/sidebars.ts index e11e287199fde..02889b01dab40 100644 --- a/docs/docs-beta/sidebars.ts +++ b/docs/docs-beta/sidebars.ts @@ -11,7 +11,15 @@ const sidebars: SidebarsConfig = { type: 'category', label: 'Tutorial', collapsed: false, - items: ['tutorial/tutorial-etl', 'tutorial/multi-asset-integration'], + items: ['tutorial/etl-tutorial-introduction', + 'tutorial/create-and-materialize-assets', + 'tutorial/create-and-materialize-a-downstream-asset', + 'tutorial/ensuring-data-quality-with-asset-checks', + 'tutorial/create-and-materialize-partitioned-asset', + 'tutorial/automating-your-pipeline', + 'tutorial/creating-a-sensor-asset', + 'tutorial/refactoring-the-project', + 'tutorial/multi-asset-integration'], }, { type: 'category', diff --git a/docs/docs-beta/src/theme/MDXComponents.tsx b/docs/docs-beta/src/theme/MDXComponents.tsx index d26d4fe3c81ca..d780d971c00ac 100644 --- a/docs/docs-beta/src/theme/MDXComponents.tsx +++ b/docs/docs-beta/src/theme/MDXComponents.tsx @@ -1,6 +1,6 @@ // Import the original mapper import MDXComponents from '@theme-original/MDXComponents'; -import { PyObject } from '../components/PyObject'; +import {PyObject} from '../components/PyObject'; import CodeExample from '../components/CodeExample'; import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; diff --git a/docs/docs-beta/src/theme/NotFound/Content/index.tsx b/docs/docs-beta/src/theme/NotFound/Content/index.tsx index b83cdc0b20424..cc2a672d172c6 100644 --- a/docs/docs-beta/src/theme/NotFound/Content/index.tsx +++ b/docs/docs-beta/src/theme/NotFound/Content/index.tsx @@ -25,7 +25,7 @@ export default function NotFoundContent({className}: Props): JSX.Element {
Welcome to Dagster Build your first Dagster project - Build your first ETL pipeline + Build your first ETL pipeline
diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/asset-check.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/asset-check.png new file mode 100644 index 0000000000000..900695edb3b50 Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/asset-check.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/automation-final.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/automation-final.png new file mode 100644 index 0000000000000..375965a274b60 Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/automation-final.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/etl-tutorial-first-asset-lineage.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/etl-tutorial-first-asset-lineage.png new file mode 100644 index 0000000000000..35abdc8e450de Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/etl-tutorial-first-asset-lineage.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/first-asset-run.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/first-asset-run.png new file mode 100644 index 0000000000000..c6abbfc605ed7 Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/first-asset-run.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-asset-run.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-asset-run.png new file mode 100644 index 0000000000000..761a69994038d Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-asset-run.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-evaluation.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-evaluation.png new file mode 100644 index 0000000000000..f2617f7251270 Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-evaluation.png differ diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py index a2c5b7bb397fe..0c0202cae210d 100644 --- a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py +++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py @@ -6,25 +6,6 @@ import dagster as dg -def query_to_markdown(conn, query, limit=10): - """Performs SQL query and converts result to markdown. - - Args: - conn(DuckDBPyConnection) connection to duckdb - query (str): query to run against duckdb connection - limit (int): maximum number of rows to render to markdown - - Returns: - Markdown representation of query result - - """ - result = conn.execute(query).fetchdf() - if result.empty: - return "No results found." 
- - return result.head(limit).to_markdown(index=False) - - @dg.asset( compute_kind="duckdb", group_name="ingestion", @@ -40,14 +21,14 @@ def products(duckdb: DuckDBResource) -> dg.MaterializeResult: ) preview_query = "select * from products limit 10" - preview_md = query_to_markdown(conn, preview_query) + preview_df = conn.execute(preview_query).fetchdf() row_count = conn.execute("select count(*) from products").fetchone() count = row_count[0] if row_count else 0 return dg.MaterializeResult( metadata={ "row_count": dg.MetadataValue.int(count), - "preview": dg.MetadataValue.md(preview_md), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), } ) @@ -67,14 +48,14 @@ def sales_reps(duckdb: DuckDBResource) -> dg.MaterializeResult: ) preview_query = "select * from sales_reps limit 10" - preview_md = query_to_markdown(conn, preview_query) + preview_df = conn.execute(preview_query).fetchdf() row_count = conn.execute("select count(*) from sales_reps").fetchone() count = row_count[0] if row_count else 0 return dg.MaterializeResult( metadata={ "row_count": dg.MetadataValue.int(count), - "preview": dg.MetadataValue.md(preview_md), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), } ) @@ -93,14 +74,14 @@ def sales_data(duckdb: DuckDBResource) -> dg.MaterializeResult: ) preview_query = "SELECT * FROM sales_data LIMIT 10" - preview_md = query_to_markdown(conn, preview_query) + preview_df = conn.execute(preview_query).fetchdf() row_count = conn.execute("select count(*) from sales_data").fetchone() count = row_count[0] if row_count else 0 return dg.MaterializeResult( metadata={ "row_count": dg.MetadataValue.int(count), - "preview": dg.MetadataValue.md(preview_md), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), } ) @@ -137,14 +118,15 @@ def joined_data(duckdb: DuckDBResource) -> dg.MaterializeResult: ) preview_query = "select * from joined_data limit 10" - preview_md = query_to_markdown(conn, preview_query) + preview_df = conn.execute(preview_query).fetchdf() + row_count = conn.execute("select count(*) from joined_data").fetchone() count = row_count[0] if row_count else 0 return dg.MaterializeResult( metadata={ "row_count": dg.MetadataValue.int(count), - "preview": dg.MetadataValue.md(preview_md), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), } ) @@ -167,7 +149,7 @@ def missing_dimension_check(duckdb: DuckDBResource) -> dg.AssetCheckResult: ) -# datetime partitions & automaterializations +# datetime partitions monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2024-01-01") @@ -208,7 +190,7 @@ def monthly_sales_performance( ) preview_query = f"select * from monthly_sales_performance where partition_date = '{month_to_fetch}';" - preview_md = query_to_markdown(conn, preview_query) + preview_df = conn.execute(preview_query).fetchdf() row_count = conn.execute( f""" select count(*) @@ -221,7 +203,7 @@ def monthly_sales_performance( return dg.MaterializeResult( metadata={ "row_count": dg.MetadataValue.int(count), - "preview": dg.MetadataValue.md(preview_md), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), } ) @@ -266,7 +248,7 @@ def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResourc """ ) preview_query = f"select * from product_performance where product_category = '{product_category_str}';" - preview_md = query_to_markdown(conn, preview_query) + preview_df = conn.execute(preview_query).fetchdf() row_count = conn.execute( f""" SELECT COUNT(*) @@ -279,20 +261,14 @@ def 
product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResourc return dg.MaterializeResult( metadata={ "row_count": dg.MetadataValue.int(count), - "preview": dg.MetadataValue.md(preview_md), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), } ) -analysis_assets = dg.AssetSelection.keys("joined_data").upstream() - -analysis_update_job = dg.define_asset_job( - name="analysis_update_job", - selection=analysis_assets, -) - weekly_update_schedule = dg.ScheduleDefinition( - job=analysis_update_job, + name="analysis_update_job", + target=dg.AssetSelection.keys("joined_data").upstream(), cron_schedule="0 0 * * 1", # every Monday at midnight ) @@ -329,9 +305,11 @@ def adhoc_request( """ with duckdb.get_connection() as conn: - preview_md = query_to_markdown(conn, query) + preview_df = conn.execute(query).fetchdf() - return dg.MaterializeResult(metadata={"preview": dg.MetadataValue.md(preview_md)}) + return dg.MaterializeResult( + metadata={"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))} + ) adhoc_request_job = dg.define_asset_job( @@ -385,10 +363,11 @@ def adhoc_request_sensor(context: dg.SensorEvaluationContext): joined_data, monthly_sales_performance, product_performance, + adhoc_request, ], asset_checks=[missing_dimension_check], schedules=[weekly_update_schedule], - jobs=[analysis_update_job, adhoc_request_job], + jobs=[adhoc_request_job], sensors=[adhoc_request_sensor], resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}, ) diff --git a/examples/getting_started_etl_tutorial/etl_tutorial/definitions.py b/examples/getting_started_etl_tutorial/etl_tutorial/definitions.py index a05269e82e0fc..f3b4218dfbd05 100644 --- a/examples/getting_started_etl_tutorial/etl_tutorial/definitions.py +++ b/examples/getting_started_etl_tutorial/etl_tutorial/definitions.py @@ -1,6 +1,5 @@ import dagster as dg -from dagster_duckdb import DuckDBResource defs = dg.Definitions( - resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}, + resources={}, )