Skip to content

Commit

Permalink
updated to use backfill policies and added back in the dbt table schema…
Browse files Browse the repository at this point in the history
… metadata
  • Loading branch information
slopp committed Oct 9, 2023
1 parent 1ba1a4c commit 5a34404
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
6 changes: 4 additions & 2 deletions hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any, Mapping
from dagster._utils import file_relative_path
from dagster_dbt import DbtCliResource, DagsterDbtTranslator
from dagster_dbt import load_assets_from_dbt_project
from dagster_dbt import load_assets_from_dbt_project, default_metadata_from_dbt_resource_props
from dagster_dbt.asset_decorator import dbt_assets
from dagster import AssetKey, DailyPartitionsDefinition, WeeklyPartitionsDefinition, OpExecutionContext, Output, MetadataValue
from dateutil import parser
Expand Down Expand Up @@ -63,7 +63,9 @@ def get_metadata(
if dbt_resource_props['name'] == 'users_cleaned':
metadata = {"partition_expr": "created_at"}

return metadata
default_metadata = default_metadata_from_dbt_resource_props(dbt_resource_props)

return {**default_metadata, **metadata}

def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
# map partition key range to dbt vars
Expand Down
4 changes: 3 additions & 1 deletion hooli_data_eng/assets/raw_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
AssetCheckSeverity,
AssetCheckResult,
AssetKey,
BackfillPolicy,
Backoff,
DailyPartitionsDefinition,
Jitter,
Expand Down Expand Up @@ -34,6 +35,7 @@ def _daily_partition_seq(start, end):
compute_kind="api",
partitions_def=daily_partitions,
metadata={"partition_expr": "created_at"},
backfill_policy=BackfillPolicy.single_run()
)
def users(context, api: RawDataAPI) -> pd.DataFrame:
"""A table containing all users data"""
Expand All @@ -53,7 +55,6 @@ def users(context, api: RawDataAPI) -> pd.DataFrame:
@asset_check(
asset=AssetKey(["RAW_DATA", "users"]),
description="check that users are from expected companies",
#severity=AssetCheckSeverity.WARN,
)
def check_users(context, users: pd.DataFrame):
unique_companies = pd.unique(users['company']).tolist()
Expand All @@ -73,6 +74,7 @@ def check_users(context, users: pd.DataFrame):
backoff=Backoff.LINEAR,
jitter=Jitter.FULL
),
backfill_policy=BackfillPolicy.single_run()
)
def orders(context, api: RawDataAPI) -> pd.DataFrame:
"""A table containing all orders that have been placed"""
Expand Down

0 comments on commit 5a34404

Please sign in to comment.