diff --git a/docs/docs-beta/docs/dagster-plus/deployment/code-locations/index.md b/docs/docs-beta/docs/dagster-plus/deployment/code-locations/index.md index facde90edf2aa..5162ebfce4e54 100644 --- a/docs/docs-beta/docs/dagster-plus/deployment/code-locations/index.md +++ b/docs/docs-beta/docs/dagster-plus/deployment/code-locations/index.md @@ -13,7 +13,7 @@ This guide will cover three options for adding a new code location:
Prerequisites -1. An existing Dagster project. Refer to the [recommended project structure](/tutorial/create-new-project) and [code requirements](/dagster-plus/deployment/code-requirements) pages for more information. +1. An existing Dagster project. Refer to the [recommended project structure](/guides/build/project-structure) and [code requirements](/dagster-plus/deployment/code-requirements) pages for more information. 2. Editor, Admin, or Organization Admin permissions in Dagster+. diff --git a/docs/docs-beta/docs/etl-pipeline-tutorial/automate-your-pipeline.md b/docs/docs-beta/docs/etl-pipeline-tutorial/automate-your-pipeline.md new file mode 100644 index 0000000000000..e58b186e52886 --- /dev/null +++ b/docs/docs-beta/docs/etl-pipeline-tutorial/automate-your-pipeline.md @@ -0,0 +1,56 @@ +--- +title: Automate your pipeline +description: Set schedules and utilize asset based automation +last_update: + author: Alex Noonan +sidebar_position: 60 +--- + +There are several ways to automate pipelines and assets [in Dagster](/guides/automate). + +In this step you will: + +- Add automation to assets to run when upstream assets are materialized. +- Create a schedule to run a set of assets on a cron schedule. + +## 1. Automate asset materialization + +Ideally, the reporting assets created in the last step should refresh whenever the upstream data is updated. Dagster's [declarative automation](/guides/automate/declarative-automation) framework allows you do this by adding an automation condition to the asset definition. + +Update the `monthly_sales_performance` asset to add the automation condition to the decorator: + + + +Do the same thing for `product_performance`: + + + +## 2. Scheduled jobs + +Cron-based schedules are common in data orchestration. For our pipeline, assume that updated CSVs are uploaded to a file location at a specific time every week by an external process. + +Copy the following code underneath the `product performance` asset: + + + +## 3. Enable and test automations + +The final step is to enable the automations in the UI. + +To accomplish this: +1. Navigate to the Automation page. +2. Select all automations. +3. Using actions, start all automations. +4. Select the `analysis_update_job`. +5. Test the schedule and evaluate for any time in the dropdown menu. +6. Open in Launchpad. + +The job is now executing. + +Additionally, if you navigate to the Runs tab, you should see that materializations for `monthly_sales_performance` and `product_performance` have run as well. + + ![2048 resolution](/images/tutorial/etl-tutorial/automation-final.png) + +## Next steps + +- Continue this tutorial with adding a [sensor based asset](create-a-sensor-asset) \ No newline at end of file diff --git a/docs/docs-beta/docs/etl-pipeline-tutorial/create-a-sensor-asset.md b/docs/docs-beta/docs/etl-pipeline-tutorial/create-a-sensor-asset.md new file mode 100644 index 0000000000000..890e9c0c14159 --- /dev/null +++ b/docs/docs-beta/docs/etl-pipeline-tutorial/create-a-sensor-asset.md @@ -0,0 +1,69 @@ +--- +title: Create a sensor asset +description: Use sensors to create event driven pipelines +last_update: + author: Alex Noonan +sidebar_position: 70 +--- + +[Sensors](/guides/automate/sensors) allow you to automate workflows based on external events or conditions, making them useful for event-driven automation, especially in situations where jobs occur at irregular cadences or in rapid succession. 
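For a sense of the shape of a sensor before diving into the tutorial steps, here is a minimal sketch; the asset, job, and directory names are illustrative and not part of the tutorial. A sensor is a decorated function that Dagster evaluates on an interval and that yields a `RunRequest` for its job whenever its condition is met.

```python
import os

import dagster as dg


@dg.asset
def example_report() -> None:
    """Placeholder asset that would build a report."""
    ...


# A job that materializes the asset; the sensor below triggers this job.
example_job = dg.define_asset_job("example_job", selection=[example_report])


@dg.sensor(job=example_job, minimum_interval_seconds=30)
def new_file_sensor(context: dg.SensorEvaluationContext):
    # Launch one run per file in the watched directory; run_key keeps the
    # sensor idempotent, so the same file never triggers a second run.
    request_dir = "data/requests"  # illustrative path
    if os.path.isdir(request_dir):
        for filename in os.listdir(request_dir):
            yield dg.RunRequest(run_key=filename)
```

The tutorial's own event-driven asset and sensor are built in the steps below.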
+ +Consider using sensors in the following situations: +- **Event-driven workflows**: When your workflow depends on external events, such as the arrival of a new data file or a change in an API response. +- **Conditional execution**: When you want to execute jobs only if certain conditions are met, reducing unnecessary computations. +- **Real-time processing**: When you need to process data as soon as it becomes available, rather than waiting for a scheduled time. + +In this step you will: + +- Create an asset that runs based on a event-driven workflow +- Create a sensor to listen for conditions to materialize the asset + +## 1. Create an event-driven asset + +For our pipeline, we want to model a situation where an executive wants a pivot table report of sales results by department and product. They want that processed in real time from their request. + +For this asset, we need to define the structure of the request that it is expecting in the materialization context. + +Other than that, defining this asset is the same as our previous assets. Copy the following code beneath `product_performance`. + + + +## 2. Build the sensor + +To define a sensor in Dagster, use the `@sensor` decorator. This decorator is applied to a function that evaluates whether the conditions for triggering a job are met. + +Sensors include the following elements: + +- **Job**: The job that the sensor will trigger when the conditions are met. +- **RunRequest**: An object that specifies the configuration for the job run. It includes a `run_key` to ensure idempotency and a `run_config` for job-specific settings. + + + +## 3. Materialize the sensor asset + +1. Update your Definitions object to the following: + + + +2. Reload your Definitions. + +3. Navigate to the Automation page. + +4. Turn on the `automation_request_sensor`. + +5. Click on the `automation_request_sensor` details. + + ![2048 resolution](/images/tutorial/etl-tutorial/sensor-evaluation.png) + +6. Add `request.json` from the `sample_request` folder to `requests` folder. + +7. Click on the green tick to see the run for this request. + + ![2048 resolution](/images/tutorial/etl-tutorial/sensor-asset-run.png) + + +## Next steps + +Now that we have our complete project, the next step is to refactor the project into more a more manageable structure so we can add to it as needed. + +Finish the tutorial by [refactoring your project](refactor-your-project). \ No newline at end of file diff --git a/docs/docs-beta/docs/etl-pipeline-tutorial/create-and-materialize-a-downstream-asset.md b/docs/docs-beta/docs/etl-pipeline-tutorial/create-and-materialize-a-downstream-asset.md new file mode 100644 index 0000000000000..ddc173fb71447 --- /dev/null +++ b/docs/docs-beta/docs/etl-pipeline-tutorial/create-and-materialize-a-downstream-asset.md @@ -0,0 +1,43 @@ +--- +title: Create and materialize a downstream asset +description: Reference Assets as dependencies to other assets +last_update: + author: Alex Noonan +sidebar_position: 30 +--- + +Now that we have the raw data loaded into DuckDB, we need to create a [downstream asset](/guides/build/create-asset-pipelines/assets-concepts/asset-dependencies) that combines the upstream assets together. In this step, you will: + +- Create a downstream asset +- Materialize that asset + +## 1. Create a downstream asset + +Now that we have all of our raw data loaded into DuckDB, our next step is to merge it together in a view composed of data from all three source tables. 
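A sketch of what this `joined_data` asset might look like is shown below; the join keys, column list, and group name here are assumptions for illustration, and the paragraphs that follow describe the actual approach used in the tutorial.

```python
@dg.asset(
    compute_kind="duckdb",
    group_name="joins",  # assumption: the tutorial may use a different group name
    deps=[sales_data, sales_reps, products],
)
def joined_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        conn.execute(
            """
            create or replace view joined_data as
            select
                sales_data.date,
                sales_data.dollar_amount,
                sales_data.quantity,
                sales_reps.rep_name,
                products.product_name,
                products.category
            from sales_data
            left join sales_reps on sales_data.rep_id = sales_reps.rep_id
            left join products on sales_data.product_id = products.product_id
            """
        )

        preview_df = conn.execute("select * from joined_data limit 10").fetchdf()
        row_count = conn.execute("select count(*) from joined_data").fetchone()
        count = row_count[0] if row_count else 0

    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(count),
            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
        }
    )
```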
+ +To accomplish this in SQL, we will bring in our `sales_data` table and then left join on `sales_reps` and `products` on their respective id columns. Additionally, we will keep this view concise and only have relevant columns for analysis. + +As you can see, the new `joined_data` asset looks a lot like our previous ones, with a few small changes. We put this asset into a different group. To make this asset dependent on the raw tables, we add the asset keys to the `deps` parameter in the asset definition. + + + +## 2. Materialize the asset + +1. Add the joined_data asset to the Definitions object + + ```python + defs = dg.Definitions( + assets=[products, + sales_reps, + sales_data, + joined_data, + ], + resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}, + ) + ``` + +2. In the Dagster UI, reload definitions and materialize the `joined_data` asset. + +## Next steps + +- Continue this tutorial with by [creating and materializing a partitioned asset](ensure-data-quality-with-asset-checks). \ No newline at end of file diff --git a/docs/docs-beta/docs/etl-pipeline-tutorial/create-and-materialize-assets.md b/docs/docs-beta/docs/etl-pipeline-tutorial/create-and-materialize-assets.md new file mode 100644 index 0000000000000..b04eba4863c4e --- /dev/null +++ b/docs/docs-beta/docs/etl-pipeline-tutorial/create-and-materialize-assets.md @@ -0,0 +1,108 @@ +--- +title: Create and materialize assets +description: Load project data and create and materialize assets +last_update: + author: Alex Noonan +sidebar_position: 20 +--- + + +In the first step of the tutorial, you created your Dagster project with the raw data files. In this step, you will: +- Create your initial Definitions object +- Add a DuckDB resource +- Build software-defined assets +- Materialize your assets + +## 1. Create a definitions object + +In Dagster, the [Definitions API docs](/todo) object is where you define and organize various components within your project, such as assets and resources. + +Open the `definitions.py` file in the `etl_tutorial` directory and copy the following code into it: + + ```python + import json + import os + + from dagster_duckdb import DuckDBResource + + import dagster as dg + + defs = dg.Definitions( + assets=[], + resources={}, + ) + ``` + +## 2. Define the DuckDB resource + +In Dagster, [Resources API docs](/todo) are the external services, tools, and storage backends you need to do your job. For the storage backend in this project, we'll use [DuckDB](https://duckdb.org/), a fast, in-process SQL database that runs inside your application. We'll define it once in the definitions object, making it available to all assets and objects that need it. + + ```python + defs = dg.Definitions( + assets=[], + resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}, + ) + ``` + +## 3. Create assets + +Software defined [assets API docs](/todo) are the main building blocks in Dagster. An asset is composed of three components: +1. Asset key or unique identifier. +2. An op which is a function that is invoked to produce the asset. +3. Upstream dependencies that the asset depends on. + +You can read more about our philosophy behind the [asset centric approach](https://dagster.io/blog/software-defined-assets). + +### Products asset + +First, we will create an asset that creates a DuckDB table to hold data from the products CSV. This asset takes the `duckdb` resource defined earlier and returns a `MaterializeResult` object. 
+Additionally, this asset contains metadata in the `@dg.asset` decorator parameters to help categorize the asset, and in the `return` block to give us a preview of the asset in the Dagster UI. + +To create this asset, open the `definitions.py` file and copy the following code into it: + + + +### Sales reps asset + +The code for the sales reps asset is similar to the product asset code. In the `definitions.py` file, copy the following code below the product asset code: + + + +### Sales data asset + +To add the sales data asset, copy the following code into your `definitions.py` file below the sales reps asset: + + + +## 4. Add assets to the definitions object + +Now to pull these assets into our Definitions object. Adding them to the Definitions object makes them available to the Dagster project. Add them to the empty list in the assets parameter. + + ```python + defs = dg.Definitions( + assets=[products, + sales_reps, + sales_data, + ], + resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}, + ) + ``` + +## 5. Materialize assets + +To materialize your assets: +1. In a browser, navigate to the URL of the Dagster server that yous started earlier. +2. Navigate to **Deployment**. +3. Click Reload definitions. +4. Click **Assets**, then click "View global asset lineage" to see all of your assets. + + ![2048 resolution](/images/tutorial/etl-tutorial/etl-tutorial-first-asset-lineage.png) + +5. Click materialize all. +6. Navigate to the runs tab and select the most recent run. Here you can see the logs from the run. + ![2048 resolution](/images/tutorial/etl-tutorial/first-asset-run.png) + + +## Next steps + +- Continue this tutorial with your [asset dependencies](create-and-materialize-a-downstream-asset) \ No newline at end of file diff --git a/docs/docs-beta/docs/etl-pipeline-tutorial/create-and-materialize-partitioned-asset.md b/docs/docs-beta/docs/etl-pipeline-tutorial/create-and-materialize-partitioned-asset.md new file mode 100644 index 0000000000000..d72bbb0811743 --- /dev/null +++ b/docs/docs-beta/docs/etl-pipeline-tutorial/create-and-materialize-partitioned-asset.md @@ -0,0 +1,180 @@ +--- +title: Create and materialize partitioned assets +description: Partitioning Assets by datetime and categories +last_update: + date: 2024-11-25 + author: Alex Noonan +sidebar_position: 50 +--- + +[Partitions](/guides/build/create-asset-pipelines/partitioning) are a core abstraction in Dagster, that allow you to manage large datasets, process incremental updates, and improve pipeline performance. You can partition assets the following ways: + +- Time-based: Split data by time periods (e.g., daily, monthly) +- Category-based: Divide by known categories (e.g., country, product type) +- Two-dimensional: Combine two partition types (e.g., country + date) +- Dynamic: Create partitions based on runtime conditions + +In this step, you will: + +- Create a time-based asset partitioned by month +- Create a category-based asset partitioned by product category + +## 1. Create a time-based partitioned asset + +Dagster natively supports partitioning assets by datetime groups. We want to create an asset that calculates the monthly performance for each sales rep. To create the monthly partition copy the following code below the `missing_dimension_check` asset check. + + + +Partition data are accessed within an asset by context. We want to create an asset that does this calculation for a given month from the partition + and deletes any previous value for that month. 
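For reference, the `monthly_partition` object created in the previous step is a `dg.MonthlyPartitionsDefinition`; this matches the definition that appears in the tutorial's `definitions.py`:

```python
import dagster as dg

# One partition per month, starting from the beginning of the sales data.
monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2024-01-01")
```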
Copy the following asset under the `monthly_partition` we just created. + + ```python + @dg.asset( + partitions_def=monthly_partition, + compute_kind="duckdb", + group_name="analysis", + deps=[joined_data], + ) + def monthly_sales_performance( + context: dg.AssetExecutionContext, duckdb: DuckDBResource + ): + partition_date_str = context.partition_key + month_to_fetch = partition_date_str[:-3] + + with duckdb.get_connection() as conn: + conn.execute( + f""" + create table if not exists monthly_sales_performance ( + partition_date varchar, + rep_name varchar, + product varchar, + total_dollar_amount double + ); + + delete from monthly_sales_performance where partition_date = '{month_to_fetch}'; + + insert into monthly_sales_performance + select + '{month_to_fetch}' as partition_date, + rep_name, + product_name, + sum(dollar_amount) as total_dollar_amount + from joined_data where strftime(date, '%Y-%m') = '{month_to_fetch}' + group by '{month_to_fetch}', rep_name, product_name; + """ + ) + + preview_query = f"select * from monthly_sales_performance where partition_date = '{month_to_fetch}';" + preview_df = conn.execute(preview_query).fetchdf() + row_count = conn.execute( + f""" + select count(*) + from monthly_sales_performance + where partition_date = '{month_to_fetch}' + """ + ).fetchone() + count = row_count[0] if row_count else 0 + + return dg.MaterializeResult( + metadata={ + "row_count": dg.MetadataValue.int(count), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), + } + ) + ``` + +## 2. Create a category-based partitioned asset + +Using known defined partitions is a simple way to break up your dataset when you know the different groups you want to subset it by. In our pipeline, we want to create an asset that represents the performance of each product category. + +1. To create the statically-defined partition for the product category, copy this code beneath the `monthly_sales_performance` asset: + + + +2. Now that the partition has been defined, we can use that in an asset that calculates the product category performance: + +```python +@dg.asset( + deps=[joined_data], + partitions_def=product_category_partition, + group_name="analysis", + compute_kind="duckdb", +) +def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource): + product_category_str = context.partition_key + + with duckdb.get_connection() as conn: + conn.execute( + f""" + create table if not exists product_performance ( + product_category varchar, + product_name varchar, + total_dollar_amount double, + total_units_sold double + ); + + delete from product_performance where product_category = '{product_category_str}'; + + insert into product_performance + select + '{product_category_str}' as product_category, + product_name, + sum(dollar_amount) as total_dollar_amount, + sum(quantity) as total_units_sold + from joined_data + where category = '{product_category_str}' + group by '{product_category_str}', product_name; + """ + ) + preview_query = f"select * from product_performance where product_category = '{product_category_str}';" + preview_df = conn.execute(preview_query).fetchdf() + row_count = conn.execute( + f""" + SELECT COUNT(*) + FROM product_performance + WHERE product_category = '{product_category_str}'; + """ + ).fetchone() + count = row_count[0] if row_count else 0 + + return dg.MaterializeResult( + metadata={ + "row_count": dg.MetadataValue.int(count), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), + } + ) +``` + + + +## 4. 
Materialize partitioned assets + +Now that we have our partitioned assets, let's add them to our Definitions object: + +Your Definitions object should look like this: + +```python +defs = dg.Definitions( + assets=[products, + sales_reps, + sales_data, + joined_data, + monthly_sales_performance, + product_performance, + ], + asset_checks=[missing_dimension_check], + resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}, +) +``` + +To materialize these assets: +1. Navigate to the assets page. +2. Reload definitions. +3. Select the `monthly_performance` asset, then **Materialize selected**. +4. Ensure all partitions are selected, then launch a backfill. +5. Select the `product_performance` asset, then **Materialize selected**. +6. Ensure all partitions are selected, then launch a backfill. + +## Next steps + +Now that we have the main assets in our ETL pipeline, its time to add [automation to our pipeline](automate-your-pipeline) diff --git a/docs/docs-beta/docs/etl-pipeline-tutorial/ensure-data-quality-with-asset-checks.md b/docs/docs-beta/docs/etl-pipeline-tutorial/ensure-data-quality-with-asset-checks.md new file mode 100644 index 0000000000000..8982286b9ee53 --- /dev/null +++ b/docs/docs-beta/docs/etl-pipeline-tutorial/ensure-data-quality-with-asset-checks.md @@ -0,0 +1,52 @@ +--- +title: Ensure data quality with asset checks +description: Ensure assets are correct with asset checks +last_update: + author: Alex Noonan +sidebar_position: 40 +--- + +Data quality is critical in data pipelines. Inspecting individual assets ensures that data quality issues are caught before they affect the entire pipeline. + +In Dagster, you define [asset checks](/guides/test/asset-checks) like you define assets. Asset checks run when an asset is materialized. In this step you will: + +- Define an asset check +- Execute that asset check in the UI + +## 1. Define an asset check + +In this case we want to create a check to identify if there are any rows in `joined_data` that are missing a value for `rep_name` or `product_name`. + +Copy the following code beneath the `joined_data` asset. + + + +## 2. Run the asset check + +Before you can run the asset check, you need to add it to the Definitions object. Like assets, asset checks are added to their own list. + +Your Definitions object should look like this now: + +```python +defs = dg.Definitions( + assets=[products, + sales_reps, + sales_data, + joined_data, + ], + asset_checks=[missing_dimension_check], + resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}, +) +``` +Asset checks will run when an asset is materialized, but asset checks can also be executed manually in the UI: + +1. Reload your Definitions. +2. Navigate to the Asset Details page for the `joined_data` asset. +3. Select the "Checks" tab. +4. Click the **Execute** button for `missing_dimension_check`. 
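For reference, the `missing_dimension_check` you just executed might look roughly like the sketch below; the function signature matches the tutorial's definitions file, but the query body and metadata key here are illustrative.

```python
@dg.asset_check(asset=joined_data)
def missing_dimension_check(duckdb: DuckDBResource) -> dg.AssetCheckResult:
    with duckdb.get_connection() as conn:
        query_result = conn.execute(
            """
            select count(*)
            from joined_data
            where rep_name is null or product_name is null
            """
        ).fetchone()

    count = query_result[0] if query_result else 0
    # The check passes only when no rows are missing either dimension.
    return dg.AssetCheckResult(
        passed=count == 0,
        metadata={"missing_dimensions": dg.MetadataValue.int(count)},
    )
```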
+ + ![2048 resolution](/images/tutorial/etl-tutorial/asset-check.png) + +## Next steps + +- Continue this tutorial with [Asset Checks](create-and-materialize-partitioned-asset) \ No newline at end of file diff --git a/docs/docs-beta/docs/etl-pipeline-tutorial/index.md b/docs/docs-beta/docs/etl-pipeline-tutorial/index.md new file mode 100644 index 0000000000000..7906885d3c72f --- /dev/null +++ b/docs/docs-beta/docs/etl-pipeline-tutorial/index.md @@ -0,0 +1,137 @@ +--- +title: Build an ETL Pipeline +description: Learn how to build an ETL pipeline with Dagster +last_update: + author: Alex Noonan +sidebar_position: 10 +sidebar_class_name: hidden +--- + +# Build your first ETL pipeline + +In this tutorial, you'll build an ETL pipeline with Dagster that: + +- Imports sales data to DuckDB +- Transforms data into reports +- Runs scheduled reports automatically +- Generates one-time reports on demand + +## You will learn to: + +- Set up a Dagster project with the recommended project structure +- Create and materialize assets +- Create and materialize dependant assets +- Ensure data quality with asset checks +- Create and materialize partitioned assets +- Automate the pipeline +- Create and materialize a sensor asset +- Refactor your project when it becomes more complex + +
+ Prerequisites + +To follow the steps in this guide, you'll need: + +- Basic Python knowledge +- Python 3.9+ installed on your system. Refer to the [Installation guide](/getting-started/installation) for information. +- Familiarity with SQL and Python data manipulation libraries, such as Pandas. +- Understanding of data pipelines and the extract, transform, and load process. +
+ + +## Step 1: Set up your Dagster environment + +First, set up a new Dagster project. + +1. Open your terminal and create a new directory for your project: + + ```bash + mkdir dagster-etl-tutorial + cd dagster-etl-tutorial + ``` + +2. Create and activate a virtual environment: + + + + ```bash + python -m venv dagster_tutorial + source dagster_tutorial/bin/activate + ``` + + + ```bash + python -m venv dagster_tutorial + dagster_tutorial\Scripts\activate + ``` + + + +3. Install Dagster and the required dependencies: + + ```bash + pip install dagster dagster-webserver pandas dagster-duckdb + ``` + +## Step 2: Create the Dagster project structure + +Run the following command to create the project directories and files for this tutorial: + + ```bash + dagster project from-example --example getting_started_etl_tutorial + ``` + +Your project should have this structure: +{/* vale off */} +``` +dagster-etl-tutorial/ +├── data/ +│ └── products.csv +│ └── sales_data.csv +│ └── sales_reps.csv +│ └── sample_request/ +│ └── request.json +├── etl_tutorial/ +│ └── definitions.py +├── pyproject.toml +├── setup.cfg +├── setup.py +``` +{/* vale on */} + +:::info +Dagster has several example projects you can install depending on your use case. To see the full list, run `dagster project list-examples`. For more information on the `dagster project` command, see the [API documentation](https://docs-preview.dagster.io/api/cli#dagster-project). +::: + +### Dagster project structure + +#### dagster-etl-tutorial root directory + +In the `dagster-etl-tutorial` root directory, there are three configuration files that are common in Python package management. These files manage dependencies and identify the Dagster modules in the project. +| File | Purpose | +|------|---------| +| pyproject.toml | This file is used to specify build system requirements and package metadata for Python projects. It is part of the Python packaging ecosystem. | +| setup.cfg | This file is used for configuration of your Python package. It can include metadata about the package, dependencies, and other configuration options. | +| setup.py | This script is used to build and distribute your Python package. It is a standard file in Python projects for specifying package details. | + +#### etl_tutorial directory + +This is the main directory where you will define your assets, jobs, schedules, sensors, and resources. +| File | Purpose | +|------|---------| +| definitions.py | This file is typically used to define jobs, schedules, and sensors. It organizes the various components of your Dagster project. This allows Dagster to load the definitions in a module. | + +#### data directory + +The data directory contains the raw data files for the project. We will reference these files in our software-defined assets in the next step of the tutorial. 
+ +## Step 3: Launch the Dagster webserver + +To make sure Dagster and its dependencies were installed correctly, navigate to the project root directory and start the Dagster webserver:" + +followed by a bash code snippet for `dagster dev` + + +## Next steps + +- Continue this tutorial by [creating and materializing assets](create-and-materialize-assets) diff --git a/docs/docs-beta/docs/etl-pipeline-tutorial/refactor-your-project.md b/docs/docs-beta/docs/etl-pipeline-tutorial/refactor-your-project.md new file mode 100644 index 0000000000000..540bf7316ab47 --- /dev/null +++ b/docs/docs-beta/docs/etl-pipeline-tutorial/refactor-your-project.md @@ -0,0 +1,83 @@ +--- +title: Refactor your project +description: Refactor your completed project into a structure that is more organized and scalable. +last_update: + author: Alex Noonan +sidebar_position: 80 +--- + +Many engineers generally leave something alone once it's working as expected. But the first time you do something is rarely the best implementation of a use case and all projects benefit from incremental improvements. + +## Splitting up project structure + +Currently, your project is contained in one definitions file. However, this file has gotten fairly complex, and adding to it would only increase its complexity. To fix that, we will create separate files for each core Dagster concept: + +- Assets +- Schedules +- Sensors +- Partitions + +The final project structure should look like this: +``` +dagster-etl-tutorial/ +├── data/ +│ └── products.csv +│ └── sales_data.csv +│ └── sales_reps.csv +│ └── sample_request/ +│ └── request.json +├── etl_tutorial/ +│ └── assets.py +│ └── definitions.py +│ └── partitions.py +│ └── schedules.py +│ └── sensors.py +├── pyproject.toml +├── setup.cfg +├── setup.py +``` + +### Assets + +Assets make up a majority of our project and this will be our largest file. + + + +### Schedules + +The schedules file will only contain the `weekly_update_schedule`. + + + +### Sensors + +The sensors file will have the job and sensor for the `adhoc_request` asset. + + + +## Refactoring the Definitions object + +Now that we have separate files, we need to adjust how the different elements are added to the Definitions object. + +:::note +The Dagster project runs from the root directory, so whenever you reference files in your project, you need to use the root as the starting point. +Additionally, Dagster has functions to load all assets and asset checks from a module (load_assets_from_modules and load_asset_checks_from_modules, respectively). +::: + +To bring your project together, copy the following code into your `definitions.py` file: + + + +## Quick validation + +To validate that your definitions file loads and validates, you can run `dagster definitions validate` in the same directory that you would run `dagster dev`. This command is useful for CI/CD pipelines and allows you to check that your project loads correctly without starting the web server. + +## Thats it! + +Congratulations! You have completed your first project with Dagster and have an example of how to use the building blocks to build your own data pipelines. + +## Recommended next steps + +- Join our [Slack community](https://dagster.io/slack). +- Continue learning with [Dagster University](https://courses.dagster.io/) courses. +- Start a [free trial of Dagster+](https://dagster.cloud/signup) for your own project. 
\ No newline at end of file diff --git a/docs/docs-beta/docs/getting-started/quickstart.md b/docs/docs-beta/docs/getting-started/quickstart.md index 4cf472297aa99..9465d2ecf8483 100644 --- a/docs/docs-beta/docs/getting-started/quickstart.md +++ b/docs/docs-beta/docs/getting-started/quickstart.md @@ -151,5 +151,5 @@ id,name,age,city,age_group Congratulations! You've just built and run your first pipeline with Dagster. Next, you can: -- Continue with the [ETL pipeline tutorial](/tutorial/tutorial-etl) to learn how to build a more complex ETL pipeline -- Learn how to [Think in assets](/guides/build/create-asset-pipelines/assets-concepts/index.md) +- Continue with the [ETL pipeline tutorial](/etl-pipeline-tutorial/) to learn how to build a more complex ETL pipeline +- Learn how to [Think in assets](/guides/build/create-asset-pipelines/assets-concepts/) diff --git a/docs/docs-beta/docs/integrations/guides/multi-asset-integration.md b/docs/docs-beta/docs/integrations/guides/multi-asset-integration.md index df0615e217fab..92ec6f133c47b 100644 --- a/docs/docs-beta/docs/integrations/guides/multi-asset-integration.md +++ b/docs/docs-beta/docs/integrations/guides/multi-asset-integration.md @@ -1,5 +1,269 @@ --- title: Creating a multi-asset integration +description: Create a decorator based multi-asset integration --- -{/* TODO write this */} \ No newline at end of file +When working in the Dagster ecosystem, you may have noticed that decorators are frequently used. For example, assets, jobs, and ops use decorators. If you have a service that produces many assets, it's possible to define it as a multi-asset decorator-offering a consistent and intuitive developer experience to existing Dagster APIs. + +In the context of Dagster, decorators are helpful because they often wrap some form of processing. For example, when writing an asset, you define your processing code and then annotate the function with the `asset` decorator /> decorator. Then, the internal Dagster code can register the asset, assign metadata, pass in context data, or perform any other variety of operations that are required to integrate your asset code with the Dagster platform. + +In this guide, you'll learn how to develop a multi-asset integration for a hypothetical replication tool. + +:::note +This guide assumes basic familiarity with Dagster and Python decorators. +::: + +## Step 1: Input + +For this guide, let's imagine a tool that replicates data between two databases. It's configured using a `replication.yaml` configuration file, in which a user is able to define source and destination databases, along with the tables that they would like to replicate between these systems. + +```yml +connections: + source: + type: duckdb + connection: example.duckdb + destination: + type: postgres + connection: postgresql://postgres:postgres@localhost/postgres + +tables: + - name: users + primary_key: id + - name: products + primary_key: id + - name: activity + primary_key: id +``` + +For the integration we're building, we want to provide a multi-asset that encompasses this replication process, and generates an asset for each table being replicated. + +We will define a dummy function named `replicate` that will mock the replication process, and return a dictionary with the replication status of each table. In the real world, this could be a function in a library, or a call to a command-line tool. 
+ +```python +import yaml + +from pathlib import Path +from typing import Mapping, Iterator, Any + + +def replicate(replication_configuration_yaml: Path) -> Iterator[Mapping[str, Any]]: + data = yaml.safe_load(replication_configuration_yaml.read_text()) + for table in data.get("tables"): + # < perform replication here, and get status > + yield {"table": table.get("name"), "status": "success"} +``` + +## Step 2: Implementation + +First, let's define a `Project` object that takes in the path of our configuration YAML file. This will allow us to encapsulate the logic that gets metadata and table information from our project configuration. + +```python +import yaml +from pathlib import Path + + +class ReplicationProject(): + def __init__(self, replication_configuration_yaml: str): + self.replication_configuration_yaml = replication_configuration_yaml + + def load(self): + return yaml.safe_load(Path(self.replication_configuration_yaml).read_text()) +``` + +Next, define a function that returns a `multi_asset` function. The `multi_asset` function is a decorator itself, so this allows us to customize the behavior of `multi_asset` and create a new decorator of our own: + +```python +def custom_replication_assets( + *, + replication_project: ReplicationProject, + name: Optional[str] = None, + group_name: Optional[str] = None, +) -> Callable[[Callable[..., Any]], AssetsDefinition]: + project = replication_project.load() + + return multi_asset( + name=name, + group_name=group_name, + specs=[ + AssetSpec( + key=table.get("name"), + ) + for table in project.get("tables") + ], + ) +``` + +Let's review what this code does: + +- Defines a function that returns a `multi_asset` function +- Loads our replication project and iterates over the tables defined in the input YAML file +- Uses the tables to create a list of `AssetSpec` objects and passes them to the `specs` parameter, thus defining assets that will be visible in the Dagster UI + +Next, we'll show you how to perform the execution of the replication function. + +Recall that decorators allow us to wrap a function that performs some operation. In the case of our `multi_asset`, we defined `AssetSpec` objects for our tables, and the actual processing that takes place will be in the body of the decorated function. + +In this function, we will perform the replication, and then yield `AssetMaterialization` objects indicating that the replication was successful for a given table. + +```python +from dagster import AssetExecutionContext + + +replication_project_path = "replication.yaml" +replication_project = ReplicationProject(replication_project_path) + + +@custom_replication_assets( + replication_project=replication_project, + name="my_custom_replication_assets", + group_name="replication", +) +def my_assets(context: AssetExecutionContext): + results = replicate(Path(replication_project_path)) + for table in results: + if table.get("status") == "SUCCESS": + yield AssetMaterialization(asset_key=str(table.get("name")), metadata=table) +``` + +There are a few limitations to this approach: + +- **We have not encapsulated the logic for replicating tables.** This means that users who use the `custom_replication_assets` decorator would be responsible for yielding asset materializations themselves. +- **Users can't customize the attributes of the asset**. + +For the first limitation, we can resolve this by refactoring the code in the body of our asset function into a Dagster resource. 
+ +## Step 3: Moving the replication logic into a resource + +Refactoring the replication logic into a resource enables us to support better configuration and re-use of our logic. + +To accomplish this, we will extend the `ConfigurableResource` object to create a custom resource. Then, we will define a `run` method that will perform the replication operation: + +```python +from dagster import ConfigurableResource +from dagster._annotations import public + + +class ReplicationResource(ConfigurableResource): + @public + def run( + self, replication_project: ReplicationProject + ) -> Iterator[AssetMaterialization]: + results = replicate(Path(replication_project.replication_configuration_yaml)) + for table in results: + if table.get("status") == "SUCCESS": + # NOTE: this assumes that the table name is the same as the asset key + yield AssetMaterialization( + asset_key=str(table.get("name")), metadata=table + ) +``` + +Now, we can refactor our `custom_replication_assets` instance to use this resource: + +```python +@custom_replication_assets( + replication_project=replication_project, + name="my_custom_replication_assets", + group_name="replication", +) +def my_assets(replication_resource: ReplicationProject): + replication_resource.run(replication_project) +``` + +## Step 4: Using translators + +At the end of [Step 2](#step-2-implementation), we mentioned that end users were unable to customize asset attributes, like the asset key, generated by our decorator. Translator classes are the recommended way of defining this logic, and they provide users with the option to override the default methods used to convert a concept from your tool (for example, a table name) to the corresponding concept in Dagster (for example, asset key). + +To start, we will define a translator method to map the table specification to a Dagster asset key. + +:::note +In a real world integration, you will want to define methods for all common attributes like dependencies, group names, and metadata. +::: + +```python +from dagster import AssetKey, _check as check + +from dataclasses import dataclass + + +@dataclass +class ReplicationTranslator: + @public + def get_asset_key(self, table_definition: Mapping[str, str]) -> AssetKey: + return AssetKey(str(table_definition.get("name"))) +``` + +Next, we'll update `custom_replication_assets` to use the translator when defining the `key` on the `AssetSpec`. + +:::note +Note that we took this opportunity to also include the replication project and translator instance on the `AssetSpec` metadata. This is a workaround that we tend to employ in this approach, as it makes it possible to define these objects once and then access them on the context of our asset. +::: + +```python +def custom_replication_assets( + *, + replication_project: ReplicationProject, + name: Optional[str] = None, + group_name: Optional[str] = None, + translator: Optional[ReplicationTranslator] = None, +) -> Callable[[Callable[..., Any]], AssetsDefinition]: + project = replication_project.load() + + translator = ( + check.opt_inst_param(translator, "translator", ReplicationTranslator) + or ReplicationTranslator() + ) + + return multi_asset( + name=name, + group_name=group_name, + specs=[ + AssetSpec( + key=translator.get_asset_key(table), + metadata={ + "replication_project": project, + "replication_translator": translator, + }, + ) + for table in project.get("tables") + ], + ) +``` + +Finally, we have to update our resource to use the translator and project provided in the metadata. 
We are using the `check` method provided by `dagster._check` to ensure that the type of the object is appropriate as we retrieve it from the metadata. + +Now, we can use the same `translator.get_asset_key` when yielding the asset materialization, thus ensuring that our asset declarations match our asset materializations: + +```python +class ReplicationResource(ConfigurableResource): + @public + def run(self, context: AssetExecutionContext) -> Iterator[AssetMaterialization]: + metadata_by_key = context.assets_def.metadata_by_key + first_asset_metadata = next(iter(metadata_by_key.values())) + + project = check.inst( + first_asset_metadata.get("replication_project"), + ReplicationProject, + ) + + translator = check.inst( + first_asset_metadata.get("replication_translator"), + ReplicationTranslator, + ) + + results = replicate(Path(project.replication_configuration_yaml)) + for table in results: + if table.get("status") == "SUCCESS": + yield AssetMaterialization( + asset_key=translator.get_asset_key(table), metadata=table + ) +``` + +## Conclusion + +In this guide we walked through how to define a custom multi-asset decorator, a resource for encapsulating tool logic, and a translator for defining the logic to translate a specification to Dagster concepts. + +Defining integrations with this approach aligns nicely with the overall development paradigm of Dagster, and is suitable for tools that generate many assets. + +The code in its entirety can be seen below: + + diff --git a/docs/docs-beta/docs/tutorial/create-new-project.md b/docs/docs-beta/docs/tutorial/create-new-project.md deleted file mode 100644 index 25867d5cbee78..0000000000000 --- a/docs/docs-beta/docs/tutorial/create-new-project.md +++ /dev/null @@ -1,3 +0,0 @@ ---- -unlisted: true ---- diff --git a/docs/docs-beta/docs/tutorial/introduction.md b/docs/docs-beta/docs/tutorial/introduction.md deleted file mode 100644 index fa350b858b24b..0000000000000 --- a/docs/docs-beta/docs/tutorial/introduction.md +++ /dev/null @@ -1,7 +0,0 @@ ---- -title: "Introduction" -description: "Welcome to the Dagster documentation! If this is your first time developing a Dagster pipeline, read through this Getting Started section to get familiar with the basics. Otherwise, feel free to explore our guides and API documentation!" -slug: introduction -hide_title: false -unlisted: true ---- diff --git a/docs/docs-beta/docs/tutorial/multi-asset-integration.md b/docs/docs-beta/docs/tutorial/multi-asset-integration.md deleted file mode 100644 index 92ec6f133c47b..0000000000000 --- a/docs/docs-beta/docs/tutorial/multi-asset-integration.md +++ /dev/null @@ -1,269 +0,0 @@ ---- -title: Creating a multi-asset integration -description: Create a decorator based multi-asset integration ---- - -When working in the Dagster ecosystem, you may have noticed that decorators are frequently used. For example, assets, jobs, and ops use decorators. If you have a service that produces many assets, it's possible to define it as a multi-asset decorator-offering a consistent and intuitive developer experience to existing Dagster APIs. - -In the context of Dagster, decorators are helpful because they often wrap some form of processing. For example, when writing an asset, you define your processing code and then annotate the function with the `asset` decorator /> decorator. Then, the internal Dagster code can register the asset, assign metadata, pass in context data, or perform any other variety of operations that are required to integrate your asset code with the Dagster platform. 
- -In this guide, you'll learn how to develop a multi-asset integration for a hypothetical replication tool. - -:::note -This guide assumes basic familiarity with Dagster and Python decorators. -::: - -## Step 1: Input - -For this guide, let's imagine a tool that replicates data between two databases. It's configured using a `replication.yaml` configuration file, in which a user is able to define source and destination databases, along with the tables that they would like to replicate between these systems. - -```yml -connections: - source: - type: duckdb - connection: example.duckdb - destination: - type: postgres - connection: postgresql://postgres:postgres@localhost/postgres - -tables: - - name: users - primary_key: id - - name: products - primary_key: id - - name: activity - primary_key: id -``` - -For the integration we're building, we want to provide a multi-asset that encompasses this replication process, and generates an asset for each table being replicated. - -We will define a dummy function named `replicate` that will mock the replication process, and return a dictionary with the replication status of each table. In the real world, this could be a function in a library, or a call to a command-line tool. - -```python -import yaml - -from pathlib import Path -from typing import Mapping, Iterator, Any - - -def replicate(replication_configuration_yaml: Path) -> Iterator[Mapping[str, Any]]: - data = yaml.safe_load(replication_configuration_yaml.read_text()) - for table in data.get("tables"): - # < perform replication here, and get status > - yield {"table": table.get("name"), "status": "success"} -``` - -## Step 2: Implementation - -First, let's define a `Project` object that takes in the path of our configuration YAML file. This will allow us to encapsulate the logic that gets metadata and table information from our project configuration. - -```python -import yaml -from pathlib import Path - - -class ReplicationProject(): - def __init__(self, replication_configuration_yaml: str): - self.replication_configuration_yaml = replication_configuration_yaml - - def load(self): - return yaml.safe_load(Path(self.replication_configuration_yaml).read_text()) -``` - -Next, define a function that returns a `multi_asset` function. The `multi_asset` function is a decorator itself, so this allows us to customize the behavior of `multi_asset` and create a new decorator of our own: - -```python -def custom_replication_assets( - *, - replication_project: ReplicationProject, - name: Optional[str] = None, - group_name: Optional[str] = None, -) -> Callable[[Callable[..., Any]], AssetsDefinition]: - project = replication_project.load() - - return multi_asset( - name=name, - group_name=group_name, - specs=[ - AssetSpec( - key=table.get("name"), - ) - for table in project.get("tables") - ], - ) -``` - -Let's review what this code does: - -- Defines a function that returns a `multi_asset` function -- Loads our replication project and iterates over the tables defined in the input YAML file -- Uses the tables to create a list of `AssetSpec` objects and passes them to the `specs` parameter, thus defining assets that will be visible in the Dagster UI - -Next, we'll show you how to perform the execution of the replication function. - -Recall that decorators allow us to wrap a function that performs some operation. In the case of our `multi_asset`, we defined `AssetSpec` objects for our tables, and the actual processing that takes place will be in the body of the decorated function. 
- -In this function, we will perform the replication, and then yield `AssetMaterialization` objects indicating that the replication was successful for a given table. - -```python -from dagster import AssetExecutionContext - - -replication_project_path = "replication.yaml" -replication_project = ReplicationProject(replication_project_path) - - -@custom_replication_assets( - replication_project=replication_project, - name="my_custom_replication_assets", - group_name="replication", -) -def my_assets(context: AssetExecutionContext): - results = replicate(Path(replication_project_path)) - for table in results: - if table.get("status") == "SUCCESS": - yield AssetMaterialization(asset_key=str(table.get("name")), metadata=table) -``` - -There are a few limitations to this approach: - -- **We have not encapsulated the logic for replicating tables.** This means that users who use the `custom_replication_assets` decorator would be responsible for yielding asset materializations themselves. -- **Users can't customize the attributes of the asset**. - -For the first limitation, we can resolve this by refactoring the code in the body of our asset function into a Dagster resource. - -## Step 3: Moving the replication logic into a resource - -Refactoring the replication logic into a resource enables us to support better configuration and re-use of our logic. - -To accomplish this, we will extend the `ConfigurableResource` object to create a custom resource. Then, we will define a `run` method that will perform the replication operation: - -```python -from dagster import ConfigurableResource -from dagster._annotations import public - - -class ReplicationResource(ConfigurableResource): - @public - def run( - self, replication_project: ReplicationProject - ) -> Iterator[AssetMaterialization]: - results = replicate(Path(replication_project.replication_configuration_yaml)) - for table in results: - if table.get("status") == "SUCCESS": - # NOTE: this assumes that the table name is the same as the asset key - yield AssetMaterialization( - asset_key=str(table.get("name")), metadata=table - ) -``` - -Now, we can refactor our `custom_replication_assets` instance to use this resource: - -```python -@custom_replication_assets( - replication_project=replication_project, - name="my_custom_replication_assets", - group_name="replication", -) -def my_assets(replication_resource: ReplicationProject): - replication_resource.run(replication_project) -``` - -## Step 4: Using translators - -At the end of [Step 2](#step-2-implementation), we mentioned that end users were unable to customize asset attributes, like the asset key, generated by our decorator. Translator classes are the recommended way of defining this logic, and they provide users with the option to override the default methods used to convert a concept from your tool (for example, a table name) to the corresponding concept in Dagster (for example, asset key). - -To start, we will define a translator method to map the table specification to a Dagster asset key. - -:::note -In a real world integration, you will want to define methods for all common attributes like dependencies, group names, and metadata. 
-::: - -```python -from dagster import AssetKey, _check as check - -from dataclasses import dataclass - - -@dataclass -class ReplicationTranslator: - @public - def get_asset_key(self, table_definition: Mapping[str, str]) -> AssetKey: - return AssetKey(str(table_definition.get("name"))) -``` - -Next, we'll update `custom_replication_assets` to use the translator when defining the `key` on the `AssetSpec`. - -:::note -Note that we took this opportunity to also include the replication project and translator instance on the `AssetSpec` metadata. This is a workaround that we tend to employ in this approach, as it makes it possible to define these objects once and then access them on the context of our asset. -::: - -```python -def custom_replication_assets( - *, - replication_project: ReplicationProject, - name: Optional[str] = None, - group_name: Optional[str] = None, - translator: Optional[ReplicationTranslator] = None, -) -> Callable[[Callable[..., Any]], AssetsDefinition]: - project = replication_project.load() - - translator = ( - check.opt_inst_param(translator, "translator", ReplicationTranslator) - or ReplicationTranslator() - ) - - return multi_asset( - name=name, - group_name=group_name, - specs=[ - AssetSpec( - key=translator.get_asset_key(table), - metadata={ - "replication_project": project, - "replication_translator": translator, - }, - ) - for table in project.get("tables") - ], - ) -``` - -Finally, we have to update our resource to use the translator and project provided in the metadata. We are using the `check` method provided by `dagster._check` to ensure that the type of the object is appropriate as we retrieve it from the metadata. - -Now, we can use the same `translator.get_asset_key` when yielding the asset materialization, thus ensuring that our asset declarations match our asset materializations: - -```python -class ReplicationResource(ConfigurableResource): - @public - def run(self, context: AssetExecutionContext) -> Iterator[AssetMaterialization]: - metadata_by_key = context.assets_def.metadata_by_key - first_asset_metadata = next(iter(metadata_by_key.values())) - - project = check.inst( - first_asset_metadata.get("replication_project"), - ReplicationProject, - ) - - translator = check.inst( - first_asset_metadata.get("replication_translator"), - ReplicationTranslator, - ) - - results = replicate(Path(project.replication_configuration_yaml)) - for table in results: - if table.get("status") == "SUCCESS": - yield AssetMaterialization( - asset_key=translator.get_asset_key(table), metadata=table - ) -``` - -## Conclusion - -In this guide we walked through how to define a custom multi-asset decorator, a resource for encapsulating tool logic, and a translator for defining the logic to translate a specification to Dagster concepts. - -Defining integrations with this approach aligns nicely with the overall development paradigm of Dagster, and is suitable for tools that generate many assets. 
- -The code in its entirety can be seen below: - - diff --git a/docs/docs-beta/docs/tutorial/tutorial-etl.md b/docs/docs-beta/docs/tutorial/tutorial-etl.md deleted file mode 100644 index 44e430d6c0b76..0000000000000 --- a/docs/docs-beta/docs/tutorial/tutorial-etl.md +++ /dev/null @@ -1,62 +0,0 @@ ---- -title: Build an ETL Pipeline -description: Learn how to build an ETL pipeline with Dagster -last_update: - date: 2024-08-10 - author: Pedram Navid ---- - -# Build your first ETL pipeline - -Welcome to this hands-on tutorial where you'll learn how to build an ETL pipeline with Dagster while exploring key parts of Dagster. -If you haven't already, complete the [Quick Start](/getting-started/quickstart) tutorial to get familiar with Dagster. - -## What you'll learn - -- Setting up a Dagster project with the recommended project structure -- Creating Assets and using Resources to connect to external systems -- Adding metadata to your assets -- Building dependencies between assets -- Running a pipeline by materializing assets -- Adding schedules, sensors, and partitions to your assets - -## Step 1: Set up your Dagster environment - -First, set up a new Dagster project. - -1. Open your terminal and create a new directory for your project: - - ```bash title="Create a new directory" - mkdir dagster-etl-tutorial - cd dagster-etl-tutorial - ``` - -2. Create a virtual environment and activate it: - - ```bash title="Create a virtual environment" - python -m venv venv - source venv/bin/activate - # On Windows, use `venv\Scripts\activate` - ``` - -3. Install Dagster and the required dependencies: - - ```bash title="Install Dagster and dependencies" - pip install dagster dagster-webserver pandas - ``` - -## What you've learned - -Congratulations! You've just built and run your first ETL pipeline with Dagster. You've learned how to: - -- Set up a Dagster project -- Define Software-Defined Assets for each step of your ETL process -- Use Dagster's UI to run and monitor your pipeline - -## Next steps - -To expand on this tutorial, you could: - -- Add more complex transformations -- Implement error handling and retries -- Create a schedule to run your pipeline periodically diff --git a/docs/docs-beta/sidebars.ts b/docs/docs-beta/sidebars.ts index 3fc6022c5a6c4..5ca67df1079b1 100644 --- a/docs/docs-beta/sidebars.ts +++ b/docs/docs-beta/sidebars.ts @@ -14,9 +14,15 @@ const sidebars: SidebarsConfig = { }, { type: 'category', - label: 'Tutorial', + label: 'ETL pipeline tutorial', collapsed: false, - items: ['tutorial/tutorial-etl', 'tutorial/multi-asset-integration'], + link: {type: 'doc', id: 'etl-pipeline-tutorial/index'}, + items: [ + { + type: 'autogenerated', + dirName: 'etl-pipeline-tutorial' + } + ], }, { type: 'category', diff --git a/docs/docs-beta/src/theme/NotFound/Content/index.tsx b/docs/docs-beta/src/theme/NotFound/Content/index.tsx index b83cdc0b20424..3ab9adab3dba1 100644 --- a/docs/docs-beta/src/theme/NotFound/Content/index.tsx +++ b/docs/docs-beta/src/theme/NotFound/Content/index.tsx @@ -25,7 +25,7 @@ export default function NotFoundContent({className}: Props): JSX.Element {
Welcome to Dagster Build your first Dagster project - Build your first ETL pipeline + Build your first ETL pipeline
diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/asset-check.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/asset-check.png new file mode 100644 index 0000000000000..900695edb3b50 Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/asset-check.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/automation-final.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/automation-final.png new file mode 100644 index 0000000000000..375965a274b60 Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/automation-final.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/etl-tutorial-first-asset-lineage.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/etl-tutorial-first-asset-lineage.png new file mode 100644 index 0000000000000..35abdc8e450de Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/etl-tutorial-first-asset-lineage.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/first-asset-run.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/first-asset-run.png new file mode 100644 index 0000000000000..c6abbfc605ed7 Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/first-asset-run.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-asset-run.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-asset-run.png new file mode 100644 index 0000000000000..761a69994038d Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-asset-run.png differ diff --git a/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-evaluation.png b/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-evaluation.png new file mode 100644 index 0000000000000..f2617f7251270 Binary files /dev/null and b/docs/docs-beta/static/images/tutorial/etl-tutorial/sensor-evaluation.png differ diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py index a2c5b7bb397fe..0c0202cae210d 100644 --- a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py +++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py @@ -6,25 +6,6 @@ import dagster as dg -def query_to_markdown(conn, query, limit=10): - """Performs SQL query and converts result to markdown. - - Args: - conn(DuckDBPyConnection) connection to duckdb - query (str): query to run against duckdb connection - limit (int): maximum number of rows to render to markdown - - Returns: - Markdown representation of query result - - """ - result = conn.execute(query).fetchdf() - if result.empty: - return "No results found." 
-
-    return result.head(limit).to_markdown(index=False)
-
-
 @dg.asset(
     compute_kind="duckdb",
     group_name="ingestion",
@@ -40,14 +21,14 @@ def products(duckdb: DuckDBResource) -> dg.MaterializeResult:
         )
 
         preview_query = "select * from products limit 10"
-        preview_md = query_to_markdown(conn, preview_query)
+        preview_df = conn.execute(preview_query).fetchdf()
         row_count = conn.execute("select count(*) from products").fetchone()
         count = row_count[0] if row_count else 0
 
         return dg.MaterializeResult(
             metadata={
                 "row_count": dg.MetadataValue.int(count),
-                "preview": dg.MetadataValue.md(preview_md),
+                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
             }
         )
 
@@ -67,14 +48,14 @@ def sales_reps(duckdb: DuckDBResource) -> dg.MaterializeResult:
         )
 
         preview_query = "select * from sales_reps limit 10"
-        preview_md = query_to_markdown(conn, preview_query)
+        preview_df = conn.execute(preview_query).fetchdf()
         row_count = conn.execute("select count(*) from sales_reps").fetchone()
         count = row_count[0] if row_count else 0
 
         return dg.MaterializeResult(
             metadata={
                 "row_count": dg.MetadataValue.int(count),
-                "preview": dg.MetadataValue.md(preview_md),
+                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
             }
         )
 
@@ -93,14 +74,14 @@ def sales_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
         )
 
         preview_query = "SELECT * FROM sales_data LIMIT 10"
-        preview_md = query_to_markdown(conn, preview_query)
+        preview_df = conn.execute(preview_query).fetchdf()
         row_count = conn.execute("select count(*) from sales_data").fetchone()
         count = row_count[0] if row_count else 0
 
         return dg.MaterializeResult(
             metadata={
                 "row_count": dg.MetadataValue.int(count),
-                "preview": dg.MetadataValue.md(preview_md),
+                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
             }
         )
 
@@ -137,14 +118,15 @@ def joined_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
         )
 
         preview_query = "select * from joined_data limit 10"
-        preview_md = query_to_markdown(conn, preview_query)
+        preview_df = conn.execute(preview_query).fetchdf()
+
         row_count = conn.execute("select count(*) from joined_data").fetchone()
         count = row_count[0] if row_count else 0
 
         return dg.MaterializeResult(
             metadata={
                 "row_count": dg.MetadataValue.int(count),
-                "preview": dg.MetadataValue.md(preview_md),
+                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
             }
         )
 
@@ -167,7 +149,7 @@ def missing_dimension_check(duckdb: DuckDBResource) -> dg.AssetCheckResult:
     )
 
 
-# datetime partitions & automaterializations
+# datetime partitions
 monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2024-01-01")
 
 
@@ -208,7 +190,7 @@ def monthly_sales_performance(
         )
 
         preview_query = f"select * from monthly_sales_performance where partition_date = '{month_to_fetch}';"
-        preview_md = query_to_markdown(conn, preview_query)
+        preview_df = conn.execute(preview_query).fetchdf()
         row_count = conn.execute(
             f"""
             select count(*)
@@ -221,7 +203,7 @@ def monthly_sales_performance(
 
     return dg.MaterializeResult(
         metadata={
            "row_count": dg.MetadataValue.int(count),
-            "preview": dg.MetadataValue.md(preview_md),
+            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
         }
     )
 
@@ -266,7 +248,7 @@ def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResourc
             """
         )
         preview_query = f"select * from product_performance where product_category = '{product_category_str}';"
-        preview_md = query_to_markdown(conn, preview_query)
+        preview_df = conn.execute(preview_query).fetchdf()
         row_count = conn.execute(
             f"""
             SELECT COUNT(*)
@@ -279,20 +261,14 @@ def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResourc
     return dg.MaterializeResult(
         metadata={
             "row_count": dg.MetadataValue.int(count),
-            "preview": dg.MetadataValue.md(preview_md),
+            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
         }
     )
 
 
-analysis_assets = dg.AssetSelection.keys("joined_data").upstream()
-
-analysis_update_job = dg.define_asset_job(
-    name="analysis_update_job",
-    selection=analysis_assets,
-)
-
 weekly_update_schedule = dg.ScheduleDefinition(
-    job=analysis_update_job,
+    name="analysis_update_job",
+    target=dg.AssetSelection.keys("joined_data").upstream(),
     cron_schedule="0 0 * * 1",  # every Monday at midnight
 )
 
@@ -329,9 +305,11 @@ def adhoc_request(
     """
 
     with duckdb.get_connection() as conn:
-        preview_md = query_to_markdown(conn, query)
+        preview_df = conn.execute(query).fetchdf()
 
-    return dg.MaterializeResult(metadata={"preview": dg.MetadataValue.md(preview_md)})
+    return dg.MaterializeResult(
+        metadata={"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))}
+    )
 
 
 adhoc_request_job = dg.define_asset_job(
@@ -385,10 +363,11 @@ def adhoc_request_sensor(context: dg.SensorEvaluationContext):
         joined_data,
         monthly_sales_performance,
         product_performance,
+        adhoc_request,
     ],
     asset_checks=[missing_dimension_check],
     schedules=[weekly_update_schedule],
-    jobs=[analysis_update_job, adhoc_request_job],
+    jobs=[adhoc_request_job],
     sensors=[adhoc_request_sensor],
     resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
 )
diff --git a/examples/getting_started_etl_tutorial/etl_tutorial/definitions.py b/examples/getting_started_etl_tutorial/etl_tutorial/definitions.py
index a05269e82e0fc..f3b4218dfbd05 100644
--- a/examples/getting_started_etl_tutorial/etl_tutorial/definitions.py
+++ b/examples/getting_started_etl_tutorial/etl_tutorial/definitions.py
@@ -1,6 +1,5 @@
 import dagster as dg
-from dagster_duckdb import DuckDBResource
 
 defs = dg.Definitions(
-    resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
+    resources={},
 )
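Taken together, the `definitions.py` changes above do two things: they replace the removed `query_to_markdown` helper with inline preview metadata built from DuckDB's `fetchdf()` and pandas' `to_markdown()`, and they fold the old `define_asset_job` + `ScheduleDefinition(job=...)` pair into a single `ScheduleDefinition` that targets an asset selection directly. The sketch below is a condensed, hypothetical illustration of the resulting pattern, not the full tutorial file: it keeps only the `products` asset and the `data/mydb.duckdb` path from the diff, and assumes `dagster`, `dagster-duckdb`, `pandas`, and `tabulate` (required by `to_markdown`) are installed.

```python
import dagster as dg
from dagster_duckdb import DuckDBResource


@dg.asset(compute_kind="duckdb", group_name="ingestion")
def products(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        conn.execute(
            "create or replace table products as "
            "(select * from read_csv_auto('data/products.csv'))"
        )

        # Inline preview metadata: fetch a DataFrame and render it as markdown,
        # which is what replaced the deleted query_to_markdown helper.
        preview_df = conn.execute("select * from products limit 10").fetchdf()
        row_count = conn.execute("select count(*) from products").fetchone()
        count = row_count[0] if row_count else 0

        return dg.MaterializeResult(
            metadata={
                "row_count": dg.MetadataValue.int(count),
                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            }
        )


# The schedule targets an asset selection directly, so no separate
# define_asset_job definition is needed. Here it targets just the sketch's
# single asset rather than the tutorial's upstream selection of joined_data.
weekly_update_schedule = dg.ScheduleDefinition(
    name="analysis_update_job",
    target=dg.AssetSelection.keys("products"),
    cron_schedule="0 0 * * 1",  # every Monday at midnight
)

defs = dg.Definitions(
    assets=[products],
    schedules=[weekly_update_schedule],
    resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
)
```

Because the schedule carries its own target, the weekly refresh is self-contained, which is why the diff can also drop `analysis_update_job` from the `jobs=` list passed to `Definitions`.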