This project demonstrates dagster's data-aware orchestration capabilities:
- dbt cross-project lineage: dagster's ability to create a global dependency graph between different dbt projects. This is currently not available in dbt.
- Object-level dependencies between different assets: dagster's ability to create object-level dependencies between assets produced by different tools, such as an airbyte table materialization and a dbt model materialization.
- Freshness policy triggers: Most data orchestration tools use cron schedules to trigger an entire DAG. Dagster reverses this approach and allows developers to define freshness policies on nodes so that upstream nodes can be triggered to deliver data to the target node on time.
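The three ideas above can be pictured with a small, library-free sketch. It is not how dagster is implemented: plain dictionaries stand in for the asset graph, the asset names are borrowed from later in this README, the edges between them are assumptions made up for illustration, and `upstream_of` is a hypothetical helper. The sketch merges the dependency maps of two dbt projects and one airbyte sync into a single graph, then finds everything upstream of a target asset, which is the set a freshness policy on that target may need to refresh.

```python
# Illustrative only: plain dictionaries stand in for dagster's asset graph.
# Each map is {asset: [assets it depends on]}. The edges are assumed.
airbyte_assets = {"stargazers": []}
dbt_project_1 = {"mart_gh_stargazer": ["stargazers"]}
dbt_project_2 = {
    "mart_gh_join": ["mart_gh_stargazer"],
    "mart_gh_cumulative": ["mart_gh_join"],
}

# One global graph: assets from every tool and project live side by side.
global_graph = {**airbyte_assets, **dbt_project_1, **dbt_project_2}


def upstream_of(graph, target):
    """Return every asset that `target` depends on, directly or indirectly."""
    seen = set()
    stack = list(graph.get(target, []))
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(graph.get(node, []))
    return seen


print(sorted(upstream_of(global_graph, "mart_gh_cumulative")))
# ['mart_gh_join', 'mart_gh_stargazer', 'stargazers']
```

In this toy graph the upstream set crosses both dbt projects and reaches the airbyte table, which is the kind of cross-tool lineage the list above describes.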
This project has the following data assets to orchestrate:
This project forks code from a demo in airbytehq's open-data-stack repo, and adds code to demonstrate newer concepts.
A Pipfile has been provided for use with pipenv to define the python version to use for this virtual environment.
Assuming you already have pipenv installed, to launch the virtual environment for this project, run the following commands:
cd my-dbt-dagster
pipenv shell
If you wish to use your local python installation instead, just make sure that it is python 3.8 or above.
To install the python dependencies, run:
cd stargazer
pip install -e ".[dev]"
We'll use a local postgres instance as the destination for our data. You can imagine the "destination" as a data warehouse (something like Snowflake).
To get a postgres instance with the required source and destination databases running on your machine, you can run:
docker pull postgres
docker run --name local-postgres -p 5433:5432 -e POSTGRES_PASSWORD=postgres -d postgres
Note: I am mapping local port 5433 to the container's port 5432 as my local port 5432 is already in use.
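If you are unsure whether a local port is already taken, a quick check with Python's standard library can help before choosing the mapping. This is a minimal sketch: `port_in_use` is a hypothetical helper, and 5432 and 5433 are just the ports mentioned above.

```python
import socket


def port_in_use(port, host="127.0.0.1"):
    """Return True if something is already listening on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)
        # connect_ex returns 0 when the connection succeeds.
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    for candidate in (5432, 5433):
        state = "in use" if port_in_use(candidate) else "free"
        print(f"port {candidate}: {state}")
```

If 5432 is reported as free on your machine, you could map `-p 5432:5432` instead. Any connection settings that point at port 5433 would then need to change to match.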
Now, you'll want to get Airbyte running locally. The full instructions can be found here.
The steps are pretty simple. Run the following in a new terminal:
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up
This should take a couple of minutes to pull the images and run them.
Now that airbyte is running locally, let's create the source, destination, and connection for a data integration pipeline on airbyte.
First we set the environment variables we need:
export AIRBYTE_PASSWORD=password
export AIRBYTE_PERSONAL_GITHUB_TOKEN=<your-token-goes-here>
Note:
- The default password for airbyte is password.
- We'll need to create a token AIRBYTE_PERSONAL_GITHUB_TOKEN for fetching the stargazers from the public repositories.
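Before running the dagster commands, it can be handy to confirm that both variables are visible to the current process. The following is a minimal sketch using only the standard library; `missing_vars` is a hypothetical helper and not part of dagster or airbyte.

```python
import os

REQUIRED = ("AIRBYTE_PASSWORD", "AIRBYTE_PERSONAL_GITHUB_TOKEN")


def missing_vars(names, environ=None):
    """Return the names that are unset or empty in the environment."""
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_vars(REQUIRED)
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All required environment variables are set.")
```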
After setting the environment variables, we can check if we have everything we need to let dagster create the airbyte source, destination, and connection by running:
cd stargazer
dagster-me check --module assets_modern_data_stack.my_asset:airbyte_reconciler
This will print out the assets that dagster will create in airbyte. For example:
+ fetch_stargazer:
  + source: gh_awesome_de_list
  + normalize data: True
  + destination: postgres
  + destination namespace: SAME_AS_SOURCE
  + streams:
    + stargazers:
      + destinationSyncMode: append_dedup
      + syncMode: incremental
If you are happy with those assets being created in airbyte, then run the following to apply it:
dagster-me apply --module assets_modern_data_stack.my_asset:airbyte_reconciler
We have 2 dbt projects in the stargazer folder: dbt_project_1 and dbt_project_2.
Install the dbt dependencies required by both projects by running:
cd stargazer/dbt_project_1
dbt deps
cd ../dbt_project_2
dbt deps
We're now ready to get dagster started. Dagster has two services that we need to run:
- dagit: The web-based interface for viewing and interacting with Dagster objects.
- dagster daemon: The service that manages schedules, sensors, and run queuing.
For both services to communicate and have shared resources with one another, we need to create a shared directory:
mkdir ~/dagster_home
We named our shared directory dagster_home for simplicity.
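Dagster expects DAGSTER_HOME to be an absolute path. As a small cross-check, the sketch below resolves the same directory from Python and creates it if needed. `ensure_dagster_home` is a hypothetical helper, and its `base` parameter exists only so the function can be tried against a scratch directory.

```python
import tempfile
from pathlib import Path


def ensure_dagster_home(base=None):
    """Create <base>/dagster_home (default: the user's home) and return it."""
    base = Path.home() if base is None else Path(base)
    home = (base / "dagster_home").resolve()
    home.mkdir(parents=True, exist_ok=True)
    return home


if __name__ == "__main__":
    # Demo against a throwaway directory; call ensure_dagster_home() with no
    # argument to create the directory under your real home directory.
    with tempfile.TemporaryDirectory() as scratch:
        home = ensure_dagster_home(scratch)
        print(home.is_absolute())
```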
To run the dagster daemon service, create a new terminal and run:
export DAGSTER_HOME=~/dagster_home
export AIRBYTE_PASSWORD=password
export POSTGRES_PASSWORD=postgres
dagster-daemon run -m assets_modern_data_stack.my_asset
To run the dagit service, create a new terminal and run:
export DAGSTER_HOME=~/dagster_home
export AIRBYTE_PASSWORD=password
export POSTGRES_PASSWORD=postgres
dagit -m assets_modern_data_stack.my_asset
Launch the dagit UI by going to http://localhost:3000/.
You'll see the airbyte and dbt assets that are created automatically in this demo.
Activate the schedule:
Activate the sensor:
Now you can sit back and watch the global asset lineage trigger based on the schedule and/or sensor trigger.
You'll notice the following behaviours:
- The airbyte assets will materialize every 30 minutes based on a schedule.
- The two dbt projects, dbt_project_1 and dbt_project_2, are now seen as part of the same global asset lineage in dagster without any separation between dbt projects.
- mart_gh_cumulative will materialize every 5 minutes because its dbt model mart_gh_cumulative.sql has a freshness policy of dagster_freshness_policy={"maximum_lag_minutes": 5}. This in turn will also trigger the airbyte assets to be materialized first.
- mart_gh_join and mart_gh_stargazer: by 09:00 AM UTC, these assets should incorporate all data up to 9 hours before that time. This is because a dbt project-level configuration has been set for project 1 and project 2 with a freshness policy of maximum_lag_minutes: 540 and cron_schedule: "0 9 * * *".
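The arithmetic behind the last bullet can be checked with a few lines of standard-library Python. This sketch only mirrors the description above (a daily 09:00 UTC deadline and a 540-minute maximum lag). It does not call dagster, `required_data_cutoff` is a hypothetical helper, and the example date is arbitrary.

```python
from datetime import datetime, timedelta, timezone


def required_data_cutoff(deadline, maximum_lag_minutes):
    """Latest point in time whose data must be included by `deadline`."""
    return deadline - timedelta(minutes=maximum_lag_minutes)


# cron_schedule "0 9 * * *" means 09:00 every day; pick one example day.
deadline = datetime(2023, 1, 1, 9, 0, tzinfo=timezone.utc)
cutoff = required_data_cutoff(deadline, maximum_lag_minutes=540)

print(cutoff.isoformat())  # 2023-01-01T00:00:00+00:00
```

In other words, 540 minutes is 9 hours, so at the 09:00 UTC deadline the assets are expected to reflect data up to 00:00 UTC of the same day.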