[components] DbtProjectComponent (#26281)
## Summary & Motivation

As the title says, this adds a component that loads a dbt project as a set of dbt assets.

## How I Tested These Changes

```
# this creates a components/my_dbt_project directory, empty except for a defs.yml file
dg component generate dbt_project my_dbt_project

# get a sample project into this directory
cp -R path/to/jaffle_shop components/my_dbt_project
```

Then edit `defs.yml` to read:

```yaml
component_type: ...

component_params:
  dbt:
    project_dir: my_dbt_project
```

This is still a bit complicated, so the next step is to add parameterization to the generate step so that it handles all of this setup itself.

## Changelog

NOCHANGELOG
OwenKephart authored Dec 6, 2024
1 parent e341385 commit 29e4731
Showing 23 changed files with 848 additions and 10 deletions.
@@ -6,6 +6,7 @@
from dagster_components.core.component_defs_builder import (
    build_defs_from_toplevel_components_folder as build_defs_from_toplevel_components_folder,
)
from dagster_components.impls.dbt_project import DbtProjectComponent
from dagster_components.impls.pipes_subprocess_script_collection import (
    PipesSubprocessScriptCollection,
)
@@ -14,4 +15,5 @@
__component_registry__ = {
    "pipes_subprocess_script_collection": PipesSubprocessScriptCollection,
    "sling_replication": SlingReplicationComponent,
    "dbt_project": DbtProjectComponent,
}
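
The registry above maps a `component_type` string to a component class. As a rough illustration of how such a lookup could work, here is a minimal stdlib-only sketch. `Component`, `FakeDbtProjectComponent`, `REGISTRY`, `from_params`, and `load_component` are hypothetical stand-ins invented for this example, not the actual `dagster_components` API.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


class Component:
    """Hypothetical stand-in for a component base class."""

    @classmethod
    def from_params(cls, params: Dict[str, Any]) -> "Component":
        raise NotImplementedError


@dataclass
class FakeDbtProjectComponent(Component):
    """Hypothetical stand-in that only records the dbt project directory."""

    project_dir: str

    @classmethod
    def from_params(cls, params: Dict[str, Any]) -> "FakeDbtProjectComponent":
        return cls(project_dir=params["dbt"]["project_dir"])


# Assumed shape: registered name -> component class, mirroring the dict above.
REGISTRY: Dict[str, Type[Component]] = {"dbt_project": FakeDbtProjectComponent}


def load_component(defs_file: Dict[str, Any]) -> Component:
    """Look up the class for `component_type` and build it from `component_params`."""
    component_type = defs_file["component_type"]
    try:
        component_cls = REGISTRY[component_type]
    except KeyError:
        raise ValueError(f"Unknown component_type: {component_type!r}") from None
    return component_cls.from_params(defs_file.get("component_params", {}))


# The dict below stands in for an already-parsed defs.yml file.
component = load_component(
    {
        "component_type": "dbt_project",
        "component_params": {"dbt": {"project_dir": "jaffle_shop"}},
    }
)
```

Failing loudly on an unknown `component_type` keeps a typo in `defs.yml` from silently producing no definitions.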
@@ -0,0 +1,45 @@
from dagster._core.definitions.definitions_class import Definitions
from dagster._utils import pushd
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
from dagster_embedded_elt.sling.resources import AssetExecutionContext
from pydantic import BaseModel, TypeAdapter
from typing_extensions import Self

from dagster_components import Component, ComponentLoadContext
from dagster_components.core.component_decl_builder import ComponentDeclNode, YamlComponentDecl


class DbtProjectParams(BaseModel):
    dbt: DbtCliResource


class DbtProjectComponent(Component):
    params_schema = DbtProjectParams

    def __init__(self, dbt_resource: DbtCliResource):
        self.dbt_resource = dbt_resource

    @classmethod
    def registered_name(cls) -> str:
        return "dbt_project"

    @classmethod
    def from_decl_node(cls, context: ComponentLoadContext, decl_node: ComponentDeclNode) -> Self:
        assert isinstance(decl_node, YamlComponentDecl)

        # all paths should be resolved relative to the directory we're in
        with pushd(str(decl_node.path)):
            loaded_params = TypeAdapter(cls.params_schema).validate_python(
                decl_node.defs_file_model.component_params
            )
        return cls(dbt_resource=loaded_params.dbt)

    def build_defs(self, context: ComponentLoadContext) -> Definitions:
        project = DbtProject(self.dbt_resource.project_dir)
        project.prepare_if_dev()

        @dbt_assets(manifest=project.manifest_path, project=project)
        def _fn(context: AssetExecutionContext, dbt: DbtCliResource):
            yield from dbt.cli(["build"], context=context).stream()

        return Definitions(assets=[_fn], resources={"dbt": self.dbt_resource})
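
The component validates its params inside `pushd(...)`, presumably so that a relative `project_dir` is interpreted against the component's own directory rather than the caller's working directory. The sketch below illustrates that idea with the standard library only. The local `pushd` is a stand-in written for this example (assumed to behave like the helper imported from `dagster._utils`), and `resolve_project_dir` is a hypothetical function.

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def pushd(path: str) -> Iterator[None]:
    """Temporarily change the working directory (local stand-in, see lead-in)."""
    previous = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)


def resolve_project_dir(component_dir: Path, project_dir: str) -> Path:
    """Resolve a possibly relative project_dir against the component directory."""
    with pushd(str(component_dir)):
        # Path.resolve() uses the current working directory for relative paths.
        return Path(project_dir).resolve()


cwd_before = os.getcwd()
with tempfile.TemporaryDirectory() as tmp:
    # Made-up layout: components/my_dbt_project/jaffle_shop
    component_dir = Path(tmp) / "components" / "my_dbt_project"
    (component_dir / "jaffle_shop").mkdir(parents=True)

    resolved = resolve_project_dir(component_dir, "jaffle_shop")
    expected = (component_dir / "jaffle_shop").resolve()
cwd_after = os.getcwd()
```

Restoring the previous directory in a `finally` block matters here: a validation error inside the `with` body would otherwise leave the process in the component directory.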
@@ -0,0 +1,5 @@
component_type: dbt_project

component_params:
  dbt:
    project_dir: jaffle_shop
@@ -0,0 +1,13 @@

target/
dbt_packages/
dbt_modules/
logs/
**/.DS_Store
.user.yml
venv/
env/
**/*.duckdb
**/*.duckdb.wal
tmp/
state/
@@ -0,0 +1,26 @@
name: "jaffle_shop"

config-version: 2
version: "0.1"

profile: "jaffle_shop"

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_modules"
  - "logs"

require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
  jaffle_shop:
    materialized: table
    staging:
      materialized: view
@@ -0,0 +1,75 @@
with customers as (

    select * from {{ ref('stg_customers') }}

),

orders as (

    select * from {{ ref('stg_orders') }}

),

payments as (

    select * from {{ ref('stg_payments') }}

),

customer_orders as (

    select
        customer_id,

        min(order_date) as first_order,
        max(order_date) as most_recent_order,
        count(order_id) as number_of_orders
    from orders

    group by customer_id

),

customer_payments as (

    select
        orders.customer_id,
        sum(amount) as total_amount

    from payments

    left join orders on
        payments.order_id = orders.order_id

    group by orders.customer_id

),

final as (

    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order,
        customer_orders.most_recent_order,
        customer_orders.number_of_orders,
        customer_payments.total_amount as customer_lifetime_value

    from customers

    left join customer_orders
        on customers.customer_id = customer_orders.customer_id

    left join customer_payments
        on customers.customer_id = customer_payments.customer_id

)

select * from final

{% if var('break_customer_build', 'false') == 'true' %}

does not work

{% endif %}
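
To make the shape of this model easier to follow, here is a small Python sketch of the same aggregation: per-customer order stats, per-customer payment totals, then a left join back onto customers. The function name `build_customers` and all sample rows are made up for illustration. The sketch assumes ISO-formatted date strings (so `min`/`max` order them correctly) and non-null `order_id` values.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def build_customers(customers: List[Row], orders: List[Row], payments: List[Row]) -> List[Row]:
    # customer_orders CTE: collect order dates per customer.
    order_dates: Dict[Any, List[str]] = defaultdict(list)
    order_owner: Dict[Any, Any] = {}
    for order in orders:
        order_dates[order["customer_id"]].append(order["order_date"])
        order_owner[order["order_id"]] = order["customer_id"]

    # customer_payments CTE: payments left join orders, summed per customer.
    totals: Dict[Any, int] = defaultdict(int)
    for payment in payments:
        owner = order_owner.get(payment["order_id"])  # None if no matching order
        totals[owner] += payment["amount"]

    # final CTE: left joins, so customers without orders get None values.
    result = []
    for customer in customers:
        cid = customer["customer_id"]
        dates = order_dates.get(cid, [])
        result.append(
            {
                "customer_id": cid,
                "first_order": min(dates) if dates else None,
                "most_recent_order": max(dates) if dates else None,
                "number_of_orders": len(dates) if dates else None,
                "customer_lifetime_value": totals.get(cid),
            }
        )
    return result


# Made-up sample data.
rows = build_customers(
    customers=[{"customer_id": 1}, {"customer_id": 2}],
    orders=[
        {"order_id": 10, "customer_id": 1, "order_date": "2018-01-01"},
        {"order_id": 11, "customer_id": 1, "order_date": "2018-02-10"},
    ],
    payments=[
        {"order_id": 10, "amount": 10},
        {"order_id": 11, "amount": 5},
        {"order_id": 11, "amount": 7},
    ],
)
```

Customer 2 has no orders, so every derived field comes back as `None`, which mirrors the NULLs a left join produces in the SQL.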
@@ -0,0 +1,13 @@
{% docs orders_status %}

Orders can be one of the following statuses:

| status | description |
| -------------- | ---------------------------------------------------------------------------------------------------------------------- |
| placed | The order has been placed but has not yet left the warehouse |
| shipped | The order has been shipped to the customer and is currently in transit |
| completed | The order has been received by the customer |
| return_pending | The customer has indicated that they would like to return the order, but it has not yet been received at the warehouse |
| returned | The order has been returned by the customer and received at the warehouse |

{% enddocs %}
@@ -0,0 +1,56 @@
{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %}

with orders as (

    select * from {{ ref('stg_orders') }}

),

payments as (

    select * from {{ ref('stg_payments') }}

),

order_payments as (

    select
        order_id,

        {% for payment_method in payment_methods -%}
        sum(case when payment_method = '{{ payment_method }}' then amount else 0 end) as {{ payment_method }}_amount,
        {% endfor -%}

        sum(amount) as total_amount

    from payments

    group by order_id

),

final as (

    select
        orders.order_id,
        orders.customer_id,
        orders.order_date,
        orders.status,

        {% for payment_method in payment_methods -%}

        order_payments.{{ payment_method }}_amount,

        {% endfor -%}

        order_payments.total_amount as amount

    from orders


    left join order_payments
        on orders.order_id = order_payments.order_id

)

select * from final
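
The Jinja `for` loop in this model expands into one pivot column per payment method. A short Python sketch of the equivalent expansion is shown below. `pivot_columns` is a hypothetical helper written for this example. It only mimics the string templating and is not how dbt renders the model internally.

```python
from typing import List

# Same list as the `{% set payment_methods = ... %}` line in the model.
payment_methods = ["credit_card", "coupon", "bank_transfer", "gift_card"]


def pivot_columns(methods: List[str]) -> List[str]:
    """Build one `sum(case when ...)` expression per payment method."""
    return [
        f"sum(case when payment_method = '{method}' then amount else 0 end) as {method}_amount"
        for method in methods
    ]


columns = pivot_columns(payment_methods)
for column in columns:
    print(column)
```

Adding a new payment method then only requires extending the list, which is the same benefit the Jinja loop gives the SQL model.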
@@ -0,0 +1,11 @@
{% docs __overview__ %}

## Data Documentation for Jaffle Shop

`jaffle_shop` is a fictional ecommerce store.

This [dbt](https://www.getdbt.com/) project is for testing out code.

The source code can be found [here](https://github.com/clrcrl/jaffle_shop).

{% enddocs %}
@@ -0,0 +1,83 @@
version: 2

models:
  - name: customers
    description: This table has basic information about a customer, as well as some derived facts based on a customer's orders

    columns:
      - name: customer_id
        description: This is a unique identifier for a customer
        tests:
          - unique
          - not_null

      - name: first_name
        description: Customer's first name. PII.

      - name: last_name
        description: Customer's last name. PII.

      - name: first_order
        description: Date (UTC) of a customer's first order

      - name: most_recent_order
        description: Date (UTC) of a customer's most recent order

      - name: number_of_orders
        description: Count of the number of orders a customer has placed

      - name: total_order_amount
        description: Total value (AUD) of a customer's orders

  - name: orders
    description: This table has basic information about orders, as well as some derived facts based on payments

    columns:
      - name: order_id
        tests:
          - unique
          - not_null
        description: This is a unique identifier for an order

      - name: customer_id
        description: Foreign key to the customers table
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id

      - name: order_date
        description: Date (UTC) that the order was placed

      - name: status
        description: '{{ doc("orders_status") }}'
        tests:
          - accepted_values:
              values:
                ["placed", "shipped", "completed", "return_pending", "returned"]

      - name: amount
        description: Total amount (AUD) of the order
        tests:
          - not_null

      - name: credit_card_amount
        description: Amount of the order (AUD) paid for by credit card
        tests:
          - not_null

      - name: coupon_amount
        description: Amount of the order (AUD) paid for by coupon
        tests:
          - not_null

      - name: bank_transfer_amount
        description: Amount of the order (AUD) paid for by bank transfer
        tests:
          - not_null

      - name: gift_card_amount
        description: Amount of the order (AUD) paid for by gift card
        tests:
          - not_null
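
The `relationships` test on `orders.customer_id` is a referential-integrity check: every non-null `customer_id` in `orders` should exist in `customers`. A minimal Python sketch of that check follows. The function `relationships` and the sample rows are hypothetical, and the sketch assumes null child values are skipped rather than reported.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def relationships(child_rows: List[Row], column: str, parent_rows: List[Row], field: str) -> List[Row]:
    """Return child rows whose non-null `column` value has no match in parent `field`."""
    parent_keys = {row[field] for row in parent_rows if row.get(field) is not None}
    return [
        row
        for row in child_rows
        if row.get(column) is not None and row[column] not in parent_keys
    ]


# Made-up sample rows: order 12 points at a customer that does not exist.
customers = [{"customer_id": 1}, {"customer_id": 2}]
orders = [
    {"order_id": 10, "customer_id": 1},
    {"order_id": 11, "customer_id": 2},
    {"order_id": 12, "customer_id": 99},
]

orphans = relationships(orders, "customer_id", customers, "customer_id")
```

An empty result means the check passes. Any returned rows are the orphaned orders.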
@@ -0,0 +1,32 @@
version: 2

models:
  - name: stg_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values:
                ["placed", "shipped", "completed", "return_pending", "returned"]

  - name: stg_payments
    columns:
      - name: payment_id
        tests:
          - unique
          - not_null
      - name: payment_method
        tests:
          - accepted_values:
              values: ["credit_card", "coupon", "bank_transfer", "gift_card"]
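
The `unique`, `not_null`, and `accepted_values` tests declared above each boil down to a query that returns the offending rows or values. The Python sketch below mirrors that idea on in-memory rows. The three functions and the sample data are hypothetical illustrations, and the sketch assumes that null values are ignored by `unique` and `accepted_values`.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def not_null(rows: List[Row], column: str) -> List[Row]:
    """Rows where `column` is missing or None."""
    return [row for row in rows if row.get(column) is None]


def unique(rows: List[Row], column: str) -> List[Any]:
    """Non-null values of `column` that appear more than once."""
    counts = Counter(row[column] for row in rows if row.get(column) is not None)
    return [value for value, count in counts.items() if count > 1]


def accepted_values(rows: List[Row], column: str, values: Iterable[Any]) -> List[Row]:
    """Rows whose non-null `column` value is outside the allowed set."""
    allowed = set(values)
    return [
        row
        for row in rows
        if row.get(column) is not None and row[column] not in allowed
    ]


# Made-up sample rows with one duplicate id, one null id, and one bad method.
stg_payments = [
    {"payment_id": 1, "payment_method": "credit_card"},
    {"payment_id": 1, "payment_method": "coupon"},
    {"payment_id": None, "payment_method": "cash"},
]

null_failures = not_null(stg_payments, "payment_id")
duplicate_ids = unique(stg_payments, "payment_id")
bad_methods = accepted_values(
    stg_payments,
    "payment_method",
    ["credit_card", "coupon", "bank_transfer", "gift_card"],
)
```

Each check passes when its result is empty, so the three lists above describe exactly what would need fixing in the sample data.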