diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py index 84528cbeb9163..9e06834418935 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py @@ -125,6 +125,7 @@ class PackageSpec: queue: Optional[BuildkiteQueue] = None run_pytest: bool = True always_run_if: Optional[Callable[[], bool]] = None + skip_if: Optional[Callable[[], str]] = None def __post_init__(self): if not self.name: @@ -243,13 +244,17 @@ def requirements(self): @property def skip_reason(self) -> Optional[str]: - # Memoize so we don't log twice - if self._should_skip is False: + if self._should_skip is None: return None + # Memoize so we don't log twice if self.always_run_if and self.always_run_if(): self._should_skip = False return None + if self.skip_if and self.skip_if(): + self._skip_reason = self.skip_if() + self._should_skip = True + return self._skip_reason if self._skip_reason: return self._skip_reason diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py index 23dc34b5f87ca..1c59f30872c3c 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py @@ -42,6 +42,16 @@ def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]: ], pytest_extra_cmds=k8s_extra_cmds, ), + PackageSpec( + "examples/experimental/dagster-dlift/kitchen-sink", + name="dbt-cloud-live-tests", + env_vars=[ + "KS_DBT_CLOUD_ACCOUNT_ID", + "KS_DBT_CLOUD_TOKEN", + "KS_DBT_CLOUD_ACCESS_URL", + "KS_DBT_CLOUD_DISCOVERY_API_URL", + ], + ), ] ) diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py index a93bd7f555483..19418533a3f25 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py @@ -13,6 +13,7 @@ has_dagster_airlift_changes, has_storage_test_fixture_changes, network_buildkite_container, + skip_if_not_dlift_commit, ) @@ -387,6 +388,19 @@ def k8s_extra_cmds(version: str, _) -> List[str]: "examples/experimental/dagster-dlift", name=":dbt: dlift", ), + # Runs against live dbt cloud instance, we only want to run on commits and on the + # nightly build + PackageSpec( + "examples/experimental/dagster-dlift/kitchen-sink", + skip_if=skip_if_not_dlift_commit, + name=":dbt: :sink: Dbt Cloud-Lift Kitchen Sink", + env_vars=[ + "KS_DBT_CLOUD_ACCOUNT_ID", + "KS_DBT_CLOUD_TOKEN", + "KS_DBT_CLOUD_ACCESS_URL", + "KS_DBT_CLOUD_DISCOVERY_API_URL", + ], + ), ] diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/utils.py b/.buildkite/dagster-buildkite/dagster_buildkite/utils.py index 9b1bd174d89b6..485cde291d063 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/utils.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/utils.py @@ -316,6 +316,14 @@ def has_storage_test_fixture_changes(): ) +def skip_if_not_dlift_commit(): + return ( + None + if any("dagster-dlift" in str(path) for path in ChangedFiles.all) + else "Not a dlift commit" + ) + + def skip_if_no_helm_changes(): if message_contains("NO_SKIP"): return None diff --git a/examples/experimental/dagster-dlift/dagster_dlift/cloud_instance.py b/examples/experimental/dagster-dlift/dagster_dlift/cloud_instance.py new file mode 100644 index 0000000000000..2c938185a22b5 --- /dev/null +++ b/examples/experimental/dagster-dlift/dagster_dlift/cloud_instance.py @@ -0,0 +1,80 @@ +from typing import Any, Mapping, Sequence + +import requests + +from dagster_dlift.gql_queries import VERIFICATION_QUERY + +ENVIRONMENTS_SUBPATH = "environments/" + + +class DbtCloudInstance: + def __init__( + self, + # Can be found on the Account Info page of dbt. + account_id: str, + # Can be either a personal token or a service token. + token: str, + # Can be found on the + access_url: str, + discovery_api_url: str, + ): + self.account_id = account_id + self.token = token + self.access_url = access_url + self.discovery_api_url = discovery_api_url + + def get_api_v2_url(self) -> str: + return f"{self.access_url}/api/v2/accounts/{self.account_id}" + + def get_discovery_api_url(self) -> str: + return f"{self.discovery_api_url}/graphql" + + def get_session(self) -> requests.Session: + session = requests.Session() + session.headers.update( + { + "Accept": "application/json", + "Authorization": f"Token {self.token}", + } + ) + return session + + def make_access_api_request(self, subpath: str) -> Mapping[str, Any]: + session = self.get_session() + return self.ensure_valid_response(session.get(f"{self.get_api_v2_url()}/{subpath}")).json() + + def ensure_valid_response(self, response: requests.Response) -> requests.Response: + if response.status_code != 200: + raise Exception(f"Request to DBT Cloud failed: {response.text}") + return response + + def make_discovery_api_query(self, query: str, variables: Mapping[str, Any]): + session = self.get_session() + return self.ensure_valid_response( + session.post( + self.get_discovery_api_url(), + json={"query": query, "variables": variables}, + ) + ).json() + + def list_environment_ids(self) -> Sequence[int]: + return [ + environment["id"] + for environment in self.make_access_api_request(ENVIRONMENTS_SUBPATH)["data"] + ] + + def verify_connections(self) -> None: + # Verifies connection to both the access and discovery APIs. + for environment_id in self.list_environment_ids(): + response = self.make_discovery_api_query( + VERIFICATION_QUERY, {"environmentId": environment_id} + ) + try: + if response["data"]["environment"]["__typename"] != "Environment": + raise Exception( + f"Failed to verify connection to environment {environment_id}. Response: {response}" + ) + except KeyError: + raise Exception( + f"Failed to verify connection to environment {environment_id}. Response: {response}" + ) diff --git a/examples/experimental/dagster-dlift/dagster_dlift/gql_queries.py b/examples/experimental/dagster-dlift/dagster_dlift/gql_queries.py new file mode 100644 index 0000000000000..4f78db4c06dde --- /dev/null +++ b/examples/experimental/dagster-dlift/dagster_dlift/gql_queries.py @@ -0,0 +1,7 @@ +VERIFICATION_QUERY = """ +query VerificationQuery($environmentId: BigInt!) { + environment(id: $environmentId) { + __typename + } +} +""" diff --git a/examples/experimental/dagster-dlift/dagster_dlift/test/instance_fake.py b/examples/experimental/dagster-dlift/dagster_dlift/test/instance_fake.py new file mode 100644 index 0000000000000..eb2799448a613 --- /dev/null +++ b/examples/experimental/dagster-dlift/dagster_dlift/test/instance_fake.py @@ -0,0 +1,46 @@ +from typing import Any, Mapping, NamedTuple + +from dagster_dlift.cloud_instance import DbtCloudInstance + + +class ExpectedDiscoveryApiRequest(NamedTuple): + query: str + variables: Mapping[str, Any] + + def __hash__(self) -> int: + return hash((self.query, frozenset(self.variables.items()))) + + +class ExpectedAccessApiRequest(NamedTuple): + subpath: str + + def __hash__(self) -> int: + return hash(self.subpath) + + +class DbtCloudInstanceFake(DbtCloudInstance): + """A version that allows users to fake API responses for testing purposes.""" + + def __init__( + self, + access_api_responses: Mapping[ExpectedAccessApiRequest, Any], + discovery_api_responses: Mapping[ExpectedDiscoveryApiRequest, Any], + ): + self.access_api_responses = access_api_responses + self.discovery_api_responses = discovery_api_responses + + def make_access_api_request(self, subpath: str) -> Mapping[str, Any]: + if ExpectedAccessApiRequest(subpath) not in self.access_api_responses: + raise Exception( + f"ExpectedAccessApiRequest({subpath}) not found in access_api_responses" + ) + return self.access_api_responses[ExpectedAccessApiRequest(subpath)] + + def make_discovery_api_query( + self, query: str, variables: Mapping[str, Any] + ) -> Mapping[str, Any]: + if ExpectedDiscoveryApiRequest(query, variables) not in self.discovery_api_responses: + raise Exception( + f"ExpectedDiscoveryApiRequest({query}, {variables}) not found in discovery_api_responses" + ) + return self.discovery_api_responses[ExpectedDiscoveryApiRequest(query, variables)] diff --git a/examples/experimental/dagster-dlift/dagster_dlift/test/utils.py b/examples/experimental/dagster-dlift/dagster_dlift/test/utils.py new file mode 100644 index 0000000000000..14e76a774846e --- /dev/null +++ b/examples/experimental/dagster-dlift/dagster_dlift/test/utils.py @@ -0,0 +1,8 @@ +import os + + +def get_env_var(var_name: str) -> str: + value = os.getenv(var_name) + if not value: + raise Exception(f"{var_name} is not set") + return value diff --git a/examples/experimental/dagster-dlift/dagster_dlift_tests/test_instance.py b/examples/experimental/dagster-dlift/dagster_dlift_tests/test_instance.py new file mode 100644 index 0000000000000..66b265aabd6ec --- /dev/null +++ b/examples/experimental/dagster-dlift/dagster_dlift_tests/test_instance.py @@ -0,0 +1,54 @@ +import pytest +from dagster_dlift.cloud_instance import ENVIRONMENTS_SUBPATH +from dagster_dlift.gql_queries import VERIFICATION_QUERY +from dagster_dlift.test.instance_fake import ( + DbtCloudInstanceFake, + ExpectedAccessApiRequest, + ExpectedDiscoveryApiRequest, +) + + +def test_verification() -> None: + """Test proper error states when we can't properly verify the instance.""" + # We get no response back from the discovery api + fake_instance = DbtCloudInstanceFake( + access_api_responses={ + ExpectedAccessApiRequest(subpath=ENVIRONMENTS_SUBPATH): {"data": [{"id": 1}]} + }, + discovery_api_responses={ + ExpectedDiscoveryApiRequest( + query=VERIFICATION_QUERY, variables={"environmentId": 1} + ): {} + }, + ) + + with pytest.raises(Exception, match="Failed to verify"): + fake_instance.verify_connections() + + # We get a response back from the discovery api, but it's not what we expect + fake_instance = DbtCloudInstanceFake( + access_api_responses={ + ExpectedAccessApiRequest(subpath=ENVIRONMENTS_SUBPATH): {"data": [{"id": 1}]} + }, + discovery_api_responses={ + ExpectedDiscoveryApiRequest(query=VERIFICATION_QUERY, variables={"environmentId": 1}): { + "data": {"environment": {"__typename": "NotEnvironment"}} + } + }, + ) + + with pytest.raises(Exception, match="Failed to verify"): + fake_instance.verify_connections() + + # Finally, we get a valid response back from the discovery api + fake_instance = DbtCloudInstanceFake( + access_api_responses={ + ExpectedAccessApiRequest(subpath=ENVIRONMENTS_SUBPATH): {"data": [{"id": 1}]} + }, + discovery_api_responses={ + ExpectedDiscoveryApiRequest(query=VERIFICATION_QUERY, variables={"environmentId": 1}): { + "data": {"environment": {"__typename": "Environment"}} + } + }, + ) + fake_instance.verify_connections() diff --git a/examples/experimental/dagster-dlift/dagster_dlift_tests/test_test_utils.py b/examples/experimental/dagster-dlift/dagster_dlift_tests/test_test_utils.py new file mode 100644 index 0000000000000..bc9c29034a3f0 --- /dev/null +++ b/examples/experimental/dagster-dlift/dagster_dlift_tests/test_test_utils.py @@ -0,0 +1,46 @@ +import pytest +from dagster._core.test_utils import environ +from dagster_dlift.cloud_instance import ENVIRONMENTS_SUBPATH +from dagster_dlift.gql_queries import VERIFICATION_QUERY +from dagster_dlift.test.instance_fake import ( + DbtCloudInstanceFake, + ExpectedAccessApiRequest, + ExpectedDiscoveryApiRequest, +) +from dagster_dlift.test.utils import get_env_var + + +def test_get_env_var() -> None: + """Test we can get an env var, and good error state for lack of env var.""" + with environ({"TEST_ENV_VAR": "test_value"}): + assert get_env_var("TEST_ENV_VAR") == "test_value" + + with pytest.raises(Exception, match="TEST_ENV_VAR"): + get_env_var("TEST_ENV_VAR") + + +def test_cloud_instance_fake() -> None: + """Test that cloud instance fake behaves properly when inducing queries.""" + fake_instance = DbtCloudInstanceFake( + access_api_responses={ + ExpectedAccessApiRequest(subpath=ENVIRONMENTS_SUBPATH): { + "data": {"environments": [{"id": 1}]} + } + }, + discovery_api_responses={ + ExpectedDiscoveryApiRequest(query=VERIFICATION_QUERY, variables={"environmentId": 1}): { + "data": {"environment": {"__typename": "Environment"}} + } + }, + ) + + assert fake_instance.make_access_api_request(ENVIRONMENTS_SUBPATH) == { + "data": {"environments": [{"id": 1}]} + } + assert fake_instance.make_discovery_api_query(VERIFICATION_QUERY, {"environmentId": 1}) == { + "data": {"environment": {"__typename": "Environment"}} + } + with pytest.raises(Exception, match="ExpectedAccessApiRequest"): + fake_instance.make_access_api_request("bad_subpath") + with pytest.raises(Exception, match="ExpectedDiscoveryApiRequest"): + fake_instance.make_discovery_api_query(VERIFICATION_QUERY, {"accountId": "bad"}) diff --git a/examples/experimental/dagster-dlift/kitchen-sink/.gitignore b/examples/experimental/dagster-dlift/kitchen-sink/.gitignore new file mode 100644 index 0000000000000..082a9f8cc048a --- /dev/null +++ b/examples/experimental/dagster-dlift/kitchen-sink/.gitignore @@ -0,0 +1,2 @@ +.airflow_home +.dagster_home \ No newline at end of file diff --git a/examples/experimental/dagster-dlift/kitchen-sink/Makefile b/examples/experimental/dagster-dlift/kitchen-sink/Makefile new file mode 100644 index 0000000000000..8906a45beab75 --- /dev/null +++ b/examples/experimental/dagster-dlift/kitchen-sink/Makefile @@ -0,0 +1,27 @@ +.PHONY: help + +define GET_MAKEFILE_DIR +$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))) | sed 's:/*$$::') +endef + +export MAKEFILE_DIR := $(GET_MAKEFILE_DIR) +export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home +export DAGSTER_URL := http://localhost:3333 + +help: + @egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' + +dev_install: + pip install uv && \ + uv pip install -e ../../../dagster-airlift + uv pip install -e . + +setup_local_env: + $(MAKE) wipe + mkdir -p $(DAGSTER_HOME) + +run_dagster: + dagster dev -m kitchen_sink_dlift.dagster_defs.defs -p 3333 + +wipe: ## Wipe out all the files created by the Makefile + rm -rf $(DAGSTER_HOME) \ No newline at end of file diff --git a/examples/experimental/dagster-dlift/kitchen-sink/README.md b/examples/experimental/dagster-dlift/kitchen-sink/README.md new file mode 100644 index 0000000000000..b487cb110fe50 --- /dev/null +++ b/examples/experimental/dagster-dlift/kitchen-sink/README.md @@ -0,0 +1,22 @@ +## Kitchen Sink + +This is designed to be a testbed for testing specific migration scenarios. + +First: + +```bash +make dev_install +make setup_local_env +``` + +Then in one shell: + +``` +make run_airflow +``` + +Then in another shell: + +``` +make run_dagster +``` diff --git a/examples/experimental/dagster-dlift/kitchen-sink/conftest.py b/examples/experimental/dagster-dlift/kitchen-sink/conftest.py new file mode 100644 index 0000000000000..948fa73a046e3 --- /dev/null +++ b/examples/experimental/dagster-dlift/kitchen-sink/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["dagster_dlift.test.shared_fixtures"] diff --git a/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink/__init__.py b/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink/instance.py b/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink/instance.py new file mode 100644 index 0000000000000..d0307ccbcaf52 --- /dev/null +++ b/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink/instance.py @@ -0,0 +1,11 @@ +from dagster_dlift.cloud_instance import DbtCloudInstance +from dagster_dlift.test.utils import get_env_var + + +def get_instance() -> DbtCloudInstance: + return DbtCloudInstance( + account_id=get_env_var("KS_DBT_CLOUD_ACCOUNT_ID"), + token=get_env_var("KS_DBT_CLOUD_TOKEN"), + access_url=get_env_var("KS_DBT_CLOUD_ACCESS_URL"), + discovery_api_url=get_env_var("KS_DBT_CLOUD_DISCOVERY_API_URL"), + ) diff --git a/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink_tests/__init__.py b/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink_tests/test_instance_from_env.py b/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink_tests/test_instance_from_env.py new file mode 100644 index 0000000000000..6505e5609f6a0 --- /dev/null +++ b/examples/experimental/dagster-dlift/kitchen-sink/dlift_kitchen_sink_tests/test_instance_from_env.py @@ -0,0 +1,10 @@ +from dagster_dlift.cloud_instance import DbtCloudInstance +from dlift_kitchen_sink.instance import get_instance + + +def test_cloud_instance() -> None: + """Test that we can create a DbtCloudInstance and verify connections successfully.""" + instance = get_instance() + assert isinstance(instance, DbtCloudInstance) + + instance.verify_connections() diff --git a/examples/experimental/dagster-dlift/kitchen-sink/pyproject.toml b/examples/experimental/dagster-dlift/kitchen-sink/pyproject.toml new file mode 100644 index 0000000000000..7fd26b970b848 --- /dev/null +++ b/examples/experimental/dagster-dlift/kitchen-sink/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" \ No newline at end of file diff --git a/examples/experimental/dagster-dlift/kitchen-sink/setup.py b/examples/experimental/dagster-dlift/kitchen-sink/setup.py new file mode 100644 index 0000000000000..c1b672067429b --- /dev/null +++ b/examples/experimental/dagster-dlift/kitchen-sink/setup.py @@ -0,0 +1,28 @@ +from pathlib import Path +from typing import Dict + +from setuptools import find_packages, setup + + +def get_version() -> str: + version: Dict[str, str] = {} + with open(Path(__file__).parent / ".." / "dagster_dlift/version.py", encoding="utf8") as fp: + exec(fp.read(), version) + + return version["__version__"] + + +ver = get_version() +# dont pin dev installs to avoid pip dep resolver issues +pin = "" if ver == "1!0+dev" else f"=={ver}" + +setup( + name="dlift-kitchen-sink", + packages=find_packages(), + install_requires=[ + f"dagster{pin}", + f"dagster-webserver{pin}", + "dagster-dlift", + ], + extras_require={"test": ["pytest"]}, +) diff --git a/examples/experimental/dagster-dlift/kitchen-sink/tox.ini b/examples/experimental/dagster-dlift/kitchen-sink/tox.ini new file mode 100644 index 0000000000000..5db1bd095aba2 --- /dev/null +++ b/examples/experimental/dagster-dlift/kitchen-sink/tox.ini @@ -0,0 +1,29 @@ +[tox] +skipsdist = true + +[testenv] +download = True +passenv = + CI_* + COVERALLS_REPO_TOKEN + BUILDKITE* + KS_DBT_CLOUD* +install_command = uv pip install {opts} {packages} +deps = + -e ../../../../python_modules/dagster[test] + -e ../../../../python_modules/dagster-webserver + -e ../../../../python_modules/dagster-test + -e ../../../../python_modules/dagster-pipes + -e ../../../../python_modules/dagster-graphql + -e ../../../../python_modules/libraries/dagster-dbt + -e ../../../../python_modules/libraries/dagster-pandas + -e ../../dagster-dlift + -e . +allowlist_externals = + /bin/bash + uv +commands = + # We need to rebuild the UI to ensure that the dagster-webserver can run + make -C ../../../.. rebuild_ui + !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' + pytest -c ../../../pyproject.toml ./dlift_kitchen_sink_tests --snapshot-warn-unused -vv {posargs} \ No newline at end of file