
Commit

[dagster-sigma] Add dagster-sigma snapshot CLI, which allows persisting the contents of a Sigma workspace to a file (#25913)

## Summary

Introduces a utility CLI for `dagster-sigma` which loads a code location
containing one or more `SigmaOrganizationDefsLoader`s and serializes the
repository load data to a file. Adds the capability for the Sigma defs
loader to ingest this snapshot from a file, allowing orgs with large
amounts of Sigma data to ingest it at CI time or manually rather than on
every code location load.

In the future, we expect to subsume this behavior into a framework-level
solution which doesn't tie loading external state to code location load.
For now, this is a bit of a workaround.

```shell
dagster-sigma snapshot --module-name my_dagster_package --output-path snapshot.snap
```

```python
...
specs = load_sigma_asset_specs(sigma_workspace, snapshot_path=Path(__file__).parent / "snapshot.snap")
...
```
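
As an aside, a hedged sketch of how a code location might opt into the snapshot only when the file is actually present, so local development without a generated snapshot still loads live from the Sigma API. The resource construction and the `snapshot.snap` file name below are assumptions for illustration, not part of this change:

```python
from pathlib import Path

from dagster import EnvVar
from dagster_sigma import SigmaBaseUrl, SigmaOrganization, load_sigma_asset_specs

sigma_workspace = SigmaOrganization(
    base_url=SigmaBaseUrl.AWS_US,
    client_id=EnvVar("SIGMA_CLIENT_ID"),
    client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
)

# Only pass the snapshot if it has been generated; otherwise fall back to the live API.
snapshot_file = Path(__file__).parent / "snapshot.snap"
specs = load_sigma_asset_specs(
    sigma_workspace,
    snapshot_path=snapshot_file if snapshot_file.exists() else None,
)
```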

## How I Tested These Changes

Unit tests.

## Changelog

> [dagster-sigma] Introduced an experimental `dagster-sigma snapshot`
command, allowing Sigma workspaces to be captured to a file for faster
subsequent loading.
benpankow authored Nov 20, 2024
1 parent 026d5cd commit adef348
Showing 5 changed files with 144 additions and 1 deletion.
61 changes: 61 additions & 0 deletions python_modules/libraries/dagster-sigma/dagster_sigma/cli.py
@@ -0,0 +1,61 @@
import click
from dagster import _check as check
from dagster._cli.workspace.cli_target import (
    get_repository_python_origin_from_kwargs,
    python_origin_target_argument,
)
from dagster._core.definitions.definitions_load_context import (
    DefinitionsLoadContext,
    DefinitionsLoadType,
)
from dagster._core.definitions.repository_definition.repository_definition import RepositoryLoadData
from dagster._serdes.utils import serialize_value
from dagster._utils.env import environ
from dagster._utils.hosted_user_process import recon_repository_from_origin
from dagster._utils.warnings import experimental_warning

SNAPSHOT_ENV_VAR_NAME = "DAGSTER_SIGMA_IS_GENERATING_SNAPSHOT"
SIGMA_RECON_DATA_PREFIX = "sigma_"


@click.group(name="sigma")
def app():
    """Commands for working with the dagster-sigma integration."""


@app.command(name="snapshot", help="Snapshot sigma instance data")
@python_origin_target_argument
@click.option("--output-path", "-o", help="Path to save the snapshot to", required=True)
def sigma_snapshot_command(**kwargs) -> None:
    experimental_warning("The `dagster-sigma snapshot` command")
    with environ({SNAPSHOT_ENV_VAR_NAME: "1"}):
        DefinitionsLoadContext.set(
            DefinitionsLoadContext(
                load_type=DefinitionsLoadType.INITIALIZATION, repository_load_data=None
            )
        )

        repository_origin = get_repository_python_origin_from_kwargs(kwargs)

        pending_data = DefinitionsLoadContext.get().get_pending_reconstruction_metadata()
        load_data = (
            RepositoryLoadData(reconstruction_metadata=pending_data) if pending_data else None
        )
        recon_repo = recon_repository_from_origin(repository_origin)
        repo_def = recon_repo.get_definition()

        load_data = load_data if pending_data else repo_def.repository_load_data
        load_data = RepositoryLoadData(
            reconstruction_metadata={
                k: v
                for k, v in check.not_none(load_data).reconstruction_metadata.items()
                if k.startswith(SIGMA_RECON_DATA_PREFIX)
            }
        )
        if not load_data.reconstruction_metadata:
            raise click.UsageError("No Sigma data found in the repository")
        click.echo(f"Saving {len(load_data.reconstruction_metadata)} cached Sigma data")

        output_path = kwargs["output_path"]
        with open(output_path, "w") as file:
            file.write(serialize_value(load_data))
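
Not part of this commit, but for clarity: a minimal sketch of inspecting a snapshot file produced by the command above, assuming a local `snapshot.snap` exists. It mirrors the `deserialize_value` call used by the defs loader below.

```python
from pathlib import Path

from dagster._core.definitions.repository_definition.repository_definition import (
    RepositoryLoadData,
)
from dagster._serdes.serdes import deserialize_value

# The snapshot is serialized RepositoryLoadData filtered to keys with the "sigma_" prefix.
load_data = deserialize_value(Path("snapshot.snap").read_text(), RepositoryLoadData)
for key in load_data.reconstruction_metadata:
    print(key)
```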
19 changes: 18 additions & 1 deletion python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
@@ -1,10 +1,12 @@
import asyncio
import contextlib
import os
import urllib.parse
import warnings
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import (
    AbstractSet,
    Any,
@@ -28,12 +30,15 @@
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.repository_definition.repository_definition import RepositoryLoadData
from dagster._record import IHaveNew, record_custom
from dagster._serdes.serdes import deserialize_value
from dagster._utils.cached_method import cached_method
from dagster._utils.log import get_dagster_logger
from pydantic import Field, PrivateAttr
from sqlglot import exp, parse_one

from dagster_sigma.cli import SIGMA_RECON_DATA_PREFIX, SNAPSHOT_ENV_VAR_NAME
from dagster_sigma.translator import (
    DagsterSigmaTranslator,
    SigmaDataset,
@@ -532,6 +537,7 @@ def load_sigma_asset_specs(
    ] = DagsterSigmaTranslator,
    sigma_filter: Optional[SigmaFilter] = None,
    fetch_column_data: bool = True,
    snapshot_path: Optional[Union[str, Path]] = None,
) -> Sequence[AssetSpec]:
    """Returns a list of AssetSpecs representing the Sigma content in the organization.
@@ -541,17 +547,24 @@
            to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator.
        sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch.
        fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow.
        snapshot_path (Optional[Union[str, Path]]): Path to a snapshot file to load Sigma data from,
            rather than fetching it from the Sigma API.

    Returns:
        List[AssetSpec]: The set of assets representing the Sigma content in the organization.
    """
    snapshot = None
    if snapshot_path and not os.getenv(SNAPSHOT_ENV_VAR_NAME):
        snapshot = deserialize_value(Path(snapshot_path).read_text(), RepositoryLoadData)

    with organization.process_config_and_initialize_cm() as initialized_organization:
        return check.is_list(
            SigmaOrganizationDefsLoader(
                organization=initialized_organization,
                translator_cls=dagster_sigma_translator,
                sigma_filter=sigma_filter,
                fetch_column_data=fetch_column_data,
                snapshot=snapshot,
            )
            .build_defs()
            .assets,
@@ -576,14 +589,18 @@ def _get_translator_spec_assert_keys_match(
class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]):
    organization: SigmaOrganization
    translator_cls: Callable[[SigmaOrganizationData], DagsterSigmaTranslator]
    snapshot: Optional[RepositoryLoadData]
    sigma_filter: Optional[SigmaFilter] = None
    fetch_column_data: bool = True

    @property
    def defs_key(self) -> str:
        return f"sigma_{self.organization.client_id}"
        return f"{SIGMA_RECON_DATA_PREFIX}{self.organization.client_id}"

    def fetch_state(self) -> SigmaOrganizationData:
        if self.snapshot and self.defs_key in self.snapshot.reconstruction_metadata:
            return deserialize_value(self.snapshot.reconstruction_metadata[self.defs_key])  # type: ignore

        return asyncio.run(
            self.organization.build_organization_data(
                sigma_filter=self.sigma_filter, fetch_column_data=self.fetch_column_data
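To make the key scheme concrete (a sketch, not code from this commit): the CLI keeps only reconstruction-metadata entries whose keys start with `sigma_`, and each loader looks its own cached state up under `sigma_<client_id>`, so the filter and the lookup line up.

```python
from dagster_sigma.cli import SIGMA_RECON_DATA_PREFIX

# Hypothetical illustration of the key a SigmaOrganizationDefsLoader uses to find its
# cached state in the snapshot's reconstruction_metadata mapping.
client_id = "my_sigma_client_id"  # assumed value for illustration
defs_key = f"{SIGMA_RECON_DATA_PREFIX}{client_id}"
assert defs_key == "sigma_my_sigma_client_id"
```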
27 changes: 27 additions & 0 deletions python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_snapshot.py
@@ -0,0 +1,27 @@
import os

from dagster import EnvVar, asset, define_asset_job
from dagster._core.definitions.definitions_class import Definitions
from dagster._utils.env import environ
from dagster_sigma import SigmaBaseUrl, SigmaOrganization, load_sigma_asset_specs

fake_client_id = "fake_client_id"
fake_client_secret = "fake_client_secret"
snapshot_path = os.getenv("SIGMA_SNAPSHOT_PATH") or None

with environ({"SIGMA_CLIENT_ID": fake_client_id, "SIGMA_CLIENT_SECRET": fake_client_secret}):
    fake_token = "fake_token"
    resource = SigmaOrganization(
        base_url=SigmaBaseUrl.AWS_US,
        client_id=EnvVar("SIGMA_CLIENT_ID"),
        client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
    )

    @asset
    def my_materializable_asset():
        pass

    sigma_specs = load_sigma_asset_specs(resource, snapshot_path=snapshot_path)
    defs = Definitions(
        assets=[my_materializable_asset, *sigma_specs], jobs=[define_asset_job("all_asset_job")]
    )
33 changes: 33 additions & 0 deletions (dagster-sigma tests)
@@ -1,12 +1,45 @@
from pathlib import Path
from tempfile import TemporaryDirectory

import responses
from click.testing import CliRunner
from dagster._core.code_pointer import CodePointer
from dagster._core.definitions.reconstruct import (
    initialize_repository_def_from_pointer,
    reconstruct_repository_def_from_pointer,
)
from dagster._core.instance_for_test import instance_for_test
from dagster._utils.env import environ


@responses.activate
def test_snapshot_cli_rehydrate(sigma_auth_token: str, sigma_sample_data: None) -> None:
    with instance_for_test() as _instance, TemporaryDirectory() as temp_dir:
        from dagster_sigma.cli import sigma_snapshot_command

        temp_file = Path(temp_dir) / "snapshot.snap"
        out = CliRunner().invoke(
            sigma_snapshot_command,
            args=[
                "-f",
                str(Path(__file__).parent / "pending_repo_snapshot.py"),
                "--output-path",
                str(temp_file),
            ],
        )
        assert out.exit_code == 0

        calls = len(responses.calls)
        # Ensure that we can reconstruct the repository from the snapshot without making any calls
        with environ({"SIGMA_SNAPSHOT_PATH": str(temp_file)}):
            repository_def = initialize_repository_def_from_pointer(
                CodePointer.from_python_file(
                    str(Path(__file__).parent / "pending_repo_snapshot.py"), "defs", None
                ),
            )
            assert len(repository_def.assets_defs_by_key) == 2 + 1

        assert len(responses.calls) == calls


@responses.activate
5 changes: 5 additions & 0 deletions python_modules/libraries/dagster-sigma/setup.py
@@ -39,5 +39,10 @@ def get_version() -> str:
    extras_require={"test": ["aioresponses", "aiohttp<3.11"]},
    include_package_data=True,
    python_requires=">=3.9,<3.13",
    entry_points={
        "console_scripts": [
            "dagster-sigma = dagster_sigma.cli:app",
        ]
    },
    zip_safe=False,
)
