Skip to content

Commit

Permalink
[dagster-sdf] Enable Caching & Asset Selection (#23750)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR enables caching simply by upgrading the version of the `sdf-cli`,
which now outputs cached dependencies to stdout in their dependency
order.

It also enables asset selection via sdf's `--targets-only` flag,
passing the fully qualified names of the selected assets as targets to
the `sdf run` or `sdf test` commands. There are some hoops to jump
through with regard to asset checks, but this logic will be cleaned up
in the `sdf-cli` in future releases.

Bonus:
1. Duplicate Descriptions (this is fixed with the latest release of the
`sdf-cli`)
2. The `--log-form` flag is not necessary with the latest release

## How I Tested These Changes

Tests for caching have been added. Asset selection is covered by
existing tests.
  • Loading branch information
akbog authored Aug 22, 2024
1 parent f529821 commit 407ea5c
Show file tree
Hide file tree
Showing 15 changed files with 398 additions and 101 deletions.
38 changes: 36 additions & 2 deletions python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import textwrap
from pathlib import Path
from typing import Any, Dict, Mapping, Optional, Sequence
from typing import AbstractSet, Any, Dict, Mapping, Optional, Sequence

from dagster import (
AssetCheckKey,
Expand Down Expand Up @@ -36,7 +36,7 @@ def default_asset_check_key_fn(catalog: str, schema: str, table: str) -> AssetCh
else:
asset_key = AssetKey([catalog, schema, table])
return AssetCheckKey(
name=f"{catalog}.{schema}.{table}",
name="test",
asset_key=asset_key,
)

Expand Down Expand Up @@ -201,3 +201,37 @@ def default_description_fn(
f"#### Materialized SQL:\n```\n{_read_sql_file(path_to_file)}\n```"
)
return "\n\n".join(filter(None, description_sections))


def get_test_prefix(table_dialect: str) -> str:
    """Return the table-name prefix used for sdf test tables in a dialect.

    Args:
        table_dialect: Name of the SQL dialect of the table under test.

    Returns:
        ``"TEST_"`` when the dialect is exactly ``"snowflake"``, otherwise
        ``"test_"``. The comparison is case-sensitive.
    """
    return "TEST_" if table_dialect == "snowflake" else "test_"


def exists_in_selected(
    catalog_name: str,
    schema_name: str,
    table_name: str,
    purpose: str,
    dialect: str,
    selected_output_names: AbstractSet[str],
    asset_checks_enabled: bool,
) -> bool:
    """Return whether an sdf table corresponds to one of the selected outputs.

    The table is converted to an output name with ``dagster_name_fn`` and
    looked up in ``selected_output_names``. When asset checks are enabled and
    the table's purpose is ``"test"``, the dialect's test prefix (see
    ``get_test_prefix``) is first stripped from the table name, so that a test
    table is matched against the output name of the table it tests.

    Args:
        catalog_name: Catalog component of the table's name.
        schema_name: Schema component of the table's name.
        table_name: Table component of the table's name.
        purpose: Purpose reported for the table; only ``"test"`` is treated
            specially here.
        dialect: Dialect of the table, used to pick the test-name prefix.
        selected_output_names: Output names selected for the current run.
        asset_checks_enabled: Whether test tables should be resolved to the
            table they test before the lookup.

    Returns:
        True if the resolved output name is in ``selected_output_names``,
        False otherwise.
    """
    effective_table_name = table_name
    # Only test tables carry the prefix, and they are only resolved back to the
    # table under test when asset checks are enabled.
    if asset_checks_enabled and purpose == "test":
        test_name_prefix = get_test_prefix(dialect)
        if table_name.startswith(test_name_prefix):
            effective_table_name = table_name[len(test_name_prefix) :]

    asset_output_name = dagster_name_fn(catalog_name, schema_name, effective_table_name)
    return asset_output_name in selected_output_names
7 changes: 7 additions & 0 deletions python_modules/libraries/dagster-sdf/dagster_sdf/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@
SDF_INFORMATION_SCHEMA_TABLES_STAGE_PARSE = ["table_deps"]
DEFAULT_SDF_WORKSPACE_ENVIRONMENT = "dbg"
SDF_WORKSPACE_YML = "workspace.sdf.yml"

# Metadata keys (namespaced with "dagster_sdf/") under which details of an sdf
# table are stored in asset metadata: the table id, its catalog, schema and
# table name components, its purpose, and its dialect.
DAGSTER_SDF_TABLE_ID = "dagster_sdf/table_id"
DAGSTER_SDF_CATALOG_NAME = "dagster_sdf/catalog"
DAGSTER_SDF_SCHEMA_NAME = "dagster_sdf/schema"
DAGSTER_SDF_TABLE_NAME = "dagster_sdf/table_name"
DAGSTER_SDF_PURPOSE = "dagster_sdf/purpose"
DAGSTER_SDF_DIALECT = "dagster_sdf/dialect"
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ workspace = SdfWorkspace(workspace_dir=sdf_workspace_dir, target_dir=target_dir,

@sdf_assets(workspace=workspace)
def {{ sdf_assets_name }}(context: AssetExecutionContext, sdf: SdfCliResource):
yield from sdf.cli(["run", "--save", "info-schema", "--cache", "write-only"], target_dir=target_dir, environment=environment, context=context).stream()
yield from sdf.cli(["run", "--save", "info-schema"], target_dir=target_dir, environment=environment, context=context).stream()
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ defs = Definitions(
schedules=schedules,
resources={
"sdf": SdfCliResource(
workspace_dir=sdf_workspace_dir,
global_config_flags=["--log-form=nested"]
workspace_dir=sdf_workspace_dir
),
},
)
94 changes: 92 additions & 2 deletions python_modules/libraries/dagster-sdf/dagster_sdf/resource.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,44 @@
import os
import re
import shutil
import subprocess
import uuid
from contextlib import suppress
from pathlib import Path
from typing import Any, List, Optional, Sequence, Union

from dagster import AssetExecutionContext, ConfigurableResource, OpExecutionContext
from dagster import (
AssetExecutionContext,
AssetsDefinition,
ConfigurableResource,
DagsterInvariantViolationError,
OpExecutionContext,
get_dagster_logger,
)
from dagster._annotations import experimental, public
from dagster._core.errors import DagsterInvalidPropertyError
from dagster._utils.warnings import suppress_dagster_warnings
from pydantic import Field, validator

from .asset_utils import dagster_name_fn, get_test_prefix
from .constants import (
DAGSTER_SDF_CATALOG_NAME,
DAGSTER_SDF_DIALECT,
DAGSTER_SDF_SCHEMA_NAME,
DAGSTER_SDF_TABLE_ID,
DAGSTER_SDF_TABLE_NAME,
DEFAULT_SDF_WORKSPACE_ENVIRONMENT,
SDF_DAGSTER_OUTPUT_DIR,
SDF_EXECUTABLE,
SDF_WORKSPACE_YML,
)
from .dagster_sdf_translator import DagsterSdfTranslator, validate_opt_translator
from .sdf_cli_invocation import SdfCliInvocation
from .sdf_version import SDF_VERSION_LOWER_BOUND, SDF_VERSION_UPPER_BOUND
from .sdf_workspace import SdfWorkspace

logging = get_dagster_logger()


@suppress_dagster_warnings
@experimental
Expand Down Expand Up @@ -131,12 +151,56 @@ def cli(
SdfCliInvocation: An invocation instance that can be used to retrieve the output of the
sdf CLI command.
"""
self._validate_sdf_version()
dagster_sdf_translator = validate_opt_translator(dagster_sdf_translator)
dagster_sdf_translator = dagster_sdf_translator or DagsterSdfTranslator()
assets_def: Optional[AssetsDefinition] = None
with suppress(DagsterInvalidPropertyError):
assets_def = context.assets_def if context else None

context = (
context.op_execution_context if isinstance(context, AssetExecutionContext) else context
)

dagster_sdf_translator = dagster_sdf_translator or DagsterSdfTranslator()
run_args = []
# If the context and assets_def are not None, then we need to pass in the selected targets
# to the sdf CLI invocation.
if context and assets_def is not None:
run_args.append(
"--targets-only"
) # This flag is used to only run the selected targets (and not upstreams)
selected_output_names = context.selected_output_names
for asset_key, asset_metadata in assets_def.metadata_by_key.items():
asset_output_name = dagster_name_fn(
asset_key.path[0], asset_key.path[1], asset_key.path[2]
)
# Check if this output is expected, if so, add the table_id and optionally the expected test name to the run_args
if selected_output_names and asset_output_name in selected_output_names:
table_id: Optional[str] = asset_metadata.get(DAGSTER_SDF_TABLE_ID)
catalog: Optional[str] = asset_metadata.get(DAGSTER_SDF_CATALOG_NAME)
schema: Optional[str] = asset_metadata.get(DAGSTER_SDF_SCHEMA_NAME)
table_name: Optional[str] = asset_metadata.get(DAGSTER_SDF_TABLE_NAME)
table_dialect: Optional[str] = asset_metadata.get(DAGSTER_SDF_DIALECT)
# If any of the metadata is missing, raise an error
if (
table_id is None
or catalog is None
or schema is None
or table_name is None
or table_dialect is None
):
raise DagsterInvariantViolationError(
f"Expected to find sdf table metadata on asset {asset_key.to_user_string()},"
" but did not. Did you pass in assets that weren't generated by @sdf_assets?"
" Please ensure that the asset metadata contains the following keys: "
f"{DAGSTER_SDF_TABLE_ID}, {DAGSTER_SDF_CATALOG_NAME}, {DAGSTER_SDF_SCHEMA_NAME}, {DAGSTER_SDF_TABLE_NAME}, {DAGSTER_SDF_DIALECT}"
)
run_args.append(table_id)
test_name_prefix = get_test_prefix(table_dialect)
# If the command is test, then we need to add the test name to the run_args (temporary: until sdf-cli applies --targets-only for test)
if args[0] == "test":
run_args.append(f"{catalog}.{schema}.{test_name_prefix}{table_name}")

# Pass the current environment variables to the sdf CLI invocation.
env = os.environ.copy()

Expand All @@ -155,6 +219,7 @@ def cli(
*args,
*environment_args,
*target_args,
*run_args,
]

return SdfCliInvocation.run(
Expand Down Expand Up @@ -200,3 +265,28 @@ def _get_unique_target_path(self, *, context: Optional[OpExecutionContext]) -> P
current_output_path = Path(self.workspace_dir).joinpath(SDF_DAGSTER_OUTPUT_DIR)

return current_output_path.joinpath(path)

def _validate_sdf_version(self) -> None:
    """Warn if the installed sdf CLI is outside the supported version range.

    Runs ``sdf --version``, extracts the ``X.Y.Z`` version from its output and
    logs a warning when it is lower than ``SDF_VERSION_LOWER_BOUND`` or not
    lower than ``SDF_VERSION_UPPER_BOUND``. A warning is also logged when no
    version can be extracted from the output.

    Raises:
        SystemExit: With status 1 if ``sdf --version`` exits with a non-zero
            return code.
    """

    def _version_key(version_str: str) -> tuple:
        # Compare numerically: as plain strings "0.10.0" sorts before "0.3.0".
        # NOTE(review): assumes the bounds are dotted numeric strings - confirm.
        return tuple(int(part) for part in re.findall(r"\d+", version_str))

    try:
        result = subprocess.run(
            ["sdf", "--version"], capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        logging.warning(f"Failed to get the sdf version: {e}")
        # Same exception and status as the `exit(1)` builtin, without relying
        # on the `site` module having injected it.
        raise SystemExit(1) from e

    output = result.stdout.strip()
    match = re.search(r"sdf (\d+\.\d+\.\d+)", output)
    if match is None:
        logging.warning(
            "Failed to extract the sdf version from the output. Check your environment to"
            " ensure that the correct version of sdf is being used."
        )
        return

    version = match.group(1)
    version_key = _version_key(version)
    if version_key < _version_key(SDF_VERSION_LOWER_BOUND) or version_key >= _version_key(
        SDF_VERSION_UPPER_BOUND
    ):
        logging.warning(
            f"The sdf version '{version}' is not within the supported range of"
            f" '{SDF_VERSION_LOWER_BOUND}' to '{SDF_VERSION_UPPER_BOUND}'. Check your"
            " environment to ensure that the correct version of sdf is being used."
        )
51 changes: 42 additions & 9 deletions python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_event.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, Dict, Iterator, Optional
from typing import AbstractSet, Any, Dict, Iterator, Optional

from dagster import (
AssetCheckResult,
Expand All @@ -10,7 +10,15 @@
)
from dagster._annotations import public

from .asset_utils import dagster_name_fn
from .asset_utils import dagster_name_fn, exists_in_selected
from .constants import (
DAGSTER_SDF_CATALOG_NAME,
DAGSTER_SDF_DIALECT,
DAGSTER_SDF_PURPOSE,
DAGSTER_SDF_SCHEMA_NAME,
DAGSTER_SDF_TABLE_ID,
DAGSTER_SDF_TABLE_NAME,
)
from .dagster_sdf_translator import DagsterSdfTranslator
from .sdf_event_iterator import SdfDagsterEventType

Expand All @@ -33,6 +41,7 @@ def is_result_event(self) -> bool:
and bool(self.raw_event.get("ev_tb_schema"))
and bool(self.raw_event.get("ev_tb_table"))
and bool(self.raw_event.get("st_code"))
and bool(self.raw_event.get("st_done"))
and bool(self.raw_event.get("st_dur_ms"))
)

Expand Down Expand Up @@ -64,16 +73,43 @@ def to_default_asset_events(
if not is_success:
return

table_id = self.raw_event["ev_tb"]
selected_output_names: AbstractSet[str] = (
context.selected_output_names if context else set()
)

catalog = self.raw_event["ev_tb_catalog"]
schema = self.raw_event["ev_tb_schema"]
table = self.raw_event["ev_tb_table"]
purpose = self.raw_event["ev_tb_purpose"]
dialect = self.raw_event["ev_tb_dialect"]

# If assets are selected, only yield events for selected assets
if len(selected_output_names) > 0:
exists = exists_in_selected(
catalog,
schema,
table,
purpose,
dialect,
selected_output_names,
dagster_sdf_translator.settings.enable_asset_checks,
)
if not exists:
return

table_id = self.raw_event["ev_tb"]
default_metadata = {
"table_id": table_id,
DAGSTER_SDF_TABLE_ID: table_id,
DAGSTER_SDF_CATALOG_NAME: catalog,
DAGSTER_SDF_SCHEMA_NAME: schema,
DAGSTER_SDF_TABLE_NAME: table,
DAGSTER_SDF_PURPOSE: purpose,
DAGSTER_SDF_DIALECT: dialect,
"Execution Duration": self.raw_event["st_dur_ms"] / 1000,
"Materialized From Cache": True if self.raw_event["st_done"] == "cached" else False,
}
asset_key = dagster_sdf_translator.get_asset_key(catalog, schema, table)
if self.raw_event["ev_tb_purpose"] == "model":
if purpose == "model":
has_asset_def = bool(context and context.has_assets_def)
event = (
Output(
Expand All @@ -93,10 +129,7 @@ def to_default_asset_events(
)

yield event
elif (
self.raw_event["ev_tb_purpose"] == "test"
and dagster_sdf_translator.settings.enable_asset_checks
):
elif purpose == "test" and dagster_sdf_translator.settings.enable_asset_checks:
passed = self.raw_event["st_verdict"] == "passed"
asset_check_key = dagster_sdf_translator.get_check_key_for_test(catalog, schema, table)
# if the asset key is the same as the asset check key, then the test is not a table / column test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ def _stream_asset_events(
workspace_dir=self.workspace_dir,
target_dir=self.target_dir,
environment=self.environment,
).stream_asset_observations(dagster_sdf_translator=self.dagster_sdf_translator)
).stream_asset_observations(
dagster_sdf_translator=self.dagster_sdf_translator, context=self.context
)

@public
def stream(
Expand Down
Loading

0 comments on commit 407ea5c

Please sign in to comment.