# [dagster-sigma] Add filter option to hide unused datasets (#25988)
## Summary

Adds an optional config field to `SigmaFilter` for hiding datasets that are
not used by any workbook.

## How I Tested These Changes

Updated the existing unit test to cover the new option.

## Changelog

> [dagster-sigma] Added an `include_unused_datasets` field to `SigmaFilter`
to disable pulling datasets that aren't used by a downstream workbook.
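
For reference, here is a minimal end-to-end sketch of the new flag, modeled on the updated test below; the `SigmaOrganization` constructor values are placeholders, and the exact configuration arguments are assumptions rather than part of this change:

```python
import asyncio

from dagster_sigma import SigmaFilter, SigmaOrganization

# Placeholder credentials -- illustrative only, not part of this commit.
resource = SigmaOrganization(
    base_url="https://aws-api.sigmacomputing.com",
    client_id="<client_id>",
    client_secret="<client_secret>",
)

# With include_unused_datasets=False, datasets not referenced by any
# fetched workbook are skipped when building organization data.
data = asyncio.run(
    resource.build_organization_data(
        sigma_filter=SigmaFilter(
            workbook_folders=[("My Documents", "My Subfolder")],
            include_unused_datasets=False,
        ),
        fetch_column_data=True,
    )
)
```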
benpankow authored Nov 18, 2024
1 parent 8ffbc7f commit f43a95f
Showing 2 changed files with 39 additions and 10 deletions.
**python_modules/libraries/dagster-sigma/dagster_sigma/resource.py** (24 additions, 8 deletions)
```diff
@@ -55,13 +55,22 @@ class SigmaFilter(IHaveNew):
         workbook_folders (Optional[Sequence[Sequence[str]]]): A list of folder paths to fetch workbooks from.
             Each folder path is a list of folder names, starting from the root folder. All workbooks
             contained in the specified folders will be fetched. If not provided, all workbooks will be fetched.
+        include_unused_datasets (bool): Whether to include datasets that are not used in any workbooks.
+            Defaults to True.
     """
 
     workbook_folders: Optional[Sequence[Sequence[str]]] = None
+    include_unused_datasets: bool = True
 
-    def __new__(cls, workbook_folders: Optional[Sequence[Sequence[str]]] = None):
+    def __new__(
+        cls,
+        workbook_folders: Optional[Sequence[Sequence[str]]] = None,
+        include_unused_datasets: bool = True,
+    ):
         return super().__new__(
-            cls, workbook_folders=tuple([tuple(folder) for folder in workbook_folders or []])
+            cls,
+            workbook_folders=tuple([tuple(folder) for folder in workbook_folders or []]),
+            include_unused_datasets=include_unused_datasets,
         )
```
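
`__new__` normalizes `workbook_folders` into nested tuples, which keeps the filter record immutable and hashable. A quick sketch of the resulting behavior (assuming `SigmaFilter` is importable from the package root):

```python
from dagster_sigma import SigmaFilter

# Lists passed in by the caller...
sf = SigmaFilter(
    workbook_folders=[["My Documents", "Test Folder"]],
    include_unused_datasets=False,
)

# ...are converted to nested tuples by __new__, per the diff above.
assert sf.workbook_folders == (("My Documents", "Test Folder"),)
assert sf.include_unused_datasets is False
```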


```diff
@@ -444,16 +453,23 @@ async def build_organization_data(
             await self._fetch_dataset_columns_by_inode() if fetch_column_data else {}
         )
 
+        used_datasets = None
+        if _sigma_filter and not _sigma_filter.include_unused_datasets:
+            used_datasets = set()
+            for workbook in workbooks:
+                used_datasets.update(workbook.datasets)
+
         logger.info("Fetching dataset data")
         for dataset in await self._fetch_datasets():
             inode = _inode_from_url(dataset["url"])
-            datasets.append(
-                SigmaDataset(
-                    properties=dataset,
-                    columns=columns_by_dataset_inode.get(inode, set()),
-                    inputs=deps_by_dataset_inode[inode],
+            if used_datasets is None or inode in used_datasets:
+                datasets.append(
+                    SigmaDataset(
+                        properties=dataset,
+                        columns=columns_by_dataset_inode.get(inode, set()),
+                        inputs=deps_by_dataset_inode[inode],
+                    )
                 )
-            )
 
         return SigmaOrganizationData(workbooks=workbooks, datasets=datasets)
```
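
The gating above boils down to a set-membership check: `used_datasets` stays `None` when the filter is inactive (fetch everything), and otherwise holds the inodes referenced by at least one fetched workbook. A self-contained sketch of the pattern, with illustrative names that are not the library's API:

```python
from typing import Mapping, Optional, Sequence

def keep_used_datasets(
    datasets: Sequence[Mapping[str, str]],
    used_inodes: Optional[set],
) -> list:
    # None means "no filtering requested": keep every dataset.
    if used_inodes is None:
        return list(datasets)
    # Otherwise keep only datasets referenced by some workbook.
    return [d for d in datasets if d["inode"] in used_inodes]

all_datasets = [{"inode": "ds-1"}, {"inode": "ds-2"}]
assert keep_used_datasets(all_datasets, {"ds-1"}) == [{"inode": "ds-1"}]
assert keep_used_datasets(all_datasets, None) == all_datasets
```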

```diff
@@ -111,17 +111,30 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data
             fetch_column_data=True,
         )
     )
 
     assert len(data.workbooks) == 0
     assert len(data.datasets) == 1
+    data = asyncio.run(
+        resource.build_organization_data(
+            sigma_filter=SigmaFilter(
+                workbook_folders=[("My Documents", "Test Folder")], include_unused_datasets=False
+            ),
+            fetch_column_data=True,
+        )
+    )
+    assert len(data.workbooks) == 0
+    assert len(data.datasets) == 0
+
     data = asyncio.run(
         resource.build_organization_data(
-            sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "My Subfolder")]),
+            sigma_filter=SigmaFilter(
+                workbook_folders=[("My Documents", "My Subfolder")], include_unused_datasets=False
+            ),
             fetch_column_data=True,
         )
     )
 
     assert len(data.workbooks) == 1
     assert len(data.datasets) == 1
     assert data.workbooks[0].properties["name"] == "Sample Workbook"
 
     data = asyncio.run(
```
