From adef348db594384dc872fbbf750b0a76040c8aad Mon Sep 17 00:00:00 2001 From: Ben Pankow Date: Wed, 20 Nov 2024 10:43:25 -0800 Subject: [PATCH] [dagster-sigma] Add dagster-sigma snapshot CLI, which allows persisting the contents of a Sigma workspace to a file (#25913) ## Summary Introduces a utility CLI for `dagster-sigma` which loads a code location containing one or more `SigmaOrganizationDefsLoader`s and serializes the repository load data to a file. Adds the capability for the Sling defs loader to ingest this snapshot from a file, allowing orgs with large amounts of Sigma data to ingest at CI time or manually vs each code location load. In the future, we expect to subsume this behavior into a framework level solution which doesn't tie loading external state into code location load. For now, this is a bit of a workaround. ```shell dagster-sigma snapshot --python-module my_dagster_package --save-to snapshot.snap ``` ```python ... specs = load_sigma_asset_specs(sigma_workspace, snapshot_path=Path(__file__).parent / "snapshot.snap") ... ``` ## How I Tested These Changes Unit tests. ## Changelog > [dagster-sigma] Introduced an experimental `dagster-sigma snapshot` command, allowing Sigma workspaces to be captured to a file for faster subsequent loading. --- .../dagster-sigma/dagster_sigma/cli.py | 61 +++++++++++++++++++ .../dagster-sigma/dagster_sigma/resource.py | 19 +++++- .../pending_repo_snapshot.py | 27 ++++++++ .../dagster_sigma_tests/test_asset_specs.py | 33 ++++++++++ .../libraries/dagster-sigma/setup.py | 5 ++ 5 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 python_modules/libraries/dagster-sigma/dagster_sigma/cli.py create mode 100644 python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_snapshot.py diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/cli.py b/python_modules/libraries/dagster-sigma/dagster_sigma/cli.py new file mode 100644 index 0000000000000..f234e51c6c991 --- /dev/null +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/cli.py @@ -0,0 +1,61 @@ +import click +from dagster import _check as check +from dagster._cli.workspace.cli_target import ( + get_repository_python_origin_from_kwargs, + python_origin_target_argument, +) +from dagster._core.definitions.definitions_load_context import ( + DefinitionsLoadContext, + DefinitionsLoadType, +) +from dagster._core.definitions.repository_definition.repository_definition import RepositoryLoadData +from dagster._serdes.utils import serialize_value +from dagster._utils.env import environ +from dagster._utils.hosted_user_process import recon_repository_from_origin +from dagster._utils.warnings import experimental_warning + +SNAPSHOT_ENV_VAR_NAME = "DAGSTER_SIGMA_IS_GENERATING_SNAPSHOT" +SIGMA_RECON_DATA_PREFIX = "sigma_" + + +@click.group(name="sigma") +def app(): + """Commands for working with the dagster-sigma integration.""" + + +@app.command(name="snapshot", help="Snapshot sigma instance data") +@python_origin_target_argument +@click.option("--output-path", "-o", help="Path to save the snapshot to", required=True) +def sigma_snapshot_command(**kwargs) -> None: + experimental_warning("The `dagster-sigma snapshot` command") + with environ({SNAPSHOT_ENV_VAR_NAME: "1"}): + DefinitionsLoadContext.set( + DefinitionsLoadContext( + load_type=DefinitionsLoadType.INITIALIZATION, repository_load_data=None + ) + ) + + repository_origin = get_repository_python_origin_from_kwargs(kwargs) + + pending_data = DefinitionsLoadContext.get().get_pending_reconstruction_metadata() + load_data = ( + RepositoryLoadData(reconstruction_metadata=pending_data) if pending_data else None + ) + recon_repo = recon_repository_from_origin(repository_origin) + repo_def = recon_repo.get_definition() + + load_data = load_data if pending_data else repo_def.repository_load_data + load_data = RepositoryLoadData( + reconstruction_metadata={ + k: v + for k, v in check.not_none(load_data).reconstruction_metadata.items() + if k.startswith(SIGMA_RECON_DATA_PREFIX) + } + ) + if not load_data.reconstruction_metadata: + raise click.UsageError("No Sigma data found in the repository") + click.echo(f"Saving {len(load_data.reconstruction_metadata)} cached Sigma data") + + output_path = kwargs["output_path"] + with open(output_path, "w") as file: + file.write(serialize_value(load_data)) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index 7ba1f092248ba..693e72dafa471 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -1,10 +1,12 @@ import asyncio import contextlib +import os import urllib.parse import warnings from collections import defaultdict from dataclasses import dataclass from enum import Enum +from pathlib import Path from typing import ( AbstractSet, Any, @@ -28,12 +30,15 @@ from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader +from dagster._core.definitions.repository_definition.repository_definition import RepositoryLoadData from dagster._record import IHaveNew, record_custom +from dagster._serdes.serdes import deserialize_value from dagster._utils.cached_method import cached_method from dagster._utils.log import get_dagster_logger from pydantic import Field, PrivateAttr from sqlglot import exp, parse_one +from dagster_sigma.cli import SIGMA_RECON_DATA_PREFIX, SNAPSHOT_ENV_VAR_NAME from dagster_sigma.translator import ( DagsterSigmaTranslator, SigmaDataset, @@ -532,6 +537,7 @@ def load_sigma_asset_specs( ] = DagsterSigmaTranslator, sigma_filter: Optional[SigmaFilter] = None, fetch_column_data: bool = True, + snapshot_path: Optional[Union[str, Path]] = None, ) -> Sequence[AssetSpec]: """Returns a list of AssetSpecs representing the Sigma content in the organization. @@ -541,10 +547,16 @@ def load_sigma_asset_specs( to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator. sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch. fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow. + snapshot_path (Optional[Union[str, Path]]): Path to a snapshot file to load Sigma data from, + rather than fetching it from the Sigma API. Returns: List[AssetSpec]: The set of assets representing the Sigma content in the organization. """ + snapshot = None + if snapshot_path and not os.getenv(SNAPSHOT_ENV_VAR_NAME): + snapshot = deserialize_value(Path(snapshot_path).read_text(), RepositoryLoadData) + with organization.process_config_and_initialize_cm() as initialized_organization: return check.is_list( SigmaOrganizationDefsLoader( @@ -552,6 +564,7 @@ def load_sigma_asset_specs( translator_cls=dagster_sigma_translator, sigma_filter=sigma_filter, fetch_column_data=fetch_column_data, + snapshot=snapshot, ) .build_defs() .assets, @@ -576,14 +589,18 @@ def _get_translator_spec_assert_keys_match( class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]): organization: SigmaOrganization translator_cls: Callable[[SigmaOrganizationData], DagsterSigmaTranslator] + snapshot: Optional[RepositoryLoadData] sigma_filter: Optional[SigmaFilter] = None fetch_column_data: bool = True @property def defs_key(self) -> str: - return f"sigma_{self.organization.client_id}" + return f"{SIGMA_RECON_DATA_PREFIX}{self.organization.client_id}" def fetch_state(self) -> SigmaOrganizationData: + if self.snapshot and self.defs_key in self.snapshot.reconstruction_metadata: + return deserialize_value(self.snapshot.reconstruction_metadata[self.defs_key]) # type: ignore + return asyncio.run( self.organization.build_organization_data( sigma_filter=self.sigma_filter, fetch_column_data=self.fetch_column_data diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_snapshot.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_snapshot.py new file mode 100644 index 0000000000000..f03e02f2dfc06 --- /dev/null +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_snapshot.py @@ -0,0 +1,27 @@ +import os + +from dagster import EnvVar, asset, define_asset_job +from dagster._core.definitions.definitions_class import Definitions +from dagster._utils.env import environ +from dagster_sigma import SigmaBaseUrl, SigmaOrganization, load_sigma_asset_specs + +fake_client_id = "fake_client_id" +fake_client_secret = "fake_client_secret" +snapshot_path = os.getenv("SIGMA_SNAPSHOT_PATH") or None + +with environ({"SIGMA_CLIENT_ID": fake_client_id, "SIGMA_CLIENT_SECRET": fake_client_secret}): + fake_token = "fake_token" + resource = SigmaOrganization( + base_url=SigmaBaseUrl.AWS_US, + client_id=EnvVar("SIGMA_CLIENT_ID"), + client_secret=EnvVar("SIGMA_CLIENT_SECRET"), + ) + + @asset + def my_materializable_asset(): + pass + + sigma_specs = load_sigma_asset_specs(resource, snapshot_path=snapshot_path) + defs = Definitions( + assets=[my_materializable_asset, *sigma_specs], jobs=[define_asset_job("all_asset_job")] + ) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_asset_specs.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_asset_specs.py index 6a1d0e18e1d2a..47e76cea6497f 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_asset_specs.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_asset_specs.py @@ -1,12 +1,45 @@ from pathlib import Path +from tempfile import TemporaryDirectory import responses +from click.testing import CliRunner from dagster._core.code_pointer import CodePointer from dagster._core.definitions.reconstruct import ( initialize_repository_def_from_pointer, reconstruct_repository_def_from_pointer, ) from dagster._core.instance_for_test import instance_for_test +from dagster._utils.env import environ + + +@responses.activate +def test_snapshot_cli_rehydrate(sigma_auth_token: str, sigma_sample_data: None) -> None: + with instance_for_test() as _instance, TemporaryDirectory() as temp_dir: + from dagster_sigma.cli import sigma_snapshot_command + + temp_file = Path(temp_dir) / "snapshot.snap" + out = CliRunner().invoke( + sigma_snapshot_command, + args=[ + "-f", + str(Path(__file__).parent / "pending_repo_snapshot.py"), + "--output-path", + str(temp_file), + ], + ) + assert out.exit_code == 0 + + calls = len(responses.calls) + # Ensure that we can reconstruct the repository from the snapshot without making any calls + with environ({"SIGMA_SNAPSHOT_PATH": str(temp_file)}): + repository_def = initialize_repository_def_from_pointer( + CodePointer.from_python_file( + str(Path(__file__).parent / "pending_repo_snapshot.py"), "defs", None + ), + ) + assert len(repository_def.assets_defs_by_key) == 2 + 1 + + assert len(responses.calls) == calls @responses.activate diff --git a/python_modules/libraries/dagster-sigma/setup.py b/python_modules/libraries/dagster-sigma/setup.py index 5371e8af045b0..b6d158a5f6f5c 100644 --- a/python_modules/libraries/dagster-sigma/setup.py +++ b/python_modules/libraries/dagster-sigma/setup.py @@ -39,5 +39,10 @@ def get_version() -> str: extras_require={"test": ["aioresponses", "aiohttp<3.11"]}, include_package_data=True, python_requires=">=3.9,<3.13", + entry_points={ + "console_scripts": [ + "dagster-sigma = dagster_sigma.cli:app", + ] + }, zip_safe=False, )