From fe90aeec51c4a2c98bd753bf7e787993d284e3a7 Mon Sep 17 00:00:00 2001 From: Ben Pankow Date: Wed, 13 Nov 2024 14:24:12 -0800 Subject: [PATCH] [dagster-sigma] Add option to skip fetching column-level data for workbooks to speed loading (#25905) ## Summary Adds an optional config option when loading Sigma data that allows users to skip loading column lineage. ## How I Tested These Changes New unit test. ## Changelog > `[dagster-sigma]` Add `skip_fetch_column_data` option to skip loading Sigma column lineage, which can speed up loading large instances. --- .../dagster-sigma/dagster_sigma/resource.py | 23 ++++++++-- .../dagster_sigma_tests/test_resource.py | 42 ++++++++++++++++--- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index f9bbb7af2f8e7..7287b937c3062 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -387,7 +387,7 @@ async def safe_fetch_lineage_for_element( @cached_method async def build_organization_data( - self, sigma_filter: Optional[SigmaFilter] + self, sigma_filter: Optional[SigmaFilter], fetch_column_data: bool ) -> SigmaOrganizationData: """Retrieves all workbooks and datasets in the Sigma organization and builds a SigmaOrganizationData object representing the organization's assets. 
@@ -415,7 +415,10 @@ async def build_organization_data( datasets: List[SigmaDataset] = [] deps_by_dataset_inode = await self._fetch_dataset_upstreams_by_inode() - columns_by_dataset_inode = await self._fetch_dataset_columns_by_inode() + + columns_by_dataset_inode = ( + await self._fetch_dataset_columns_by_inode() if fetch_column_data else {} + ) for dataset in await self._fetch_datasets(): inode = _inode_from_url(dataset["url"]) @@ -438,6 +441,7 @@ def build_defs( self, dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator, sigma_filter: Optional[SigmaFilter] = None, + fetch_column_data: bool = True, ) -> Definitions: """Returns a Definitions object representing the Sigma content in the organization. @@ -449,7 +453,9 @@ def build_defs( Definitions: The set of assets representing the Sigma content in the organization. """ return Definitions( - assets=load_sigma_asset_specs(self, dagster_sigma_translator, sigma_filter) + assets=load_sigma_asset_specs( + self, dagster_sigma_translator, sigma_filter, fetch_column_data + ) ) @@ -460,11 +466,16 @@ def load_sigma_asset_specs( [SigmaOrganizationData], DagsterSigmaTranslator ] = DagsterSigmaTranslator, sigma_filter: Optional[SigmaFilter] = None, + fetch_column_data: bool = True, ) -> Sequence[AssetSpec]: """Returns a list of AssetSpecs representing the Sigma content in the organization. Args: organization (SigmaOrganization): The Sigma organization to fetch assets from. + dagster_sigma_translator (Callable[[SigmaOrganizationData], DagsterSigmaTranslator]): The translator to use + to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator. + sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch. + fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow. Returns: List[AssetSpec]: The set of assets representing the Sigma content in the organization. 
@@ -475,6 +486,7 @@ def load_sigma_asset_specs( organization=initialized_organization, translator_cls=dagster_sigma_translator, sigma_filter=sigma_filter, + fetch_column_data=fetch_column_data, ) .build_defs() .assets, @@ -500,6 +512,7 @@ class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganization organization: SigmaOrganization translator_cls: Callable[[SigmaOrganizationData], DagsterSigmaTranslator] sigma_filter: Optional[SigmaFilter] = None + fetch_column_data: bool = True @property def defs_key(self) -> str: @@ -507,7 +520,9 @@ def defs_key(self) -> str: def fetch_state(self) -> SigmaOrganizationData: return asyncio.run( - self.organization.build_organization_data(sigma_filter=self.sigma_filter) + self.organization.build_organization_data( + sigma_filter=self.sigma_filter, fetch_column_data=self.fetch_column_data + ) ) def defs_from_state(self, state: SigmaOrganizationData) -> Definitions: diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py index aa5e0d1718015..b6c14d17663a3 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py @@ -75,7 +75,7 @@ def test_model_organization_data(sigma_auth_token: str, sigma_sample_data: None) client_secret=fake_client_secret, ) - data = asyncio.run(resource.build_organization_data(sigma_filter=None)) + data = asyncio.run(resource.build_organization_data(sigma_filter=None, fetch_column_data=True)) assert len(data.workbooks) == 1 assert data.workbooks[0].properties["name"] == "Sample Workbook" @@ -107,7 +107,8 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]) + sigma_filter=SigmaFilter(workbook_folders=[("My 
Documents", "Test Folder")]), + fetch_column_data=True, ) ) @@ -115,7 +116,8 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "My Subfolder")]) + sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "My Subfolder")]), + fetch_column_data=True, ) ) @@ -124,7 +126,8 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]) + sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]), + fetch_column_data=True, ) ) @@ -132,6 +135,31 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data assert data.workbooks[0].properties["name"] == "Sample Workbook" +@responses.activate +def test_model_organization_data_skip_fetch(sigma_auth_token: str, sigma_sample_data: None) -> None: + fake_client_id = uuid.uuid4().hex + fake_client_secret = uuid.uuid4().hex + + resource = SigmaOrganization( + base_url=SigmaBaseUrl.AWS_US, + client_id=fake_client_id, + client_secret=fake_client_secret, + ) + + data = asyncio.run( + resource.build_organization_data( + sigma_filter=None, + fetch_column_data=False, + ) + ) + + assert len(data.datasets) == 1 + assert data.workbooks[0].datasets == {_inode_from_url(data.datasets[0].properties["url"])} + + assert data.datasets[0].properties["name"] == "Orders Dataset" + assert data.datasets[0].columns == set() + + @responses.activate def test_model_organization_data_warn_err( sigma_auth_token: str, sigma_sample_data: None, lineage_warn: None @@ -147,7 +175,7 @@ def test_model_organization_data_warn_err( ) with pytest.raises(ClientResponseError): - asyncio.run(resource.build_organization_data(sigma_filter=None)) + asyncio.run(resource.build_organization_data(sigma_filter=None, fetch_column_data=True)) @responses.activate @@ 
-164,7 +192,9 @@ def test_model_organization_data_warn_no_err( warn_on_lineage_fetch_error=True, ) - data = asyncio.run(resource_warn.build_organization_data(sigma_filter=None)) + data = asyncio.run( + resource_warn.build_organization_data(sigma_filter=None, fetch_column_data=True) + ) assert len(data.workbooks) == 1 assert data.workbooks[0].properties["name"] == "Sample Workbook"