Skip to content

Commit

Permalink
[dagster-airlift] fix demo code (#23853)
Browse files Browse the repository at this point in the history
Disable asset checks and fix path to iris file.
  • Loading branch information
dpeng817 authored Aug 23, 2024
1 parent d415364 commit 39b74cc
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
build_dbt_asset_specs,
dbt_assets,
)
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslatorSettings
from dagster_dbt.dbt_manifest import DbtManifestParam, validate_manifest

from dagster_airlift.core import DefsFactory
Expand Down Expand Up @@ -41,14 +42,12 @@ def __init__(
self,
dbt_manifest: DbtManifestParam,
name: str,
translator: Optional[DagsterDbtTranslator] = None,
select: str = "fqn:*",
exclude: Optional[str] = None,
project: Optional[DbtProject] = None,
):
self.dbt_manifest = validate_manifest(dbt_manifest)
self.name = name
self.translator = translator
self.select = select
self.exclude = exclude
self.project = project
Expand All @@ -60,7 +59,6 @@ def build_defs(self) -> Definitions:
name=self.name,
specs=build_dbt_asset_specs(
manifest=self.dbt_manifest,
dagster_dbt_translator=self.translator,
select=self.select,
exclude=self.exclude,
project=self.project,
Expand All @@ -78,7 +76,9 @@ def _multi_asset():
manifest=self.dbt_manifest,
name=self.name,
project=self.project,
dagster_dbt_translator=self.translator,
dagster_dbt_translator=DagsterDbtTranslator(
settings=DagsterDbtTranslatorSettings(enable_asset_checks=False)
),
select=self.select,
exclude=self.exclude,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2024, 7, 18),
"retries": 1,
"retries": 0,
"retry_delay": timedelta(minutes=5),
}
DBT_DIR = os.getenv("DBT_PROJECT_DIR")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def execute(self, context) -> None:
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2023, 1, 1),
"retries": 1,
"retries": 0,
}

dag = DAG("load_lakehouse", default_args=default_args, schedule_interval=None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dagster_dbt import DbtProject

from dbt_example.dagster_defs.lakehouse import CSVToDuckdbDefs
from dbt_example.shared.load_iris import iris_path

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME, dbt_project_path

Expand All @@ -23,7 +24,7 @@
orchestrated_defs=defs_from_factories(
CSVToDuckdbDefs(
name="load_lakehouse__load_iris",
csv_path=Path("iris.csv"),
csv_path=iris_path(),
duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
columns=[
"sepal_length_cm",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ def id_from_path(csv_path: Path) -> str:
return csv_path.stem


def iris_path() -> Path:
return Path(__file__).parent.parent / "shared" / "iris.csv"


def load_csv_to_duckdb(
*,
csv_path: Path,
Expand Down

0 comments on commit 39b74cc

Please sign in to comment.