From 407ea5cd63fedbef96abdb3e9a798c938cce3a23 Mon Sep 17 00:00:00 2001
From: Alexander Bogdanowicz <55267935+akbog@users.noreply.github.com>
Date: Thu, 22 Aug 2024 12:14:35 -0700
Subject: [PATCH] [dagster-sdf] Enable Caching & Asset Selection (#23750)

## Summary & Motivation

This PR enables caching simply by upgrading the `sdf-cli` version, which now reports cached dependencies on stdout in their dependency order.

It also enables asset selection via sdf's `--targets-only` flag, passing the fully qualified names of the selected assets as targets to the `sdf run` or `sdf test` commands. There are some hoops to jump through with regard to asset checks, but this logic will be cleaned up in the `sdf-cli` in future releases.

Bonus:
1. Duplicate descriptions are fixed with the latest release of the `sdf-cli`.
2. The `--log-form` flag is no longer necessary with the latest release.

## How I Tested These Changes

Tests for caching have been added. Asset selection is covered by existing tests.

---
 .../dagster-sdf/dagster_sdf/asset_utils.py    | 38 +++++++-
 .../dagster-sdf/dagster_sdf/constants.py      |  7 ++
 .../include/scaffold/assets.py.jinja          |  2 +-
 .../include/scaffold/definitions.py.jinja     |  3 +-
 .../dagster-sdf/dagster_sdf/resource.py       | 94 +++++++++++++++++-
 .../dagster-sdf/dagster_sdf/sdf_cli_event.py  | 51 ++++++++--
 .../dagster_sdf/sdf_cli_invocation.py         |  4 +-
 .../dagster_sdf/sdf_information_schema.py     | 97 ++++++++++++++++---
 .../dagster-sdf/dagster_sdf/sdf_version.py    |  3 +-
 .../dagster-sdf/dagster_sdf/sdf_workspace.py  |  8 +-
 .../dagster-sdf/dagster_sdf_tests/conftest.py |  4 +-
 .../dagster_sdf_tests/test_asset_checks.py    | 50 ++++++++--
 .../dagster_sdf_tests/test_asset_decorator.py | 97 +++++++++++--------
 .../dagster_sdf_tests/test_resource.py        | 29 ++++--
 python_modules/libraries/dagster-sdf/setup.py | 12 ++-
 15 files changed, 398 insertions(+), 101 deletions(-)

diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py b/python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py
index ca7f66a1f9fbf..aa39b7d7b91ce 100644
--- a/python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py
+++ b/python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py
@@ -1,6 +1,6 @@
 import textwrap
 from pathlib import Path
-from typing import Any, Dict, Mapping, Optional, Sequence
+from typing import AbstractSet, Any, Dict, Mapping, Optional, Sequence

 from dagster import (
     AssetCheckKey,
@@ -36,7 +36,7 @@ def default_asset_check_key_fn(catalog: str, schema: str, table: str) -> AssetCh
     else:
         asset_key = AssetKey([catalog, schema, table])
     return AssetCheckKey(
-        name=f"{catalog}.{schema}.{table}",
+        name="test",
         asset_key=asset_key,
     )
@@ -201,3 +201,37 @@ def default_description_fn(
         f"#### Materialized SQL:\n```\n{_read_sql_file(path_to_file)}\n```"
     )
     return "\n\n".join(filter(None, description_sections))
+
+
+def get_test_prefix(table_dialect: str) -> str:
+    if table_dialect == "snowflake":
+        return "TEST_"
+    else:
+        return "test_"
+
+
+def exists_in_selected(
+    catalog_name: str,
+    schema_name: str,
+    table_name: str,
+    purpose: str,
+    dialect: str,
+    selected_output_names: AbstractSet[str],
+    asset_checks_enabled: bool,
+) -> bool:
+    asset_output_name = dagster_name_fn(catalog_name, schema_name, table_name)
+    # If asset checks are enabled, ensure tests are only yielded for selected assets
+    if asset_checks_enabled:
+        # Strip test_name_prefix from start of table_id if it exists
+        if purpose == "test":
+            test_name_prefix = get_test_prefix(dialect)
+            asset_output_name = (
dagster_name_fn(catalog_name, schema_name, table_name[len(test_name_prefix) :]) + if table_name.startswith(test_name_prefix) + else asset_output_name + ) + + if asset_output_name in selected_output_names: + return True + else: + return False diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/constants.py b/python_modules/libraries/dagster-sdf/dagster_sdf/constants.py index 734199ce52df0..c435d4be36641 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/constants.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/constants.py @@ -10,3 +10,10 @@ SDF_INFORMATION_SCHEMA_TABLES_STAGE_PARSE = ["table_deps"] DEFAULT_SDF_WORKSPACE_ENVIRONMENT = "dbg" SDF_WORKSPACE_YML = "workspace.sdf.yml" + +DAGSTER_SDF_TABLE_ID = "dagster_sdf/table_id" +DAGSTER_SDF_CATALOG_NAME = "dagster_sdf/catalog" +DAGSTER_SDF_SCHEMA_NAME = "dagster_sdf/schema" +DAGSTER_SDF_TABLE_NAME = "dagster_sdf/table_name" +DAGSTER_SDF_PURPOSE = "dagster_sdf/purpose" +DAGSTER_SDF_DIALECT = "dagster_sdf/dialect" diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/assets.py.jinja b/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/assets.py.jinja index 72f957a8b3094..2c48f129e7905 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/assets.py.jinja +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/assets.py.jinja @@ -10,4 +10,4 @@ workspace = SdfWorkspace(workspace_dir=sdf_workspace_dir, target_dir=target_dir, @sdf_assets(workspace=workspace) def {{ sdf_assets_name }}(context: AssetExecutionContext, sdf: SdfCliResource): - yield from sdf.cli(["run", "--save", "info-schema", "--cache", "write-only"], target_dir=target_dir, environment=environment, context=context).stream() + yield from sdf.cli(["run", "--save", "info-schema"], target_dir=target_dir, environment=environment, context=context).stream() diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/definitions.py.jinja b/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/definitions.py.jinja index d46229e6f931f..8519b83307db4 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/definitions.py.jinja +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/definitions.py.jinja @@ -10,8 +10,7 @@ defs = Definitions( schedules=schedules, resources={ "sdf": SdfCliResource( - workspace_dir=sdf_workspace_dir, - global_config_flags=["--log-form=nested"] + workspace_dir=sdf_workspace_dir ), }, ) diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/resource.py b/python_modules/libraries/dagster-sdf/dagster_sdf/resource.py index f893351e37d87..7e8694b0269df 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/resource.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/resource.py @@ -1,15 +1,32 @@ import os +import re import shutil +import subprocess import uuid +from contextlib import suppress from pathlib import Path from typing import Any, List, Optional, Sequence, Union -from dagster import AssetExecutionContext, ConfigurableResource, OpExecutionContext +from dagster import ( + AssetExecutionContext, + AssetsDefinition, + ConfigurableResource, + DagsterInvariantViolationError, + OpExecutionContext, + get_dagster_logger, +) from dagster._annotations import experimental, public +from dagster._core.errors import DagsterInvalidPropertyError from dagster._utils.warnings import suppress_dagster_warnings from pydantic import Field, validator +from .asset_utils import 
dagster_name_fn, get_test_prefix from .constants import ( + DAGSTER_SDF_CATALOG_NAME, + DAGSTER_SDF_DIALECT, + DAGSTER_SDF_SCHEMA_NAME, + DAGSTER_SDF_TABLE_ID, + DAGSTER_SDF_TABLE_NAME, DEFAULT_SDF_WORKSPACE_ENVIRONMENT, SDF_DAGSTER_OUTPUT_DIR, SDF_EXECUTABLE, @@ -17,8 +34,11 @@ ) from .dagster_sdf_translator import DagsterSdfTranslator, validate_opt_translator from .sdf_cli_invocation import SdfCliInvocation +from .sdf_version import SDF_VERSION_LOWER_BOUND, SDF_VERSION_UPPER_BOUND from .sdf_workspace import SdfWorkspace +logging = get_dagster_logger() + @suppress_dagster_warnings @experimental @@ -131,12 +151,56 @@ def cli( SdfCliInvocation: A invocation instance that can be used to retrieve the output of the sdf CLI command. """ + self._validate_sdf_version() dagster_sdf_translator = validate_opt_translator(dagster_sdf_translator) - dagster_sdf_translator = dagster_sdf_translator or DagsterSdfTranslator() + assets_def: Optional[AssetsDefinition] = None + with suppress(DagsterInvalidPropertyError): + assets_def = context.assets_def if context else None + context = ( context.op_execution_context if isinstance(context, AssetExecutionContext) else context ) + dagster_sdf_translator = dagster_sdf_translator or DagsterSdfTranslator() + run_args = [] + # If the context and assets_def are not None, then we need to pass in the selected targets + # to the sdf CLI invocation. + if context and assets_def is not None: + run_args.append( + "--targets-only" + ) # This flag is used to only run the selected targets (and not upstreams) + selected_output_names = context.selected_output_names + for asset_key, asset_metadata in assets_def.metadata_by_key.items(): + asset_output_name = dagster_name_fn( + asset_key.path[0], asset_key.path[1], asset_key.path[2] + ) + # Check if this output is expected, if so, add the table_id and optionally the expected test name to the run_args + if selected_output_names and asset_output_name in selected_output_names: + table_id: Optional[str] = asset_metadata.get(DAGSTER_SDF_TABLE_ID) + catalog: Optional[str] = asset_metadata.get(DAGSTER_SDF_CATALOG_NAME) + schema: Optional[str] = asset_metadata.get(DAGSTER_SDF_SCHEMA_NAME) + table_name: Optional[str] = asset_metadata.get(DAGSTER_SDF_TABLE_NAME) + table_dialect: Optional[str] = asset_metadata.get(DAGSTER_SDF_DIALECT) + # If any of the metadata is missing, raise an error + if ( + table_id is None + or catalog is None + or schema is None + or table_name is None + or table_dialect is None + ): + raise DagsterInvariantViolationError( + f"Expected to find sdf table metadata on asset {asset_key.to_user_string()}," + " but did not. Did you pass in assets that weren't generated by @sdf_assets?" + " Please ensure that the asset metadata contains the following keys: " + f"{DAGSTER_SDF_TABLE_ID}, {DAGSTER_SDF_CATALOG_NAME}, {DAGSTER_SDF_SCHEMA_NAME}, {DAGSTER_SDF_TABLE_NAME}, {DAGSTER_SDF_DIALECT}" + ) + run_args.append(table_id) + test_name_prefix = get_test_prefix(table_dialect) + # If the command is test, then we need to add the test name to the run_args (temporary: until sdf-cli applies --targets-only for test) + if args[0] == "test": + run_args.append(f"{catalog}.{schema}.{test_name_prefix}{table_name}") + # Pass the current environment variables to the sdf CLI invocation. 
env = os.environ.copy() @@ -155,6 +219,7 @@ def cli( *args, *environment_args, *target_args, + *run_args, ] return SdfCliInvocation.run( @@ -200,3 +265,28 @@ def _get_unique_target_path(self, *, context: Optional[OpExecutionContext]) -> P current_output_path = Path(self.workspace_dir).joinpath(SDF_DAGSTER_OUTPUT_DIR) return current_output_path.joinpath(path) + + def _validate_sdf_version(self) -> None: + """Validate that the sdf version is compatible with the current version of the sdf CLI.""" + try: + result = subprocess.run( + ["sdf", "--version"], capture_output=True, text=True, check=True + ) + output = result.stdout.strip() + match = re.search(r"sdf (\d+\.\d+\.\d+)", output) + if match: + version = match.group(1) + if version < SDF_VERSION_LOWER_BOUND or version >= SDF_VERSION_UPPER_BOUND: + logging.warn( + f"The sdf version '{version}' is not within the supported range of" + f" '{SDF_VERSION_LOWER_BOUND}' to '{SDF_VERSION_UPPER_BOUND}'. Check your" + " environment to ensure that the correct version of sdf is being used." + ) + else: + logging.warn( + "Failed to extract the sdf version from the output. Check your environment to" + " ensure that the correct version of sdf is being used." + ) + except subprocess.CalledProcessError as e: + logging.warn(f"Failed to get the sdf version: {e}") + exit(1) diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_event.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_event.py index b8e81df60a4fb..84b951edeec8b 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_event.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_event.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Any, Dict, Iterator, Optional +from typing import AbstractSet, Any, Dict, Iterator, Optional from dagster import ( AssetCheckResult, @@ -10,7 +10,15 @@ ) from dagster._annotations import public -from .asset_utils import dagster_name_fn +from .asset_utils import dagster_name_fn, exists_in_selected +from .constants import ( + DAGSTER_SDF_CATALOG_NAME, + DAGSTER_SDF_DIALECT, + DAGSTER_SDF_PURPOSE, + DAGSTER_SDF_SCHEMA_NAME, + DAGSTER_SDF_TABLE_ID, + DAGSTER_SDF_TABLE_NAME, +) from .dagster_sdf_translator import DagsterSdfTranslator from .sdf_event_iterator import SdfDagsterEventType @@ -33,6 +41,7 @@ def is_result_event(self) -> bool: and bool(self.raw_event.get("ev_tb_schema")) and bool(self.raw_event.get("ev_tb_table")) and bool(self.raw_event.get("st_code")) + and bool(self.raw_event.get("st_done")) and bool(self.raw_event.get("st_dur_ms")) ) @@ -64,16 +73,43 @@ def to_default_asset_events( if not is_success: return - table_id = self.raw_event["ev_tb"] + selected_output_names: AbstractSet[str] = ( + context.selected_output_names if context else set() + ) + catalog = self.raw_event["ev_tb_catalog"] schema = self.raw_event["ev_tb_schema"] table = self.raw_event["ev_tb_table"] + purpose = self.raw_event["ev_tb_purpose"] + dialect = self.raw_event["ev_tb_dialect"] + + # If assets are selected, only yield events for selected assets + if len(selected_output_names) > 0: + exists = exists_in_selected( + catalog, + schema, + table, + purpose, + dialect, + selected_output_names, + dagster_sdf_translator.settings.enable_asset_checks, + ) + if not exists: + return + + table_id = self.raw_event["ev_tb"] default_metadata = { - "table_id": table_id, + DAGSTER_SDF_TABLE_ID: table_id, + DAGSTER_SDF_CATALOG_NAME: catalog, + DAGSTER_SDF_SCHEMA_NAME: schema, + DAGSTER_SDF_TABLE_NAME: table, + DAGSTER_SDF_PURPOSE: 
purpose, + DAGSTER_SDF_DIALECT: dialect, "Execution Duration": self.raw_event["st_dur_ms"] / 1000, + "Materialized From Cache": True if self.raw_event["st_done"] == "cached" else False, } asset_key = dagster_sdf_translator.get_asset_key(catalog, schema, table) - if self.raw_event["ev_tb_purpose"] == "model": + if purpose == "model": has_asset_def = bool(context and context.has_assets_def) event = ( Output( @@ -93,10 +129,7 @@ def to_default_asset_events( ) yield event - elif ( - self.raw_event["ev_tb_purpose"] == "test" - and dagster_sdf_translator.settings.enable_asset_checks - ): + elif purpose == "test" and dagster_sdf_translator.settings.enable_asset_checks: passed = self.raw_event["st_verdict"] == "passed" asset_check_key = dagster_sdf_translator.get_check_key_for_test(catalog, schema, table) # if the asset key is the same as the asset check key, then the test is not a table / column test diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_invocation.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_invocation.py index ed9eb984d59d5..bdb3dcae2f398 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_invocation.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_invocation.py @@ -133,7 +133,9 @@ def _stream_asset_events( workspace_dir=self.workspace_dir, target_dir=self.target_dir, environment=self.environment, - ).stream_asset_observations(dagster_sdf_translator=self.dagster_sdf_translator) + ).stream_asset_observations( + dagster_sdf_translator=self.dagster_sdf_translator, context=self.context + ) @public def stream( diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py index f91b30edbf736..61f2af3ebd66f 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py @@ -1,10 +1,30 @@ import os from pathlib import Path -from typing import Any, Dict, Iterator, List, Literal, Sequence, Set, Tuple, Union +from typing import ( + AbstractSet, + Any, + Dict, + Iterator, + List, + Literal, + Optional, + Sequence, + Set, + Tuple, + Union, +) import dagster._check as check import polars as pl -from dagster import AssetCheckSpec, AssetKey, AssetObservation, AssetSpec, TableColumn +from dagster import ( + AssetCheckSpec, + AssetKey, + AssetObservation, + AssetSpec, + OpExecutionContext, + TableColumn, + get_dagster_logger, +) from dagster._core.definitions.metadata import ( CodeReferencesMetadataSet, CodeReferencesMetadataValue, @@ -15,8 +35,14 @@ ) from dagster._record import IHaveNew, record_custom -from .asset_utils import get_info_schema_dir, get_output_dir +from .asset_utils import exists_in_selected, get_info_schema_dir, get_output_dir from .constants import ( + DAGSTER_SDF_CATALOG_NAME, + DAGSTER_SDF_DIALECT, + DAGSTER_SDF_PURPOSE, + DAGSTER_SDF_SCHEMA_NAME, + DAGSTER_SDF_TABLE_ID, + DAGSTER_SDF_TABLE_NAME, DEFAULT_SDF_WORKSPACE_ENVIRONMENT, SDF_INFORMATION_SCHEMA_TABLES_STAGE_COMPILE, SDF_INFORMATION_SCHEMA_TABLES_STAGE_PARSE, @@ -24,6 +50,8 @@ from .dagster_sdf_translator import DagsterSdfTranslator from .sdf_event_iterator import SdfDagsterEventType +logger = get_dagster_logger() + @record_custom(checked=False) class SdfInformationSchema(IHaveNew): @@ -111,6 +139,11 @@ def build_sdf_multi_asset_args( table_deps = self.read_table("table_deps").filter( ~pl.col("purpose").is_in(["system", "external-system"]) ) + 
table_columns: Dict[str, List[TableColumn]] = {} + try: + table_columns = self.get_columns() + except Exception: + logger.warn("Column information schema table could not be read.") # Step 1: Build Map of Table Deps to Rows table_rows_deps = {row["table_id"]: row for row in table_deps.rows(named=True)} @@ -142,7 +175,21 @@ def build_sdf_multi_asset_args( code_references = None if dagster_sdf_translator.settings.enable_code_references: code_references = self._extract_code_ref(table_row) - metadata = {**(code_references if code_references else {})} + metadata = { + **TableMetadataSet( + column_schema=TableSchema( + columns=table_columns.get(table_row["table_id"], []), + ), + relation_identifier=table_row["table_id"], + ), + **(code_references if code_references else {}), + DAGSTER_SDF_TABLE_ID: table_row["table_id"], + DAGSTER_SDF_CATALOG_NAME: table_row["catalog_name"], + DAGSTER_SDF_SCHEMA_NAME: table_row["schema_name"], + DAGSTER_SDF_TABLE_NAME: table_row["table_name"], + DAGSTER_SDF_PURPOSE: table_row["purpose"], + DAGSTER_SDF_DIALECT: table_row["dialect"], + } # If the table is a annotated as a dependency, we don't need to create an output for it if ( table_row["table_id"] not in table_id_to_dep @@ -176,8 +223,7 @@ def build_sdf_multi_asset_args( # This registers an asset check on all inner tables, since SDF will execute all tests as a single query (greedy approach) # If no table or column tests are registered, they will simply be skipped if dagster_sdf_translator.settings.enable_asset_checks: - test_name_prefix = "TEST_" if table_row["dialect"] == "snowflake" else "test_" - test_name = f"{table_row['catalog_name']}.{table_row['schema_name']}.{test_name_prefix}{table_row['table_name']}" + test_name = "test" asset_checks.append( AssetCheckSpec( name=test_name, @@ -197,9 +243,11 @@ def get_columns(self) -> Dict[str, List[TableColumn]]: table_columns[row["table_id"]].append( TableColumn( name=row["column_name"], - type=row["datatype"], + type=row["datatype"] if row["datatype"] else "unknown", description=row["description"], - constraints=TableColumnConstraints(other=row["classifiers"]), + constraints=TableColumnConstraints( + other=row["classifiers"] if row["classifiers"] else [] + ), ) ) return table_columns @@ -238,14 +286,31 @@ def _extract_code_ref( return code_references def stream_asset_observations( - self, dagster_sdf_translator: DagsterSdfTranslator + self, + dagster_sdf_translator: DagsterSdfTranslator, + context: Optional[OpExecutionContext] = None, ) -> Iterator[SdfDagsterEventType]: - table_columns = self.get_columns() - tables = self.read_table("tables").filter( - ~pl.col("purpose").is_in(["system", "external-system"]) + selected_output_names: AbstractSet[str] = ( + context.selected_output_names if context else set() ) - + tables = self.read_table("tables") + tables = tables.filter(~pl.col("purpose").is_in(["system", "external-system"])) + table_columns = self.get_columns() for table_row in tables.rows(named=True): + # If selected outputs are expected, only yield events for selected outputs + if len(selected_output_names) > 0: + exists = exists_in_selected( + table_row["catalog_name"], + table_row["schema_name"], + table_row["table_name"], + table_row["purpose"], + table_row["dialect"], + selected_output_names, + dagster_sdf_translator.settings.enable_asset_checks, + ) + if not exists: + continue + asset_key = dagster_sdf_translator.get_asset_key( table_row["catalog_name"], table_row["schema_name"], table_row["table_name"] ) @@ -260,6 +325,12 @@ def stream_asset_observations( 
relation_identifier=table_row["table_id"], ), **(code_references if code_references else {}), + DAGSTER_SDF_TABLE_ID: table_row["table_id"], + DAGSTER_SDF_CATALOG_NAME: table_row["catalog_name"], + DAGSTER_SDF_SCHEMA_NAME: table_row["schema_name"], + DAGSTER_SDF_TABLE_NAME: table_row["table_name"], + DAGSTER_SDF_PURPOSE: table_row["purpose"], + DAGSTER_SDF_DIALECT: table_row["dialect"], } yield AssetObservation( asset_key=asset_key, diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_version.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_version.py index 03c6ea57ae3ab..f804033930add 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_version.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_version.py @@ -1 +1,2 @@ -SDF_VERSION_UPPER_BOUND = "0.3.22" +SDF_VERSION_UPPER_BOUND = "0.3.24" +SDF_VERSION_LOWER_BOUND = "0.3.23" diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_workspace.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_workspace.py index e4e9c47dbbb2b..b23ddcf873d48 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_workspace.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_workspace.py @@ -51,9 +51,13 @@ def __init__( Args: generate_cli_args (Sequence[str]): The arguments to pass to the sdf cli to prepare the workspace. - Default: ["compile", "--save==table-deps"] + Default: ["compile", "--save==table-deps,info-schema"] """ - self._generate_cli_args = generate_cli_args or ["compile", "--save", "table-deps"] + self._generate_cli_args = generate_cli_args or [ + "compile", + "--save", + "table-deps,info-schema", + ] def on_load(self, workspace: "SdfWorkspace"): if self.using_dagster_dev() or self.compile_on_load_opt_in(): diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/conftest.py b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/conftest.py index 58287bb9f9e3f..b54b9d0ed2c38 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/conftest.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/conftest.py @@ -18,9 +18,7 @@ def _create_sdf_invocation( run_workspace: bool = False, environment: str = DEFAULT_SDF_WORKSPACE_ENVIRONMENT, ): - sdf = SdfCliResource( - workspace_dir=os.fspath(workspace_dir), global_config_flags=["--log-form=nested"] - ) + sdf = SdfCliResource(workspace_dir=os.fspath(workspace_dir)) sdf_invocation = sdf.cli( ["compile", "--save", "table-deps"], environment=environment, raise_on_error=False diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_checks.py b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_checks.py index 12a61e09b027d..1cdce1cc948c4 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_checks.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_checks.py @@ -12,7 +12,6 @@ def test_asset_checks_passing() -> None: sdf = SdfCliResource( workspace_dir=os.fspath(lineage_asset_checks_path), - global_config_flags=["--log-form=nested"], ) environment = "passing_tests" sdf_cli_invocation = sdf.cli(["compile", "--save", "table-deps"], environment=environment) @@ -37,14 +36,30 @@ def my_sdf_assets(context: AssetExecutionContext, sdf: SdfCliResource): context=context, ).stream() - result = materialize( + first_result = materialize( [my_sdf_assets], resources={"sdf": SdfCliResource(workspace_dir=lineage_asset_checks_path)}, ) - assert result.success - assert len(result.get_asset_check_evaluations()) > 0 - evaluation = 
result.get_asset_check_evaluations()[0] + first_num_asset_check_evaluations = len(first_result.get_asset_check_evaluations()) + + assert first_result.success + assert first_num_asset_check_evaluations > 0 + evaluation = first_result.get_asset_check_evaluations()[0] + assert evaluation.asset_key == AssetKey(["lineage", "pub", "middle"]) + assert evaluation.passed + + cache_result = materialize( + [my_sdf_assets], + resources={"sdf": SdfCliResource(workspace_dir=lineage_asset_checks_path)}, + ) + + cached_num_asset_check_evaluations = len(cache_result.get_asset_check_evaluations()) + + assert cache_result.success + assert cached_num_asset_check_evaluations > 0 + assert first_num_asset_check_evaluations == cached_num_asset_check_evaluations + evaluation = cache_result.get_asset_check_evaluations()[0] assert evaluation.asset_key == AssetKey(["lineage", "pub", "middle"]) assert evaluation.passed @@ -52,7 +67,6 @@ def my_sdf_assets(context: AssetExecutionContext, sdf: SdfCliResource): def test_asset_checks_failing() -> None: sdf = SdfCliResource( workspace_dir=os.fspath(lineage_asset_checks_path), - global_config_flags=["--log-form=nested"], ) dagster_sdf_translator = DagsterSdfTranslator( settings=DagsterSdfTranslatorSettings(enable_asset_checks=True) @@ -78,13 +92,29 @@ def my_sdf_assets(context: AssetExecutionContext, sdf: SdfCliResource): raise_on_error=False, ).stream() - result = materialize( + first_result = materialize( [my_sdf_assets], resources={"sdf": SdfCliResource(workspace_dir=lineage_asset_checks_path)}, ) - assert result.success - assert len(result.get_asset_check_evaluations()) > 0 - evaluation = result.get_asset_check_evaluations()[0] + first_num_asset_check_evaluations = len(first_result.get_asset_check_evaluations()) + + assert first_result.success + assert first_num_asset_check_evaluations > 0 + evaluation = first_result.get_asset_check_evaluations()[0] + assert evaluation.asset_key == AssetKey(["lineage", "pub", "middle"]) + assert not evaluation.passed + + cache_result = materialize( + [my_sdf_assets], + resources={"sdf": SdfCliResource(workspace_dir=lineage_asset_checks_path)}, + ) + + cached_num_asset_check_evaluations = len(cache_result.get_asset_check_evaluations()) + + assert cache_result.success + assert cached_num_asset_check_evaluations > 0 + assert first_num_asset_check_evaluations == cached_num_asset_check_evaluations + evaluation = cache_result.get_asset_check_evaluations()[0] assert evaluation.asset_key == AssetKey(["lineage", "pub", "middle"]) assert not evaluation.passed diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_decorator.py b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_decorator.py index 175fe93ee5499..9cc4f63cd0ea8 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_decorator.py @@ -96,13 +96,30 @@ def my_sdf_assets(context: AssetExecutionContext, sdf: SdfCliResource): context=context, ).stream() - result = materialize( + first_result = materialize( [my_sdf_assets], resources={"sdf": SdfCliResource(workspace_dir=moms_flower_shop_path)}, ) - assert result.success - assert result.get_asset_materialization_events() + assert first_result.success + first_num_asset_materialization_events = len(first_result.get_asset_materialization_events()) + assert first_num_asset_materialization_events > 0 + + cached_result = materialize( + [my_sdf_assets], + resources={"sdf": 
SdfCliResource(workspace_dir=moms_flower_shop_path)}, + ) + + assert cached_result.success + materialization_events = cached_result.get_asset_materialization_events() + assert not any( + [ + not event.materialization.metadata["Materialized From Cache"] + for event in materialization_events + ] + ) + cached_num_asset_materialization_events = len(cached_result.get_asset_materialization_events()) + assert first_num_asset_materialization_events == cached_num_asset_materialization_events def test_with_custom_translater_asset_key_fn(moms_flower_shop_target_dir: Path) -> None: @@ -214,45 +231,43 @@ def test_asset_descriptions(moms_flower_shop_target_dir: str) -> None: ) def my_flower_shop_assets(): ... - # Currently, the expected output is duplicated, but this is a bug in the sdf cli and will be - # fixed in a future release. assert my_flower_shop_assets.descriptions_by_key == { - AssetKey( - ["moms_flower_shop", "staging", "app_installs"] - ): "This table is a staging table which adds campaign information to app install events\n\nThis table is a staging table which adds campaign information to app install events\n", AssetKey( ["moms_flower_shop", "analytics", "agg_installs_and_campaigns"] ): "sdf view moms_flower_shop.analytics.agg_installs_and_campaigns", AssetKey( - ["moms_flower_shop", "raw", "raw_inapp_events"] - ): "Logged actions (events) that users perform inside the mobile app of mom's flower shop.\nLogged actions (events) that users perform inside the mobile app of mom's flower shop.", + ["moms_flower_shop", "staging", "inapp_events"] + ): "sdf view moms_flower_shop.staging.inapp_events", AssetKey( ["moms_flower_shop", "analytics", "dim_marketing_campaigns"] ): "sdf view moms_flower_shop.analytics.dim_marketing_campaigns", AssetKey( - ["moms_flower_shop", "raw", "raw_marketing_campaign_events"] - ): "An hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. If no campaigns are running for a certain houe, no campaigns will be logged.\n\nAn hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. If no campaigns are running for a certain houe, no campaigns will be logged.\n", + ["moms_flower_shop", "staging", "app_installs"] + ): "This table is a staging table which adds campaign information to app install events\n", AssetKey( - ["moms_flower_shop", "raw", "raw_customers"] - ): "All relevant information related to customers known to mom's flower shop. This information comes from the user input into the mobile app.\n\nAll relevant information related to customers known to mom's flower shop. This information comes from the user input into the mobile app.\n", + ["moms_flower_shop", "raw", "raw_addresses"] + ): "All relevant information related to street addresses known to mom's flower shop. This information comes from the user input into the mobile app.\n", AssetKey( - ["moms_flower_shop", "staging", "inapp_events"] - ): "sdf view moms_flower_shop.staging.inapp_events", + ["moms_flower_shop", "raw", "raw_marketing_campaign_events"] + ): "An hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. If no campaigns are running for a certain houe, no campaigns will be logged.\n", AssetKey( - ["moms_flower_shop", "raw", "raw_addresses"] - ): "All relevant information related to street addresses known to mom's flower shop. 
This information comes from the user input into the mobile app.\n\nAll relevant information related to street addresses known to mom's flower shop. This information comes from the user input into the mobile app.\n", + ["moms_flower_shop", "raw", "raw_inapp_events"] + ): "Logged actions (events) that users perform inside the mobile app of mom's flower shop.", AssetKey( - ["moms_flower_shop", "staging", "stg_installs_per_campaign"] - ): "sdf view moms_flower_shop.staging.stg_installs_per_campaign", + ["moms_flower_shop", "raw", "raw_customers"] + ): "All relevant information related to customers known to mom's flower shop. This information comes from the user input into the mobile app.\n", AssetKey( ["moms_flower_shop", "staging", "app_installs_v2"] ): "sdf view moms_flower_shop.staging.app_installs_v2", AssetKey( - ["moms_flower_shop", "staging", "customers"] - ): "sdf view moms_flower_shop.staging.customers", + ["moms_flower_shop", "staging", "stg_installs_per_campaign"] + ): "sdf view moms_flower_shop.staging.stg_installs_per_campaign", AssetKey( ["moms_flower_shop", "staging", "marketing_campaigns"] ): "sdf view moms_flower_shop.staging.marketing_campaigns", + AssetKey( + ["moms_flower_shop", "staging", "customers"] + ): "sdf view moms_flower_shop.staging.customers", } @@ -268,43 +283,41 @@ def test_asset_descriptions_with_raw_sql(moms_flower_shop_target_dir: str) -> No ) def my_flower_shop_assets(): ... - # Currently, the expected output is duplicated, but this is a bug in the sdf cli and will be - # fixed in a future release. assert my_flower_shop_assets.descriptions_by_key == { - AssetKey( - ["moms_flower_shop", "analytics", "dim_marketing_campaigns"] - ): "sdf view moms_flower_shop.analytics.dim_marketing_campaigns\n\n#### Raw SQL:\n```\n SELECT \n -- marketing campaigns dimensions\n m.campaign_id,\n m.campaign_name,\n -- metrics\n i.total_num_installs,\n total_campaign_spent / \n NULLIF(i.total_num_installs, 0) AS avg_customer_acquisition_cost,\n campaign_duration / \n NULLIF(i.total_num_installs, 0) AS install_duration_ratio\n FROM staging.marketing_campaigns m\n LEFT OUTER JOIN staging.stg_installs_per_campaign i\n ON (m.campaign_id = i.campaign_id)\n ORDER BY total_num_installs DESC NULLS LAST\n```", - AssetKey( - ["moms_flower_shop", "raw", "raw_customers"] - ): "All relevant information related to customers known to mom's flower shop. This information comes from the user input into the mobile app.\n\nAll relevant information related to customers known to mom's flower shop. 
This information comes from the user input into the mobile app.\n\n\n#### Raw SQL:\n```\n CREATE TABLE raw_customers \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/customers.parquet');\n```", - AssetKey( - ["moms_flower_shop", "raw", "raw_inapp_events"] - ): "Logged actions (events) that users perform inside the mobile app of mom's flower shop.\nLogged actions (events) that users perform inside the mobile app of mom's flower shop.\n\n#### Raw SQL:\n```\n CREATE TABLE raw_inapp_events \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/inapp_events.parquet');\n```", AssetKey( ["moms_flower_shop", "analytics", "agg_installs_and_campaigns"] ): "sdf view moms_flower_shop.analytics.agg_installs_and_campaigns\n\n#### Raw SQL:\n```\n SELECT \n -- install events data\n DATE_FORMAT(install_time, '%Y-%m-%d') AS install_date,\n campaign_name,\n platform,\n COUNT(DISTINCT customer_id) AS distinct_installs\n FROM staging.app_installs_v2\n GROUP BY 1,2,3\n```", AssetKey( - ["moms_flower_shop", "raw", "raw_addresses"] - ): "All relevant information related to street addresses known to mom's flower shop. This information comes from the user input into the mobile app.\n\nAll relevant information related to street addresses known to mom's flower shop. This information comes from the user input into the mobile app.\n\n\n#### Raw SQL:\n```\n CREATE TABLE raw_addresses \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/addresses.parquet');\n```", + ["moms_flower_shop", "staging", "app_installs"] + ): "This table is a staging table which adds campaign information to app install events\n\n\n#### Raw SQL:\n```\n SELECT \n -- install events data\n COALESCE(m.event_id, i.event_id) AS event_id,\n i.customer_id,\n i.event_time AS install_time,\n i.platform,\n\n -- marketing campaigns data - if doesn't exist than organic\n COALESCE(m.campaign_id, -1) AS campaign_id, \n COALESCE(m.campaign_name, 'organic') AS campaign_name,\n COALESCE(m.c_name, 'organic') AS campaign_type\n FROM inapp_events i \n LEFT OUTER JOIN raw.raw_marketing_campaign_events m\n ON (i.event_id = m.event_id) \n WHERE event_name = 'install'\n```", + AssetKey( + ["moms_flower_shop", "analytics", "dim_marketing_campaigns"] + ): "sdf view moms_flower_shop.analytics.dim_marketing_campaigns\n\n#### Raw SQL:\n```\n SELECT \n -- marketing campaigns dimensions\n m.campaign_id,\n m.campaign_name,\n -- metrics\n i.total_num_installs,\n total_campaign_spent / \n NULLIF(i.total_num_installs, 0) AS avg_customer_acquisition_cost,\n campaign_duration / \n NULLIF(i.total_num_installs, 0) AS install_duration_ratio\n FROM staging.marketing_campaigns m\n LEFT OUTER JOIN staging.stg_installs_per_campaign i\n ON (m.campaign_id = i.campaign_id)\n ORDER BY total_num_installs DESC NULLS LAST\n```", AssetKey( ["moms_flower_shop", "raw", "raw_marketing_campaign_events"] - ): "An hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. If no campaigns are running for a certain houe, no campaigns will be logged.\n\nAn hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. If no campaigns are running for a certain houe, no campaigns will be logged.\n\n\n#### Raw SQL:\n```\n CREATE TABLE raw_marketing_campaign_events \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/marketing_campaign_events.parquet');\n```", + ): "An hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. 
If no campaigns are running for a certain houe, no campaigns will be logged.\n\n\n#### Raw SQL:\n```\n CREATE TABLE raw_marketing_campaign_events \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/marketing_campaign_events.parquet');\n```", + AssetKey( + ["moms_flower_shop", "raw", "raw_customers"] + ): "All relevant information related to customers known to mom's flower shop. This information comes from the user input into the mobile app.\n\n\n#### Raw SQL:\n```\n CREATE TABLE raw_customers \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/customers.parquet');\n```", AssetKey( ["moms_flower_shop", "staging", "inapp_events"] ): "sdf view moms_flower_shop.staging.inapp_events\n\n#### Raw SQL:\n```\n SELECT \n event_id,\n customer_id,\n FROM_UNIXTIME(event_time/1000) AS event_time, \n event_name,\n event_value,\n additional_details,\n platform,\n campaign_id\n FROM raw.raw_inapp_events\n```", AssetKey( - ["moms_flower_shop", "staging", "app_installs"] - ): "This table is a staging table which adds campaign information to app install events\n\nThis table is a staging table which adds campaign information to app install events\n\n\n#### Raw SQL:\n```\n SELECT \n -- install events data\n COALESCE(m.event_id, i.event_id) AS event_id,\n i.customer_id,\n i.event_time AS install_time,\n i.platform,\n\n -- marketing campaigns data - if doesn't exist than organic\n COALESCE(m.campaign_id, -1) AS campaign_id, \n COALESCE(m.campaign_name, 'organic') AS campaign_name,\n COALESCE(m.c_name, 'organic') AS campaign_type\n FROM inapp_events i \n LEFT OUTER JOIN raw.raw_marketing_campaign_events m\n ON (i.event_id = m.event_id) \n WHERE event_name = 'install'\n```", + ["moms_flower_shop", "raw", "raw_addresses"] + ): "All relevant information related to street addresses known to mom's flower shop. 
This information comes from the user input into the mobile app.\n\n\n#### Raw SQL:\n```\n CREATE TABLE raw_addresses \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/addresses.parquet');\n```", AssetKey( - ["moms_flower_shop", "staging", "app_installs_v2"] - ): "sdf view moms_flower_shop.staging.app_installs_v2\n\n#### Raw SQL:\n```\n SELECT \n DISTINCT\n -- install events data\n i.event_id,\n i.customer_id,\n i.event_time AS install_time,\n i.platform,\n\n -- marketing campaigns data - if doesn't exist than organic\n COALESCE(m.campaign_id, -1) AS campaign_id, \n COALESCE(m.campaign_name, 'organic') AS campaign_name,\n COALESCE(m.c_name, 'organic') AS campaign_type\n FROM inapp_events i \n LEFT OUTER JOIN raw.raw_marketing_campaign_events m\n ON (i.campaign_id = m.campaign_id) \n WHERE event_name = 'install'\n```", + ["moms_flower_shop", "raw", "raw_inapp_events"] + ): "Logged actions (events) that users perform inside the mobile app of mom's flower shop.\n\n#### Raw SQL:\n```\n CREATE TABLE raw_inapp_events \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/inapp_events.parquet');\n```", AssetKey( ["moms_flower_shop", "staging", "customers"] ): "sdf view moms_flower_shop.staging.customers\n\n#### Raw SQL:\n```\n SELECT \n c.id AS customer_id,\n c.first_name,\n c.last_name,\n c.first_name || ' ' || c.last_name AS full_name,\n c.email,\n c.gender,\n \n -- Marketing info\n i.campaign_id,\n i.campaign_name,\n i.campaign_type,\n\n -- Address info\n c.address_id,\n a.full_address,\n a.state\n FROM raw.raw_customers c \n\n LEFT OUTER JOIN app_installs_v2 i\n ON (c.id = i.customer_id)\n\n LEFT OUTER JOIN raw.raw_addresses a\n ON (c.address_id = a.address_id)\n```", AssetKey( - ["moms_flower_shop", "staging", "marketing_campaigns"] - ): "sdf view moms_flower_shop.staging.marketing_campaigns\n\n#### Raw SQL:\n```\n SELECT \n campaign_id,\n campaign_name,\n SUBSTR(c_name, 1, LENGTH(c_name)-1) AS campaign_type,\n MIN(\n FROM_UNIXTIME(event_time/1000) -- convert unixtime from milliseconds to seconds\n ) AS start_time,\n MAX(\n FROM_UNIXTIME(event_time/1000) -- convert unixtime from milliseconds to seconds\n ) AS end_time,\n COUNT(event_time) AS campaign_duration,\n SUM(cost) AS total_campaign_spent,\n ARRAY_AGG(event_id) AS event_ids\n FROM raw.raw_marketing_campaign_events\n GROUP BY \n campaign_id,\n campaign_name,\n campaign_type\n```", + ["moms_flower_shop", "staging", "app_installs_v2"] + ): "sdf view moms_flower_shop.staging.app_installs_v2\n\n#### Raw SQL:\n```\n SELECT \n DISTINCT\n -- install events data\n i.event_id,\n i.customer_id,\n i.event_time AS install_time,\n i.platform,\n\n -- marketing campaigns data - if doesn't exist than organic\n COALESCE(m.campaign_id, -1) AS campaign_id, \n COALESCE(m.campaign_name, 'organic') AS campaign_name,\n COALESCE(m.c_name, 'organic') AS campaign_type\n FROM inapp_events i \n LEFT OUTER JOIN raw.raw_marketing_campaign_events m\n ON (i.campaign_id = m.campaign_id) \n WHERE event_name = 'install'\n```", AssetKey( ["moms_flower_shop", "staging", "stg_installs_per_campaign"] ): "sdf view moms_flower_shop.staging.stg_installs_per_campaign\n\n#### Raw SQL:\n```\n SELECT \n campaign_id,\n COUNT(event_id) AS total_num_installs\n FROM app_installs_v2\n GROUP BY 1\n```", + AssetKey( + ["moms_flower_shop", "staging", "marketing_campaigns"] + ): "sdf view moms_flower_shop.staging.marketing_campaigns\n\n#### Raw SQL:\n```\n SELECT \n campaign_id,\n campaign_name,\n SUBSTR(c_name, 1, LENGTH(c_name)-1) AS campaign_type,\n MIN(\n FROM_UNIXTIME(event_time/1000) -- 
convert unixtime from milliseconds to seconds\n ) AS start_time,\n MAX(\n FROM_UNIXTIME(event_time/1000) -- convert unixtime from milliseconds to seconds\n ) AS end_time,\n COUNT(event_time) AS campaign_duration,\n SUM(cost) AS total_campaign_spent,\n ARRAY_AGG(event_id) AS event_ids\n FROM raw.raw_marketing_campaign_events\n GROUP BY \n campaign_id,\n campaign_name,\n campaign_type\n```", } diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_resource.py b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_resource.py index ae9673c83a5b1..cee148488d560 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_resource.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_resource.py @@ -1,14 +1,23 @@ import os import shutil from pathlib import Path -from typing import List, cast +from typing import cast import pydantic import pytest from dagster import In, Nothing, Out, job, op from dagster._core.errors import DagsterExecutionInterruptedError from dagster._core.execution.context.compute import OpExecutionContext -from dagster_sdf.constants import SDF_DAGSTER_OUTPUT_DIR, SDF_TARGET_DIR +from dagster_sdf.constants import ( + DAGSTER_SDF_CATALOG_NAME, + DAGSTER_SDF_DIALECT, + DAGSTER_SDF_PURPOSE, + DAGSTER_SDF_SCHEMA_NAME, + DAGSTER_SDF_TABLE_ID, + DAGSTER_SDF_TABLE_NAME, + SDF_DAGSTER_OUTPUT_DIR, + SDF_TARGET_DIR, +) from dagster_sdf.resource import SdfCliResource from pydantic import ValidationError from pytest_mock import MockerFixture @@ -21,11 +30,9 @@ def sdf_fixture() -> SdfCliResource: return SdfCliResource(workspace_dir=os.fspath(moms_flower_shop_path)) -@pytest.mark.parametrize("global_config_flags", [["--log-form=nested"]]) -def test_sdf_cli(global_config_flags: List[str]) -> None: +def test_sdf_cli() -> None: expected_sdf_cli_args = [ "sdf", - *global_config_flags, "--log-level", "info", "compile", @@ -36,9 +43,7 @@ def test_sdf_cli(global_config_flags: List[str]) -> None: "--target-dir", ] - sdf = SdfCliResource( - workspace_dir=os.fspath(moms_flower_shop_path), global_config_flags=global_config_flags - ) + sdf = SdfCliResource(workspace_dir=os.fspath(moms_flower_shop_path)) sdf_cli_invocation = sdf.cli(["compile", "--save", "table-deps"]) *_, target_dir = sdf_cli_invocation.process.args # type: ignore @@ -215,5 +220,11 @@ def my_sdf_job_yield_events(): for event in materialization_events: metadata = event.materialization.metadata - assert metadata["table_id"] + assert metadata[DAGSTER_SDF_TABLE_ID] + assert metadata[DAGSTER_SDF_CATALOG_NAME] + assert metadata[DAGSTER_SDF_SCHEMA_NAME] + assert metadata[DAGSTER_SDF_TABLE_NAME] + assert metadata[DAGSTER_SDF_PURPOSE] + assert metadata[DAGSTER_SDF_DIALECT] assert metadata["Execution Duration"] + assert metadata["Materialized From Cache"] is not None diff --git a/python_modules/libraries/dagster-sdf/setup.py b/python_modules/libraries/dagster-sdf/setup.py index 190bfae8f9c10..67d1ed2b92b8e 100644 --- a/python_modules/libraries/dagster-sdf/setup.py +++ b/python_modules/libraries/dagster-sdf/setup.py @@ -4,7 +4,7 @@ from setuptools import find_packages, setup -def get_version() -> Tuple[str, str]: +def get_version() -> Tuple[str, str, str]: version: Dict[str, str] = {} sdf_version: Dict[str, str] = {} with open(Path(__file__).parent / "dagster_sdf/version.py", encoding="utf8") as fp: @@ -13,10 +13,14 @@ def get_version() -> Tuple[str, str]: with open(Path(__file__).parent / "dagster_sdf/sdf_version.py", encoding="utf8") as fp: exec(fp.read(), sdf_version) - return 
version["__version__"], sdf_version["SDF_VERSION_UPPER_BOUND"] + return ( + version["__version__"], + sdf_version["SDF_VERSION_UPPER_BOUND"], + sdf_version["SDF_VERSION_LOWER_BOUND"], + ) -dagster_sdf_version, SDF_VERSION_UPPER_BOUND = get_version() +dagster_sdf_version, SDF_VERSION_UPPER_BOUND, SDF_VERSION_LOWER_BOUND = get_version() # dont pin dev installs to avoid pip dep resolver issues pin = "" if dagster_sdf_version == "1!0+dev" else f"=={dagster_sdf_version}" setup( @@ -41,7 +45,7 @@ def get_version() -> Tuple[str, str]: python_requires=">=3.8,<3.13", install_requires=[ f"dagster{pin}", - f"sdf-cli>=0.3.21,<{SDF_VERSION_UPPER_BOUND}", + f"sdf-cli>={SDF_VERSION_LOWER_BOUND},<{SDF_VERSION_UPPER_BOUND}", "orjson", "polars", "typer>=0.9.0",