add backoff for duckdb connections
dehume committed Dec 11, 2024
1 parent 12b54a1 commit bb82196
Showing 6 changed files with 54 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/dagster-university/next-env.d.ts
@@ -2,4 +2,4 @@
/// <reference types="next/image-types/global" />

// NOTE: This file should not be edited
-// see https://nextjs.org/docs/basic-features/typescript for more information.
+// see https://nextjs.org/docs/pages/building-your-application/configuring/typescript for more information.
@@ -41,6 +41,13 @@ def taxi_zones() -> None:
);
"""

-conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
+conn = backoff(
+    fn=duckdb.connect,
+    retry_on=(RuntimeError, duckdb.IOException),
+    kwargs={
+        "database": os.getenv("DUCKDB_DATABASE"),
+    },
+    max_retries=10,
+)
conn.execute(sql_query)
```
@@ -62,12 +62,20 @@ from datetime import datetime, timedelta
from . import constants

import pandas as pd
+from dagster._utils.backoff import backoff

@asset(
deps=["taxi_trips"]
)
def trips_by_week() -> None:
-conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
+conn = backoff(
+    fn=duckdb.connect,
+    retry_on=(RuntimeError, duckdb.IOException),
+    kwargs={
+        "database": os.getenv("DUCKDB_DATABASE"),
+    },
+    max_retries=10,
+)

current_date = datetime.strptime("2023-03-01", constants.DATE_FORMAT)
end_date = datetime.strptime("2023-04-01", constants.DATE_FORMAT)
@@ -13,6 +13,7 @@ Now that you have a query that produces an asset, let’s use Dagster to manage
```python
import duckdb
import os
+from dagster._utils.backoff import backoff
```

2. Copy and paste the code below into the bottom of the `trips.py` file. Note how this code looks similar to the asset definition code for the `taxi_trips_file` and the `taxi_zones` assets:
@@ -42,7 +43,14 @@ Now that you have a query that produces an asset, let’s use Dagster to manage
);
"""

-conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
+conn = backoff(
+    fn=duckdb.connect,
+    retry_on=(RuntimeError, duckdb.IOException),
+    kwargs={
+        "database": os.getenv("DUCKDB_DATABASE"),
+    },
+    max_retries=10,
+)
conn.execute(sql_query)
```

@@ -54,7 +62,7 @@ Now that you have a query that produces an asset, let’s use Dagster to manage

3. Next, a variable named `sql_query` is created. This variable contains a SQL query that creates a table named `trips`, which sources its data from the `data/raw/taxi_trips_2023-03.parquet` file. This is the file created by the `taxi_trips_file` asset.

-4. A variable named `conn` is created, which defines the connection to the DuckDB database in the project. To do this, it uses the `.connect` method from the `duckdb` library, passing in the `DUCKDB_DATABASE` environment variable to tell DuckDB where the database is located.
+4. A variable named `conn` is created, which defines the connection to the DuckDB database in the project. To do this, we wrap the connection call in the Dagster utility function `backoff`. If another process already holds the lock on the database file, a connection attempt can fail; `backoff` retries the attempt after a delay instead of failing immediately, which lets multiple assets use DuckDB safely. The `backoff` function takes the function we want to call (in this case, the `.connect` method from the `duckdb` library), the errors to retry on (`RuntimeError` and `duckdb.IOException`), the maximum number of retries, and finally the keyword arguments to pass to `.connect`. In this case, we pass in the `DUCKDB_DATABASE` environment variable to tell DuckDB where the database is located.

The `DUCKDB_DATABASE` environment variable, sourced from your project’s `.env` file, resolves to `data/staging/data.duckdb`. **Note**: We set up this file in Lesson 2 - refer to this lesson if you need a refresher. If this file isn’t set up correctly, the materialization will result in an error.
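
The retry behavior described in step 4 can be sketched in plain Python. This is an illustration only, not Dagster's implementation of `backoff`: the names `retry_with_backoff`, `exponential_delays`, and `flaky_connect`, the injectable `sleep` parameter, and the 0.1-second doubling schedule are all assumptions made for the example. A stand-in function replaces `duckdb.connect` so the sketch runs without a database.

```python
import time
from typing import Callable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def exponential_delays(base: float = 0.1) -> Iterator[float]:
    """Yield base, 2*base, 4*base, ... seconds (hypothetical schedule)."""
    delay = base
    while True:
        yield delay
        delay *= 2


def retry_with_backoff(
    fn: Callable[..., T],
    retry_on: Tuple[Type[BaseException], ...],
    kwargs: dict,
    max_retries: int,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(**kwargs), retrying up to max_retries times on the listed errors."""
    delays = exponential_delays()
    retries = 0
    while True:
        try:
            return fn(**kwargs)
        except retry_on:
            if retries >= max_retries:
                raise  # out of retries: surface the last error
            retries += 1
            sleep(next(delays))  # wait longer after each failure


# Hypothetical stand-in for duckdb.connect: fails twice, as if another
# process held the database lock, then succeeds.
attempts = {"count": 0}


def flaky_connect(database: str) -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("database is locked")
    return f"connected to {database}"


waits = []  # records the requested delays instead of actually sleeping
result = retry_with_backoff(
    fn=flaky_connect,
    retry_on=(RuntimeError,),
    kwargs={"database": "data/staging/data.duckdb"},
    max_retries=10,
    sleep=waits.append,
)
print(result, attempts["count"], waits)
```

Errors that are not listed in `retry_on` propagate immediately, so only the failures you expect to be transient (such as a locked database file) trigger a retry.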

@@ -14,7 +14,14 @@ Throughout this module, you’ve used DuckDB to store and transform your data. E
)
def taxi_trips() -> None:
...
-conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
+conn = backoff(
+    fn=duckdb.connect,
+    retry_on=(RuntimeError, duckdb.IOException),
+    kwargs={
+        "database": os.getenv("DUCKDB_DATABASE"),
+    },
+    max_retries=10,
+)
...
```

@@ -48,7 +48,14 @@ def taxi_trips() -> None:
);
"""

-conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
+conn = backoff(
+    fn=duckdb.connect,
+    retry_on=(RuntimeError, duckdb.IOException),
+    kwargs={
+        "database": os.getenv("DUCKDB_DATABASE"),
+    },
+    max_retries=10,
+)
conn.execute(sql_query)
```

@@ -100,7 +107,14 @@ To refactor `taxi_trips` to use the `database` resource, we had to:
3. Replace the lines that connect to DuckDB and execute a query:

```python
-conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
+conn = backoff(
+    fn=duckdb.connect,
+    retry_on=(RuntimeError, duckdb.IOException),
+    kwargs={
+        "database": os.getenv("DUCKDB_DATABASE"),
+    },
+    max_retries=10,
+)
conn.execute(query)
```

@@ -111,6 +125,8 @@ To refactor `taxi_trips` to use the `database` resource, we had to:
conn.execute(query)
```

+Notice that we no longer need to use the `backoff` function. The Dagster `DuckDBResource` handles this functionality for us.

---

## Before you continue