Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[du] consistency issues #26416

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/dagster-university/next-env.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
/// <reference types="next/image-types/global" />

// NOTE: This file should not be edited
// see https://nextjs.org/docs/basic-features/typescript for more information.
// see https://nextjs.org/docs/pages/building-your-application/configuring/typescript for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ lesson: '2'
To install Dagster, you’ll need:

- **To install Python**. Dagster supports Python 3.9 through 3.12.
- **A package manager like pip or poetry**. If you need to install a package manager, refer to the following installation guides:
- **A package manager like pip, Poetry, or uv**. If you need to install a package manager, refer to the following installation guides:
- [pip](https://pip.pypa.io/en/stable/installation/)
- [Poetry](https://python-poetry.org/docs/)
- [uv](https://docs.astral.sh/uv/getting-started/installation/)

To check that Python and the pip or Poetry package manager are already installed in your environment, run:
To check that Python and the package manager are already installed in your environment, run:

```shell
python --version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ An asset is an object in persistent storage that captures some understanding of

- **A database table or view**, such as those in a Google BigQuery data warehouse
- **A file**, such as a file in your local machine or blob storage like Amazon S3
- **A machine learning model**
- **An asset from an integration,** like a dbt model or a Fivetran connector
- **A machine learning model**, such as TensorFlow or PyTorch
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding in example since it was the only bullet point without one

- **An asset from an integration,** such as a dbt model or a Fivetran connector

Assets aren’t limited to just the objects listed above - these are just some common examples.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ The asset you built should look similar to the following code. Click **View answ
deps=["taxi_zones_file"]
)
def taxi_zones() -> None:
sql_query = f"""
query = f"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Standardizing around query as the variable name since that is what the dagster university repo uses in functions

create or replace table zones as (
select
LocationID as zone_id,
Expand All @@ -42,5 +42,5 @@ def taxi_zones() -> None:
"""

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn.execute(sql_query)
conn.execute(query)
```
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Now that you have a query that produces an asset, let’s use Dagster to manage
"""
The raw taxi trips dataset, loaded into a DuckDB database
"""
sql_query = """
query = """
create or replace table trips as (
select
VendorID as vendor_id,
Expand All @@ -43,7 +43,7 @@ Now that you have a query that produces an asset, let’s use Dagster to manage
"""

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn.execute(sql_query)
conn.execute(query)
```

Let’s walk through what this code does:
Expand All @@ -52,13 +52,13 @@ Now that you have a query that produces an asset, let’s use Dagster to manage

2. The `taxi_trips_file` asset is defined as a dependency of `taxi_trips` through the `deps` argument.

3. Next, a variable named `sql_query` is created. This variable contains a SQL query that creates a table named `trips`, which sources its data from the `data/raw/taxi_trips_2023-03.parquet` file. This is the file created by the `taxi_trips_file` asset.
3. Next, a variable named `query` is created. This variable contains a SQL query that creates a table named `trips`, which sources its data from the `data/raw/taxi_trips_2023-03.parquet` file. This is the file created by the `taxi_trips_file` asset.

4. A variable named `conn` is created, which defines the connection to the DuckDB database in the project. To do this, it uses the `.connect` method from the `duckdb` library, passing in the `DUCKDB_DATABASE` environment variable to tell DuckDB where the database is located.

The `DUCKDB_DATABASE` environment variable, sourced from your project’s `.env` file, resolves to `data/staging/data.duckdb`. **Note**: We set up this file in Lesson 2 - refer to this lesson if you need a refresher. If this file isn’t set up correctly, the materialization will result in an error.

5. Finally, `conn` is paired with the DuckDB `execute` method, where our SQL query (`sql_query`) is passed in as an argument. This tells the asset that, when materializing, to connect to the DuckDB database and execute the query in `sql_query`.
5. Finally, `conn` is paired with the DuckDB `execute` method, where our SQL query (`query`) is passed in as an argument. This tells the asset that, when materializing, to connect to the DuckDB database and execute the query in `query`.

3. Save the changes to the file.

Expand Down Expand Up @@ -98,9 +98,9 @@ This is because you’ve told Dagster that taxi_trips depends on the taxi_trips_
To confirm that the `taxi_trips` asset materialized properly, you can access the newly made `trips` table in DuckDB. In a new terminal session, open a Python REPL and run the following snippet:

```python
> import duckdb
> conn = duckdb.connect(database="data/staging/data.duckdb") # assumes you're writing to the same destination as specified in .env.example
> conn.execute("select count(*) from trips").fetchall()
import duckdb
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing > for consistency and so it is easier to copy and run in Python REPL

conn = duckdb.connect(database="data/staging/data.duckdb") # assumes you're writing to the same destination as specified in .env.example
conn.execute("select count(*) from trips").fetchall()
```

The command should succeed and return a row count of the taxi trips that were ingested. When finished, make sure to stop the terminal process before continuing or you may encounter an error. Use `Control+C` or `Command+C` to stop the process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ from dagster import asset
deps=["taxi_trips_file"],
)
def taxi_trips() -> None:
sql_query = """
query = """
create or replace table taxi_trips as (
select
VendorID as vendor_id,
Expand All @@ -49,7 +49,7 @@ def taxi_trips() -> None:
"""

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn.execute(sql_query)
conn.execute(query)
```

---
Expand All @@ -72,7 +72,7 @@ from dagster import asset
deps=["taxi_trips_file"],
)
def taxi_trips(database: DuckDBResource) -> None:
sql_query = """
query = """
create or replace table taxi_trips as (
select
VendorID as vendor_id,
Expand All @@ -90,7 +90,7 @@ def taxi_trips(database: DuckDBResource) -> None:
"""

with database.get_connection() as conn:
conn.execute(sql_query)
conn.execute(query)
```

To refactor `taxi_trips` to use the `database` resource, we had to:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Despite many schedulers and orchestrators replacing the cron program since then,

Consider the following example:

```python
```
15 5 * * 1-5
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ To add the partition to the asset:
@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context) -> None:
def taxi_trips_file(context: AssetExecutionContext) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First example snippet uses type annotation. Including for consistency

partition_date_str = context.partition_key
```

Expand All @@ -73,7 +73,7 @@ To add the partition to the asset:
@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context) -> None:
def taxi_trips_file(context: AssetExecutionContext) -> None:
partition_date_str = context.partition_key
month_to_fetch = partition_date_str[:-3]
```
Expand All @@ -86,7 +86,7 @@ from ..partitions import monthly_partition
@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context) -> None:
def taxi_trips_file(context: AssetExecutionContext) -> None:
"""
The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ To practice what you’ve learned, partition the `taxi_trips` asset by month usi
{% callout %}
You’ll need to drop the existing `taxi_trips` because of the new `partition_date` column. In a Python REPL or scratch script, run the following:

```yaml
```
import duckdb
conn = duckdb.connect(database="data/staging/data.duckdb")
conn.execute("drop table trips;")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ To add partition to the job, make the following changes:
The job should now look like this:

```python
from dagster import define_asset_job, AssetSelection, AssetKey
from dagster import define_asset_job, AssetSelection
from ..partitions import monthly_partition

trips_by_week = AssetSelection.assets("trips_by_week")

trip_update_job = define_asset_job(
name="trip_update_job",
partitions_def=monthly_partition, # partitions added here
selection=AssetSelection.all() - AssetSelection.assets(["trips_by_week"])
selection=AssetSelection.all() - trips_by_week
)
```
Loading