diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index f9bbb7af2f8e7..7287b937c3062 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -387,7 +387,7 @@ async def safe_fetch_lineage_for_element( @cached_method async def build_organization_data( - self, sigma_filter: Optional[SigmaFilter] + self, sigma_filter: Optional[SigmaFilter], fetch_column_data: bool ) -> SigmaOrganizationData: """Retrieves all workbooks and datasets in the Sigma organization and builds a SigmaOrganizationData object representing the organization's assets. @@ -415,7 +415,10 @@ async def build_organization_data( datasets: List[SigmaDataset] = [] deps_by_dataset_inode = await self._fetch_dataset_upstreams_by_inode() - columns_by_dataset_inode = await self._fetch_dataset_columns_by_inode() + + columns_by_dataset_inode = ( + await self._fetch_dataset_columns_by_inode() if fetch_column_data else {} + ) for dataset in await self._fetch_datasets(): inode = _inode_from_url(dataset["url"]) @@ -438,6 +441,7 @@ def build_defs( self, dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator, sigma_filter: Optional[SigmaFilter] = None, + fetch_column_data: bool = True, ) -> Definitions: """Returns a Definitions object representing the Sigma content in the organization. @@ -449,7 +453,9 @@ def build_defs( Definitions: The set of assets representing the Sigma content in the organization. """ return Definitions( - assets=load_sigma_asset_specs(self, dagster_sigma_translator, sigma_filter) + assets=load_sigma_asset_specs( + self, dagster_sigma_translator, sigma_filter, fetch_column_data + ) ) @@ -460,11 +466,16 @@ def load_sigma_asset_specs( [SigmaOrganizationData], DagsterSigmaTranslator ] = DagsterSigmaTranslator, sigma_filter: Optional[SigmaFilter] = None, + fetch_column_data: bool = True, ) -> Sequence[AssetSpec]: """Returns a list of AssetSpecs representing the Sigma content in the organization. Args: organization (SigmaOrganization): The Sigma organization to fetch assets from. + dagster_sigma_translator (Callable[[SigmaOrganizationData], DagsterSigmaTranslator]): The translator to use + to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator. + sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch. + fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow. Returns: List[AssetSpec]: The set of assets representing the Sigma content in the organization. @@ -475,6 +486,7 @@ def load_sigma_asset_specs( organization=initialized_organization, translator_cls=dagster_sigma_translator, sigma_filter=sigma_filter, + fetch_column_data=fetch_column_data, ) .build_defs() .assets, @@ -500,6 +512,7 @@ class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganization organization: SigmaOrganization translator_cls: Callable[[SigmaOrganizationData], DagsterSigmaTranslator] sigma_filter: Optional[SigmaFilter] = None + fetch_column_data: bool = True @property def defs_key(self) -> str: @@ -507,7 +520,9 @@ def defs_key(self) -> str: def fetch_state(self) -> SigmaOrganizationData: return asyncio.run( - self.organization.build_organization_data(sigma_filter=self.sigma_filter) + self.organization.build_organization_data( + sigma_filter=self.sigma_filter, fetch_column_data=self.fetch_column_data + ) ) def defs_from_state(self, state: SigmaOrganizationData) -> Definitions: diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py index aa5e0d1718015..b6c14d17663a3 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py @@ -75,7 +75,7 @@ def test_model_organization_data(sigma_auth_token: str, sigma_sample_data: None) client_secret=fake_client_secret, ) - data = asyncio.run(resource.build_organization_data(sigma_filter=None)) + data = asyncio.run(resource.build_organization_data(sigma_filter=None, fetch_column_data=True)) assert len(data.workbooks) == 1 assert data.workbooks[0].properties["name"] == "Sample Workbook" @@ -107,7 +107,8 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]) + sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "Test Folder")]), + fetch_column_data=True, ) ) @@ -115,7 +116,8 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "My Subfolder")]) + sigma_filter=SigmaFilter(workbook_folders=[("My Documents", "My Subfolder")]), + fetch_column_data=True, ) ) @@ -124,7 +126,8 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data data = asyncio.run( resource.build_organization_data( - sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]) + sigma_filter=SigmaFilter(workbook_folders=[("My Documents",)]), + fetch_column_data=True, ) ) @@ -132,6 +135,31 @@ def test_model_organization_data_filter(sigma_auth_token: str, sigma_sample_data assert data.workbooks[0].properties["name"] == "Sample Workbook" +@responses.activate +def test_model_organization_data_skip_fetch(sigma_auth_token: str, sigma_sample_data: None) -> None: + fake_client_id = uuid.uuid4().hex + fake_client_secret = uuid.uuid4().hex + + resource = SigmaOrganization( + base_url=SigmaBaseUrl.AWS_US, + client_id=fake_client_id, + client_secret=fake_client_secret, + ) + + data = asyncio.run( + resource.build_organization_data( + sigma_filter=None, + fetch_column_data=False, + ) + ) + + assert len(data.datasets) == 1 + assert data.workbooks[0].datasets == {_inode_from_url(data.datasets[0].properties["url"])} + + assert data.datasets[0].properties["name"] == "Orders Dataset" + assert data.datasets[0].columns == set() + + @responses.activate def test_model_organization_data_warn_err( sigma_auth_token: str, sigma_sample_data: None, lineage_warn: None @@ -147,7 +175,7 @@ def test_model_organization_data_warn_err( ) with pytest.raises(ClientResponseError): - asyncio.run(resource.build_organization_data(sigma_filter=None)) + asyncio.run(resource.build_organization_data(sigma_filter=None, fetch_column_data=True)) @responses.activate @@ -164,7 +192,9 @@ def test_model_organization_data_warn_no_err( warn_on_lineage_fetch_error=True, ) - data = asyncio.run(resource_warn.build_organization_data(sigma_filter=None)) + data = asyncio.run( + resource_warn.build_organization_data(sigma_filter=None, fetch_column_data=True) + ) assert len(data.workbooks) == 1 assert data.workbooks[0].properties["name"] == "Sample Workbook"