docs/docs-beta/docs/guides/data-modeling/asset-factories.md
---
title: 'Creating domain-specific languages with asset factories'
sidebar_position: 60
sidebar_label: 'Creating domain-specific languages'
---
In data engineering, you'll often need to create a large number of similar assets. For example, you might have a set of tables in a database that all share the same schema, or a set of files in a directory that all share the same format. In these cases, it can be helpful to write a factory that generates these assets for you.

Additionally, you might be serving stakeholders who aren't familiar with Python or Dagster, and who would prefer to interact with your assets through a domain-specific language (DSL) built on top of a configuration language such as YAML.

You can solve both of these problems with the asset factory pattern. This guide shows you how to build an asset factory in Python, and then how to build a DSL on top of it.
## What you'll learn

- Building an asset factory in Python
- Driving your asset factory with YAML
- Improving usability with Pydantic and Jinja

---

<details>
<summary>Prerequisites</summary>

To follow the steps in this guide, you'll need:

- A basic understanding of Dagster and assets. See the [Quick Start](/tutorial/quick-start) tutorial for an overview.
- High-level familiarity with Dagster's [Resources system](/concepts/resources)
- Familiarity with SQL, YAML, and AWS S3
- Basic familiarity with [Pydantic](https://docs.pydantic.dev/latest/) and [Jinja2](https://jinja.palletsprojects.com/en/3.1.x/)

</details>

---
## Building an asset factory in Python

Let's imagine a team that frequently performs the same ETL task: download a CSV file from S3, run a SQL query on it, and upload the result as a new file back to S3.

To start, install the required dependencies:

```shell
pip install dagster dagster-aws duckdb
```

Next, here's how you might define an asset factory in Python to automate this ETL process:

<CodeExample filePath="guides/data-modeling/asset-factories/python-asset-factory.py" language="python" title="Basic Python asset factory" />

As you can see, the asset factory pattern is, at its core, a function that takes some configuration and returns a `dg.Definitions` object.
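
Stripped of the ETL details, every asset factory shares the same shape. Here's a minimal sketch (with a hypothetical `build_asset` factory and placeholder asset names) of that shape:

```python
import dagster as dg


def build_asset(name: str) -> dg.Definitions:
    # The factory closes over its arguments, defines an asset inside
    # the closure, and returns it wrapped in a Definitions object.
    @dg.asset(name=name)
    def _asset() -> None: ...

    return dg.Definitions(assets=[_asset])


# Each call manufactures an independent asset definition.
defs = dg.Definitions.merge(build_asset("asset_a"), build_asset("asset_b"))
```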
## Driving your asset factory with YAML

Now, let's say the team wants to configure the asset factory with a YAML file instead of Python. Here's an example of how that YAML file might look:

<CodeExample filePath="guides/data-modeling/asset-factories/etl_jobs.yaml" language="yaml" title="Example YAML config" />

We can implement this by building on the previous example. First, install PyYAML:

```shell
pip install pyyaml
```

Next, we parse the YAML file and use it to create the S3 resource and the ETL jobs:

<CodeExample filePath="guides/data-modeling/asset-factories/simple-yaml-asset-factory.py" language="python" title="Basic YAML asset factory" />

## Improving usability with Pydantic and Jinja

The approach described above has two problems:

1. The YAML file isn't type-checked, so typos or missing fields surface as cryptic `KeyError`s.
2. The YAML file contains secrets inline. Instead, it should reference environment variables.

To solve these problems, we can use Pydantic to define a schema for the YAML file, and Jinja to template environment variables into it.
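
To see what the schema buys us, here's a minimal sketch using the `JobConfig` model from the full implementation below. Because the misspelled key leaves a required field missing, validation fails immediately with a readable error instead of a `KeyError` buried inside the factory:

```python
import pydantic


class JobConfig(pydantic.BaseModel):
    bucket: str
    source: str
    target: str
    sql: str


# "souce" is a typo, so the required "source" field is missing; Pydantic
# raises a ValidationError that names the missing field up front.
try:
    JobConfig.model_validate(
        {
            "bucket": "my_bucket",
            "souce": "raw_transactions.csv",
            "target": "cleaned_transactions.csv",
            "sql": "SELECT * FROM source",
        }
    )
except pydantic.ValidationError as err:
    print(err)
```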
Here's what the new YAML file might look like. Note how Jinja templating is used to reference environment variables:

<CodeExample filePath="guides/data-modeling/asset-factories/etl_jobs_with_jinja.yaml" language="yaml" title="Example YAML config with Jinja" />
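
Rendering this template takes only a couple of lines of Jinja. The following sketch shows what the loader in the implementation below does, assuming the YAML file lives in the working directory:

```python
import os

import jinja2

# Expose environment variables to the template under the `env` name, so
# expressions like `{{ env.AWS_ACCESS_KEY_ID }}` resolve before YAML parsing.
with open("etl_jobs_with_jinja.yaml") as f:
    rendered = jinja2.Environment().from_string(f.read()).render(env=os.environ)
print(rendered)
```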
And here is the Python implementation:

<CodeExample filePath="guides/data-modeling/asset-factories/advanced-yaml-asset-factory.py" language="python" title="Advanced YAML asset factory" />
guides/data-modeling/asset-factories/advanced-yaml-asset-factory.py
import os
from typing import List

import jinja2
import pydantic
import yaml

import dagster as dg
import dagster_aws.s3 as s3


def build_etl_job(
    s3_resource: s3.S3Resource,
    bucket: str,
    source_object: str,
    target_object: str,
    sql: str,
) -> dg.Definitions: ...  # same implementation as in the basic example


# highlight-start
class AwsConfig(pydantic.BaseModel):
    access_key_id: str
    secret_access_key: str

    def to_resource(self) -> s3.S3Resource:
        return s3.S3Resource(
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
        )


class JobConfig(pydantic.BaseModel):
    bucket: str
    source: str
    target: str
    sql: str

    def to_etl_job(self, s3_resource: s3.S3Resource) -> dg.Definitions:
        return build_etl_job(
            s3_resource=s3_resource,
            bucket=self.bucket,
            source_object=self.source,
            target_object=self.target,
            sql=self.sql,
        )


class EtlJobsConfig(pydantic.BaseModel):
    aws: AwsConfig
    etl_jobs: List[JobConfig]

    def to_definitions(self) -> dg.Definitions:
        s3_resource = self.aws.to_resource()
        return dg.Definitions.merge(
            *[job.to_etl_job(s3_resource) for job in self.etl_jobs]
        )


def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:
    # Render the Jinja template first, exposing environment variables as
    # `env`, then parse the result as YAML and validate it with Pydantic.
    with open(yaml_path) as f:
        yaml_template = jinja2.Environment().from_string(f.read())
    config = yaml.safe_load(yaml_template.render(env=os.environ))
    return EtlJobsConfig.model_validate(config).to_definitions()


# highlight-end


defs = load_etl_jobs_from_yaml("etl_jobs_with_jinja.yaml")
guides/data-modeling/asset-factories/etl_jobs.yaml
aws:
  access_key_id: "YOUR_ACCESS_KEY_ID"
  secret_access_key: "YOUR_SECRET_ACCESS_KEY"

etl_jobs:
  - bucket: my_bucket
    source: raw_transactions.csv
    target: cleaned_transactions.csv
    sql: SELECT * FROM source WHERE amount IS NOT NULL

  - bucket: my_bucket
    source: all_customers.csv
    target: risky_customers.csv
    sql: SELECT * FROM source WHERE risk_score > 0.8
guides/data-modeling/asset-factories/etl_jobs_with_jinja.yaml
aws:
  # highlight-start
  access_key_id: "{{ env.AWS_ACCESS_KEY_ID }}"
  secret_access_key: "{{ env.AWS_SECRET_ACCESS_KEY }}"
  # highlight-end

etl_jobs:
  - bucket: my_bucket
    source: raw_transactions.csv
    target: cleaned_transactions.csv
    sql: SELECT * FROM source WHERE amount IS NOT NULL

  - bucket: my_bucket
    source: all_customers.csv
    target: risky_customers.csv
    sql: SELECT * FROM source WHERE risk_score > 0.8
guides/data-modeling/asset-factories/python-asset-factory.py
import tempfile

import duckdb

import dagster as dg
import dagster_aws.s3 as s3


def build_etl_job(
    s3_resource: s3.S3Resource,
    bucket: str,
    source_object: str,
    target_object: str,
    sql: str,
) -> dg.Definitions:
    # Asset names can't contain ".", so sanitize file names like
    # "cleaned_transactions.csv" before using them in the asset name.
    asset_name = f"etl_{bucket}_{target_object}".replace(".", "_")

    @dg.asset(name=asset_name)
    def etl_asset(context: dg.AssetExecutionContext, s3: s3.S3Resource):
        with tempfile.TemporaryDirectory() as root:
            source_path = f"{root}/{source_object}"
            target_path = f"{root}/{target_object}"

            # These steps could be split into separate assets, but
            # for brevity we keep them together.
            # 1. extract
            s3.get_client().download_file(bucket, source_object, source_path)

            # 2. transform
            db = duckdb.connect(":memory:")
            db.execute(
                f"CREATE TABLE source AS SELECT * FROM read_csv('{source_path}');"
            )
            db.query(sql).to_csv(target_path)

            # 3. load
            # boto3's upload_file takes (Filename, Bucket, Key)
            s3.get_client().upload_file(target_path, bucket, target_object)

    return dg.Definitions(
        assets=[etl_asset],
        resources={"s3": s3_resource},
    )


s3_resource = s3.S3Resource(aws_access_key_id="...", aws_secret_access_key="...")

defs = dg.Definitions.merge(
    build_etl_job(
        s3_resource=s3_resource,
        bucket="my_bucket",
        source_object="raw_transactions.csv",
        target_object="cleaned_transactions.csv",
        sql="SELECT * FROM source WHERE amount IS NOT NULL;",
    ),
    build_etl_job(
        s3_resource=s3_resource,
        bucket="my_bucket",
        source_object="all_customers.csv",
        target_object="risky_customers.csv",
        sql="SELECT * FROM source WHERE risk_score > 0.8;",
    ),
)
guides/data-modeling/asset-factories/simple-yaml-asset-factory.py
import yaml

import dagster as dg
import dagster_aws.s3 as s3


def build_etl_job(
    s3_resource: s3.S3Resource,
    bucket: str,
    source_object: str,
    target_object: str,
    sql: str,
) -> dg.Definitions: ...  # same implementation as in the basic example


# highlight-start
def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:
    with open(yaml_path) as f:
        config = yaml.safe_load(f)
    s3_resource = s3.S3Resource(
        aws_access_key_id=config["aws"]["access_key_id"],
        aws_secret_access_key=config["aws"]["secret_access_key"],
    )
    # Build one Definitions object per configured job, then merge them.
    defs = []
    for job_config in config["etl_jobs"]:
        defs.append(
            build_etl_job(
                s3_resource=s3_resource,
                bucket=job_config["bucket"],
                source_object=job_config["source"],
                target_object=job_config["target"],
                sql=job_config["sql"],
            )
        )
    return dg.Definitions.merge(*defs)


defs = load_etl_jobs_from_yaml("etl_jobs.yaml")
# highlight-end