
Commit

[dagster-sigma] Add option to skip fetching column-level data for workbooks to speed loading (#25905)

## Summary

Adds an optional config option for loading Sigma data that allows users to skip fetching column lineage.

## How I Tested These Changes

New unit test.

## Changelog

> `[dagster-sigma]` Add `skip_fetch_column_data` option to skip loading
Sigma column lineage, which can speed up loading large instances.
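
For illustration, a minimal usage sketch (not part of this commit) of the new flag via `load_sigma_asset_specs`; the `EnvVar`-based credentials and environment-variable names are assumptions:

```python
from dagster import Definitions, EnvVar
from dagster_sigma import SigmaBaseUrl, SigmaOrganization, load_sigma_asset_specs

# Hypothetical credentials pulled from the environment.
sigma_organization = SigmaOrganization(
    base_url=SigmaBaseUrl.AWS_US,
    client_id=EnvVar("SIGMA_CLIENT_ID"),
    client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
)

# fetch_column_data=False skips the per-dataset column fetch, trading dataset
# column metadata for faster loading on large Sigma instances.
sigma_specs = load_sigma_asset_specs(sigma_organization, fetch_column_data=False)

defs = Definitions(assets=sigma_specs)
```

When the fetch is skipped, dataset column metadata is left empty, as exercised by the new test below.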
benpankow authored Nov 13, 2024
1 parent 3764fea commit fe90aee
Showing 2 changed files with 55 additions and 10 deletions.
23 changes: 19 additions & 4 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
@@ -387,7 +387,7 @@ async def safe_fetch_lineage_for_element(

    @cached_method
    async def build_organization_data(
-        self, sigma_filter: Optional[SigmaFilter]
+        self, sigma_filter: Optional[SigmaFilter], fetch_column_data: bool
    ) -> SigmaOrganizationData:
        """Retrieves all workbooks and datasets in the Sigma organization and builds a
        SigmaOrganizationData object representing the organization's assets.
@@ -415,7 +415,10 @@ async def build_organization_data(

        datasets: List[SigmaDataset] = []
        deps_by_dataset_inode = await self._fetch_dataset_upstreams_by_inode()
-        columns_by_dataset_inode = await self._fetch_dataset_columns_by_inode()
+
+        columns_by_dataset_inode = (
+            await self._fetch_dataset_columns_by_inode() if fetch_column_data else {}
+        )

        for dataset in await self._fetch_datasets():
            inode = _inode_from_url(dataset["url"])
@@ -438,6 +441,7 @@ def build_defs(
        self,
        dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator,
        sigma_filter: Optional[SigmaFilter] = None,
+        fetch_column_data: bool = True,
    ) -> Definitions:
        """Returns a Definitions object representing the Sigma content in the organization.
@@ -449,7 +453,9 @@ def build_defs(
            Definitions: The set of assets representing the Sigma content in the organization.
        """
        return Definitions(
-            assets=load_sigma_asset_specs(self, dagster_sigma_translator, sigma_filter)
+            assets=load_sigma_asset_specs(
+                self, dagster_sigma_translator, sigma_filter, fetch_column_data
+            )
        )


@@ -460,11 +466,16 @@ def load_sigma_asset_specs(
        [SigmaOrganizationData], DagsterSigmaTranslator
    ] = DagsterSigmaTranslator,
    sigma_filter: Optional[SigmaFilter] = None,
+    fetch_column_data: bool = True,
) -> Sequence[AssetSpec]:
    """Returns a list of AssetSpecs representing the Sigma content in the organization.

    Args:
        organization (SigmaOrganization): The Sigma organization to fetch assets from.
+        dagster_sigma_translator (Callable[[SigmaOrganizationData], DagsterSigmaTranslator]): The translator to use
+            to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator.
+        sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch.
+        fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow.

    Returns:
        List[AssetSpec]: The set of assets representing the Sigma content in the organization.
@@ -475,6 +486,7 @@ def load_sigma_asset_specs(
            organization=initialized_organization,
            translator_cls=dagster_sigma_translator,
            sigma_filter=sigma_filter,
+            fetch_column_data=fetch_column_data,
        )
        .build_defs()
        .assets,
@@ -500,14 +512,17 @@ class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganization
    organization: SigmaOrganization
    translator_cls: Callable[[SigmaOrganizationData], DagsterSigmaTranslator]
    sigma_filter: Optional[SigmaFilter] = None
+    fetch_column_data: bool = True

    @property
    def defs_key(self) -> str:
        return f"sigma_{self.organization.client_id}"

    def fetch_state(self) -> SigmaOrganizationData:
        return asyncio.run(
-            self.organization.build_organization_data(sigma_filter=self.sigma_filter)
+            self.organization.build_organization_data(
+                sigma_filter=self.sigma_filter, fetch_column_data=self.fetch_column_data
+            )
        )

    def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
Second changed file (dagster-sigma test module): 36 additions & 6 deletions.
@@ -75,7 +75,7 @@ def test_model_organization_data(sigma_auth_token: str, sigma_sample_data: None)
        client_secret=fake_client_secret,
    )

-    data = asyncio.run(resource.build_organization_data(sigma_filter=None))
+    data = asyncio.run(resource.build_organization_data(sigma_filter=None, fetch_column_data=True))

    assert len(data.workbooks) == 1
    assert data.workbooks[0].properties["name"] == "Sample Workbook"
@@ -107,15 +107,17 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data

    data = asyncio.run(
        resource.build_organization_data(
-            sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")])
+            sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]),
+            fetch_column_data=True,
        )
    )

    assert len(data.workbooks) == 0

    data = asyncio.run(
        resource.build_organization_data(
-            sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "My Subfolder")])
+            sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "My Subfolder")]),
+            fetch_column_data=True,
        )
    )

@@ -124,14 +126,40 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data

    data = asyncio.run(
        resource.build_organization_data(
-            sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)])
+            sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]),
+            fetch_column_data=True,
        )
    )

    assert len(data.workbooks) == 1
    assert data.workbooks[0].properties["name"] == "Sample Workbook"


+@responses.activate
+def test_model_organization_data_skip_fetch(sigma_auth_token: str, sigma_sample_data: None) -> None:
+    fake_client_id = uuid.uuid4().hex
+    fake_client_secret = uuid.uuid4().hex
+
+    resource = SigmaOrganization(
+        base_url=SigmaBaseUrl.AWS_US,
+        client_id=fake_client_id,
+        client_secret=fake_client_secret,
+    )
+
+    data = asyncio.run(
+        resource.build_organization_data(
+            sigma_filter=None,
+            fetch_column_data=False,
+        )
+    )
+
+    assert len(data.datasets) == 1
+    assert data.workbooks[0].datasets == {_inode_from_url(data.datasets[0].properties["url"])}
+
+    assert data.datasets[0].properties["name"] == "Orders Dataset"
+    assert data.datasets[0].columns == set()
+
+
@responses.activate
def test_model_organization_data_warn_err(
sigma_auth_token: str, sigma_sample_data: None, lineage_warn: None
@@ -147,7 +175,7 @@ def test_model_organization_data_warn_err(
    )

    with pytest.raises(ClientResponseError):
-        asyncio.run(resource.build_organization_data(sigma_filter=None))
+        asyncio.run(resource.build_organization_data(sigma_filter=None, fetch_column_data=True))


@responses.activate
@@ -164,7 +192,9 @@ def test_model_organization_data_warn_no_err(
        warn_on_lineage_fetch_error=True,
    )

-    data = asyncio.run(resource_warn.build_organization_data(sigma_filter=None))
+    data = asyncio.run(
+        resource_warn.build_organization_data(sigma_filter=None, fetch_column_data=True)
+    )

    assert len(data.workbooks) == 1
    assert data.workbooks[0].properties["name"] == "Sample Workbook"
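
As a companion to the tests above, a short sketch (not from this commit) of passing the same flag through `SigmaOrganization.build_defs`, combined with a `SigmaFilter`; the credentials, environment-variable names, and folder names here are illustrative assumptions:

```python
from dagster import EnvVar
from dagster_sigma import SigmaBaseUrl, SigmaFilter, SigmaOrganization

organization = SigmaOrganization(
    base_url=SigmaBaseUrl.AWS_US,
    client_id=EnvVar("SIGMA_CLIENT_ID"),  # hypothetical env var names
    client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
)

# build_defs forwards both arguments to build_organization_data, so this loads
# only the filtered workbooks and skips the column fetch entirely.
defs = organization.build_defs(
    sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "My Subfolder")]),
    fetch_column_data=False,
)
```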
