[dagster-sigma] avoid fetching lineage info for filtered workbooks (dagster-io#26146)

## Summary

Ensure we don't fetch lineage and schema information for workbooks that a
user has indicated they want to filter out; previously, even though we
wouldn't return these objects, we'd still perform some fetches for them.
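
For orientation, here is a minimal sketch of the filtering API this change affects, based on the calls visible in the test diff below. The credentials are placeholders, and `SigmaBaseUrl.AWS_US` is assumed as an example base URL:

```python
import asyncio

from dagster_sigma import SigmaBaseUrl, SigmaFilter, SigmaOrganization

# Placeholder credentials; substitute real Sigma API client values.
resource = SigmaOrganization(
    base_url=SigmaBaseUrl.AWS_US,
    client_id="fake_client_id",
    client_secret="fake_client_secret",
)

# Only workbooks under "My Documents/Test Folder" are kept. With this change,
# filtered-out workbooks no longer trigger lineage or column fetches either.
data = asyncio.run(
    resource.build_organization_data(
        sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]),
        fetch_column_data=True,
    )
)
```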

## Test Plan

Mock & check call count to make sure we only fetch workbook data when
appropriate.
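
The pattern used for this (visible in the test diff below) wraps the real method with `unittest.mock` so it still executes while every call is recorded. A minimal standalone sketch, with `Client` and `fetch` as illustrative stand-ins:

```python
from unittest import mock


class Client:
    def fetch(self, workbook_id: str) -> dict:
        # Imagine a real HTTP call here.
        return {"id": workbook_id}


client = Client()

# wraps= delegates to the real implementation while counting invocations.
with mock.patch.object(Client, "fetch", wraps=client.fetch) as mock_fetch:
    assert client.fetch("wb-1") == {"id": "wb-1"}
    assert len(mock_fetch.mock_calls) == 1
```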

## Changelog

> [dagster-sigma] Workbooks filtered using a `SigmaFilter` no longer fetch lineage information.
benpankow authored Nov 26, 2024
1 parent 85f9cd0 commit 395a8f3
Showing 2 changed files with 82 additions and 53 deletions.
47 changes: 28 additions & 19 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py

```diff
@@ -256,7 +256,9 @@ def try_except_http_warn(self, should_catch: bool, msg: str) -> Iterator[None]:
             raise
 
     @cached_method
-    async def _fetch_dataset_upstreams_by_inode(self) -> Mapping[str, AbstractSet[str]]:
+    async def _fetch_dataset_upstreams_by_inode(
+        self, sigma_filter: SigmaFilter
+    ) -> Mapping[str, AbstractSet[str]]:
         """Builds a mapping of dataset inodes to the upstream inputs they depend on.
         Sigma does not expose this information directly, so we have to infer it from
         the lineage of workbooks and the workbook queries.
@@ -265,7 +267,7 @@ async def _fetch_dataset_upstreams_by_inode(
 
         logger.debug("Fetching dataset dependencies")
 
-        raw_workbooks = await self._fetch_workbooks()
+        workbooks_to_fetch = await self._fetch_workbooks_and_filter(sigma_filter)
 
         async def process_workbook(workbook: Dict[str, Any]) -> None:
             logger.info("Inferring dataset dependencies for workbook %s", workbook["workbookId"])
@@ -325,19 +327,21 @@ async def build_deps_from_element(element: Dict[str, Any]) -> None:
                 ]
             )
 
-        await asyncio.gather(*[process_workbook(workbook) for workbook in raw_workbooks])
+        await asyncio.gather(*[process_workbook(workbook) for workbook in workbooks_to_fetch])
 
         return deps_by_dataset_inode
 
     @cached_method
-    async def _fetch_dataset_columns_by_inode(self) -> Mapping[str, AbstractSet[str]]:
+    async def _fetch_dataset_columns_by_inode(
+        self, sigma_filter: SigmaFilter
+    ) -> Mapping[str, AbstractSet[str]]:
         """Builds a mapping of dataset inodes to the columns they contain. Note that
         this is a partial list and will only include columns which are referenced in
         workbooks, since Sigma does not expose a direct API for querying dataset columns.
         """
         columns_by_dataset_inode = defaultdict(set)
 
-        workbooks = await self._fetch_workbooks()
+        workbooks_to_fetch = await self._fetch_workbooks_and_filter(sigma_filter)
 
         async def process_workbook(workbook: Dict[str, Any]) -> None:
             logger.info("Fetching column data from workbook %s", workbook["workbookId"])
@@ -370,7 +374,7 @@ async def process_workbook(workbook: Dict[str, Any]) -> None:
                     inode, column_name = split
                     columns_by_dataset_inode[inode].add(column_name)
 
-        await asyncio.gather(*[process_workbook(workbook) for workbook in workbooks])
+        await asyncio.gather(*[process_workbook(workbook) for workbook in workbooks_to_fetch])
 
         return columns_by_dataset_inode
 
@@ -435,20 +439,12 @@ async def safe_fetch_lineage_for_element(
         )
 
     @cached_method
-    async def build_organization_data(
-        self, sigma_filter: Optional[SigmaFilter], fetch_column_data: bool
-    ) -> SigmaOrganizationData:
-        """Retrieves all workbooks and datasets in the Sigma organization and builds a
-        SigmaOrganizationData object representing the organization's assets.
-        """
-        _sigma_filter = sigma_filter or SigmaFilter()
-
-        logger.info("Beginning Sigma organization data fetch")
+    async def _fetch_workbooks_and_filter(self, sigma_filter: SigmaFilter) -> List[Dict[str, Any]]:
         raw_workbooks = await self._fetch_workbooks()
         workbooks_to_fetch = []
-        if _sigma_filter.workbook_folders:
+        if sigma_filter.workbook_folders:
             workbook_filter_strings = [
-                "/".join(folder).lower() for folder in _sigma_filter.workbook_folders
+                "/".join(folder).lower() for folder in sigma_filter.workbook_folders
             ]
             for workbook in raw_workbooks:
                 workbook_path = str(workbook["path"]).lower()
@@ -458,16 +454,29 @@ async def build_organization_data(
                     workbooks_to_fetch.append(workbook)
         else:
             workbooks_to_fetch = raw_workbooks
+        return workbooks_to_fetch
+
+    @cached_method
+    async def build_organization_data(
+        self, sigma_filter: Optional[SigmaFilter], fetch_column_data: bool
+    ) -> SigmaOrganizationData:
+        """Retrieves all workbooks and datasets in the Sigma organization and builds a
+        SigmaOrganizationData object representing the organization's assets.
+        """
+        _sigma_filter = sigma_filter or SigmaFilter()
+
+        logger.info("Beginning Sigma organization data fetch")
+        workbooks_to_fetch = await self._fetch_workbooks_and_filter(_sigma_filter)
 
         workbooks: List[SigmaWorkbook] = await asyncio.gather(
             *[self.load_workbook_data(workbook) for workbook in workbooks_to_fetch]
         )
 
         datasets: List[SigmaDataset] = []
-        deps_by_dataset_inode = await self._fetch_dataset_upstreams_by_inode()
+        deps_by_dataset_inode = await self._fetch_dataset_upstreams_by_inode(_sigma_filter)
 
         columns_by_dataset_inode = (
-            await self._fetch_dataset_columns_by_inode() if fetch_column_data else {}
+            await self._fetch_dataset_columns_by_inode(_sigma_filter) if fetch_column_data else {}
         )
 
         used_datasets = set()
```
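
Each of the fetch helpers above is decorated with `@cached_method`, so calling `_fetch_workbooks_and_filter` from `build_organization_data` and again from the lineage and column helpers does not refetch: results are memoized per argument set. Below is a rough, self-contained sketch of that caching behavior; the decorator here is a simplified stand-in written for illustration, not dagster's actual implementation.

```python
import asyncio
from functools import wraps


def cached_method(fn):
    """Memoize an async method per (instance, args); a simplified stand-in
    for the @cached_method decorator used in the diff above."""
    cache: dict = {}

    @wraps(fn)
    async def wrapper(self, *args):
        key = (id(self), args)
        if key not in cache:
            cache[key] = await fn(self, *args)
        return cache[key]

    return wrapper


class FakeOrganization:
    """Illustrative only; not the real SigmaOrganization."""

    fetch_count = 0

    @cached_method
    async def fetch_workbooks_and_filter(self, folder: str):
        FakeOrganization.fetch_count += 1  # Stands in for the expensive API call.
        return [f"workbook under {folder}"]


async def main() -> None:
    org = FakeOrganization()
    # Three call sites (workbook data, lineage, columns) share one fetch.
    for _ in range(3):
        await org.fetch_workbooks_and_filter("My Documents")
    assert FakeOrganization.fetch_count == 1


asyncio.run(main())
```
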
88 changes: 54 additions & 34 deletions

```diff
@@ -1,6 +1,7 @@
 import asyncio
 import json
 import uuid
+from unittest import mock
 
 import pytest
 import responses
@@ -105,47 +106,66 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data
         client_secret=fake_client_secret,
     )
 
-    data = asyncio.run(
-        resource.build_organization_data(
-            sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]),
-            fetch_column_data=True,
-        )
-    )
-    assert len(data.workbooks) == 0
-    assert len(data.datasets) == 1
-    data = asyncio.run(
-        resource.build_organization_data(
-            sigma_filter=SigmaFilter(
-                workbook_folders=[("My Documents", "Test Folder")], include_unused_datasets=False
-            ),
-            fetch_column_data=True,
-        )
-    )
-    assert len(data.workbooks) == 0
-    assert len(data.datasets) == 0
-
-    data = asyncio.run(
-        resource.build_organization_data(
-            sigma_filter=SigmaFilter(
-                workbook_folders=[("My Documents", "My Subfolder")], include_unused_datasets=False
-            ),
-            fetch_column_data=True,
-        )
-    )
-
-    assert len(data.workbooks) == 1
-    assert len(data.datasets) == 1
-    assert data.workbooks[0].properties["name"] == "Sample Workbook"
-
-    data = asyncio.run(
-        resource.build_organization_data(
-            sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]),
-            fetch_column_data=True,
-        )
-    )
-
-    assert len(data.workbooks) == 1
-    assert data.workbooks[0].properties["name"] == "Sample Workbook"
+    with mock.patch.object(
+        SigmaOrganization,
+        "_fetch_pages_for_workbook",
+        wraps=resource._fetch_pages_for_workbook,  # noqa: SLF001
+    ) as mock_fetch_pages:
+        data = asyncio.run(
+            resource.build_organization_data(
+                sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]),
+                fetch_column_data=True,
+            )
+        )
+        assert len(data.workbooks) == 0
+        assert len(data.datasets) == 1
+
+        # We don't fetch the workbooks, so we shouldn't have made any calls
+        assert len(mock_fetch_pages.mock_calls) == 0
+
+        data = asyncio.run(
+            resource.build_organization_data(
+                sigma_filter=SigmaFilter(
+                    workbook_folders=[("My Documents", "Test Folder")],
+                    include_unused_datasets=False,
+                ),
+                fetch_column_data=True,
+            )
+        )
+        assert len(data.workbooks) == 0
+        assert len(data.datasets) == 0
+
+        # We still don't fetch the workbooks, so we shouldn't have made any calls
+        assert len(mock_fetch_pages.mock_calls) == 0
+
+        data = asyncio.run(
+            resource.build_organization_data(
+                sigma_filter=SigmaFilter(
+                    workbook_folders=[("My Documents", "My Subfolder")],
+                    include_unused_datasets=False,
+                ),
+                fetch_column_data=True,
+            )
+        )
+
+        assert len(data.workbooks) == 1
+        assert len(data.datasets) == 1
+        assert data.workbooks[0].properties["name"] == "Sample Workbook"
+
+        # We fetch the workbook thrice, once for generating the workbook object,
+        # once for fetching column data, and once for fetching lineage data
+        # (the results are cached, so we don't actually make three calls out to the API)
+        assert len(mock_fetch_pages.mock_calls) == 3
+
+        data = asyncio.run(
+            resource.build_organization_data(
+                sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]),
+                fetch_column_data=True,
+            )
+        )
+
+        assert len(data.workbooks) == 1
+        assert data.workbooks[0].properties["name"] == "Sample Workbook"
 
 
 @responses.activate
```
