diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index c7014a4e81195..bbb939a58345f 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -38,6 +38,7 @@ DagsterSigmaTranslator, SigmaDataset, SigmaOrganizationData, + SigmaTable, SigmaWorkbook, _inode_from_url, ) @@ -203,6 +204,12 @@ async def _fetch_workbooks(self) -> List[Dict[str, Any]]: async def _fetch_datasets(self) -> List[Dict[str, Any]]: return await self._fetch_json_async_paginated_entries("datasets") + @cached_method + async def _fetch_tables(self) -> List[Dict[str, Any]]: + return await self._fetch_json_async_paginated_entries( + "files", query_params={"typeFilters": "table"} + ) + @cached_method async def _fetch_pages_for_workbook(self, workbook_id: str) -> List[Dict[str, Any]]: return await self._fetch_json_async_paginated_entries(f"workbooks/{workbook_id}/pages") @@ -356,7 +363,7 @@ async def process_workbook(workbook: Dict[str, Any]) -> None: split = column["columnId"].split("/") if len(split) == 2: inode, column_name = split - columns_by_dataset_inode[inode].add(column_name) + columns_by_dataset_inode[inode].add(column_name) await asyncio.gather(*[process_workbook(workbook) for workbook in workbooks]) @@ -371,7 +378,8 @@ async def build_member_id_to_email_mapping(self) -> Mapping[str, str]: return {member["memberId"]: member["email"] for member in members} async def load_workbook_data(self, raw_workbook_data: Dict[str, Any]) -> SigmaWorkbook: - workbook_deps = set() + dataset_deps = set() + direct_table_deps = set() logger.info("Fetching data for workbook %s", raw_workbook_data["workbookId"]) @@ -409,11 +417,14 @@ async def safe_fetch_lineage_for_element( for lineage in lineages: for item in lineage["dependencies"].values(): if item.get("type") == "dataset": - workbook_deps.add(item["nodeId"]) + dataset_deps.add(item["nodeId"]) + if item.get("type") == "table": + direct_table_deps.add(item["nodeId"]) return SigmaWorkbook( properties=raw_workbook_data, - datasets=workbook_deps, + datasets=dataset_deps, + direct_table_deps=direct_table_deps, owner_email=None, ) @@ -453,16 +464,17 @@ async def build_organization_data( await self._fetch_dataset_columns_by_inode() if fetch_column_data else {} ) - used_datasets = None - if _sigma_filter and not _sigma_filter.include_unused_datasets: - used_datasets = set() - for workbook in workbooks: + used_datasets = set() + used_tables = set() + for workbook in workbooks: + if _sigma_filter and not _sigma_filter.include_unused_datasets: used_datasets.update(workbook.datasets) + used_tables.update(workbook.direct_table_deps) logger.info("Fetching dataset data") for dataset in await self._fetch_datasets(): inode = _inode_from_url(dataset["url"]) - if used_datasets is None or inode in used_datasets: + if _sigma_filter.include_unused_datasets or inode in used_datasets: datasets.append( SigmaDataset( properties=dataset, @@ -471,7 +483,18 @@ async def build_organization_data( ) ) - return SigmaOrganizationData(workbooks=workbooks, datasets=datasets) + tables: List[SigmaTable] = [] + logger.info("Fetching table data") + for table in await self._fetch_tables(): + inode = _inode_from_url(table["urlId"]) + if inode in used_tables: + tables.append( + SigmaTable( + properties=table, + ) + ) + + return SigmaOrganizationData(workbooks=workbooks, datasets=datasets, tables=tables) @public @deprecated( diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/translator.py b/python_modules/libraries/dagster-sigma/dagster_sigma/translator.py index 36d831e615d3a..62a2039af0fb4 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/translator.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/translator.py @@ -36,6 +36,7 @@ class SigmaWorkbook: properties: Dict[str, Any] datasets: AbstractSet[str] + direct_table_deps: AbstractSet[str] owner_email: Optional[str] @@ -53,16 +54,35 @@ class SigmaDataset: inputs: AbstractSet[str] +@whitelist_for_serdes +@record +class SigmaTable: + """Represents a table loaded into Sigma.""" + + properties: Dict[str, Any] + + def get_table_path(self) -> List[str]: + """Extracts the qualified table path from the name and path properties, + e.g. ["MY_DB", "MY_SCHEMA", "MY_TABLE"]. + """ + return self.properties["path"].split("/")[1:] + [self.properties["name"]] + + @whitelist_for_serdes @record class SigmaOrganizationData: workbooks: List[SigmaWorkbook] datasets: List[SigmaDataset] + tables: List[SigmaTable] @cached_method def get_datasets_by_inode(self) -> Dict[str, SigmaDataset]: return {_inode_from_url(dataset.properties["url"]): dataset for dataset in self.datasets} + @cached_method + def get_tables_by_inode(self) -> Dict[str, SigmaTable]: + return {_inode_from_url(table.properties["urlId"]): table for table in self.tables} + class DagsterSigmaTranslator: """Translator class which converts raw response data from the Sigma API into AssetSpecs. @@ -91,11 +111,21 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec: ), } datasets = [self._context.get_datasets_by_inode()[inode] for inode in data.datasets] + tables = [ + self._context.get_tables_by_inode()[inode] for inode in data.direct_table_deps + ] + return AssetSpec( key=self.get_asset_key(data), metadata=metadata, kinds={"sigma"}, - deps={self.get_asset_key(dataset) for dataset in datasets}, + deps={ + *[self.get_asset_key(dataset) for dataset in datasets], + *[ + asset_key_from_table_name(".".join(table.get_table_path()).lower()) + for table in tables + ], + }, owners=[data.owner_email] if data.owner_email else None, ) elif isinstance(data, SigmaDataset): diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/conftest.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/conftest.py index 3245bcffdcdc7..d1b95e65ddeb3 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/conftest.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/conftest.py @@ -76,6 +76,13 @@ def _build_paginated_response( "url": "https://app.sigmacomputing.com/dagster-labs/b/Iq557kfHN8KRu76HdGSWi", } +SAMPLE_TABLE_INODE = "inode-C5ZiVq1GkaANMaD334AnA" +SAMPLE_TABLE_DATA = { + "path": "Content Root/My Database/My Schema", + "urlId": SAMPLE_TABLE_INODE.split("-")[-1], + "name": "Payments", +} + @pytest.fixture(name="lineage_warn") def lineage_warn_fixture(responses: aioresponses) -> None: @@ -113,6 +120,12 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None: body=json.dumps(_build_paginated_response([SAMPLE_DATASET_DATA])), status=200, ) + responses.add( + method=hdrs.METH_GET, + url="https://aws-api.sigmacomputing.com/v2/files?limit=1000&typeFilters=table", + body=json.dumps(_build_paginated_response([SAMPLE_TABLE_DATA])), + status=200, + ) responses.add( method=hdrs.METH_GET, @@ -147,6 +160,13 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None: ], "vizualizationType": "bar", }, + { + "elementId": "A49pknzHb1", + "type": "table", + "name": "Payments", + "columns": [], + "vizualizationType": "levelTable", + }, ] responses.add( method=hdrs.METH_GET, @@ -157,7 +177,7 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None: responses.add( method=hdrs.METH_GET, url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/pages/qwMyyHBCuC/elements?limit=1000&page=1", - body=json.dumps(_build_paginated_response(elements, 1, 2)), + body=json.dumps(_build_paginated_response(elements, 1, 3)), status=200, ) responses.add( @@ -210,6 +230,29 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None: ), status=200, ) + responses.add( + method=hdrs.METH_GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/lineage/elements/A49pknzHb1", + body=json.dumps( + { + "dependencies": { + SAMPLE_TABLE_INODE: { + "nodeId": SAMPLE_TABLE_INODE, + "type": "table", + "name": "Payments", + }, + }, + } + ), + status=200, + ) + responses.add( + method=hdrs.METH_GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/elements/A49pknzHb1/columns", + body=json.dumps(_build_paginated_response([])), + status=200, + ) + responses.add( method=hdrs.METH_GET, url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/lineage/elements/V29pknzHb6", @@ -239,6 +282,7 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None: ), status=200, ) + responses.add( method=hdrs.METH_GET, url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/elements/V29pknzHb6/columns", diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py index e7751298b814c..35762fffcf452 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py @@ -6,12 +6,15 @@ DagsterSigmaTranslator, SigmaDataset, SigmaOrganizationData, + SigmaTable, SigmaWorkbook, ) from dagster_sigma_tests.conftest import ( SAMPLE_DATASET_DATA, SAMPLE_DATASET_INODE, + SAMPLE_TABLE_DATA, + SAMPLE_TABLE_INODE, SAMPLE_WORKBOOK_DATA, ) @@ -21,12 +24,17 @@ def test_workbook_translation() -> None: properties=SAMPLE_WORKBOOK_DATA, datasets={SAMPLE_DATASET_INODE}, owner_email="ben@dagsterlabs.com", + direct_table_deps={SAMPLE_TABLE_INODE}, ) sample_dataset = SigmaDataset(properties=SAMPLE_DATASET_DATA, columns=set(), inputs=set()) translator = DagsterSigmaTranslator( - SigmaOrganizationData(workbooks=[sample_workbook], datasets=[sample_dataset]) + SigmaOrganizationData( + workbooks=[sample_workbook], + datasets=[sample_dataset], + tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)], + ) ) asset_spec = translator.get_asset_spec(sample_workbook) @@ -36,8 +44,11 @@ def test_workbook_translation() -> None: assert asset_spec.metadata["dagster_sigma/version"] == 5 assert asset_spec.metadata["dagster_sigma/created_at"].value == 1726176169.072 assert build_kind_tag_key("sigma") in asset_spec.tags - assert {dep.asset_key for dep in asset_spec.deps} == {AssetKey(["Orders_Dataset"])} assert asset_spec.owners == ["ben@dagsterlabs.com"] + assert {dep.asset_key for dep in asset_spec.deps} == { + AssetKey(["Orders_Dataset"]), + AssetKey(["my_database", "my_schema", "payments"]), + } def test_dataset_translation() -> None: @@ -48,7 +59,7 @@ def test_dataset_translation() -> None: ) translator = DagsterSigmaTranslator( - SigmaOrganizationData(workbooks=[], datasets=[sample_dataset]) + SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[]) ) asset_spec = translator.get_asset_spec(sample_dataset) @@ -89,7 +100,9 @@ def get_asset_spec(self, data: SigmaDataset) -> AssetSpec: inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"}, ) - translator = MyCustomTranslator(SigmaOrganizationData(workbooks=[], datasets=[sample_dataset])) + translator = MyCustomTranslator( + SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[]) + ) asset_spec = translator.get_asset_spec(sample_dataset)