diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index 5e8ca4df3ad84..051851b4d2fd1 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -1,12 +1,23 @@ +from collections import defaultdict from enum import Enum -from typing import Any, Dict, Optional +from typing import AbstractSet, Any, Dict, List, Mapping, Optional import requests from dagster import ConfigurableResource +from dagster._record import record +from dagster._utils.cached_method import cached_method from pydantic import Field, PrivateAttr +from sqlglot import exp, parse_one + +SIGMA_PARTNER_ID_TAG = {"X-Sigma-Partner-Id": "dagster"} class SigmaBaseUrl(str, Enum): + """Enumeration of Sigma API base URLs for different cloud providers. + + https://help.sigmacomputing.com/reference/get-started-sigma-api#identify-your-api-request-url + """ + AWS_US = "https://aws-api.sigmacomputing.com" AWS_CANADA = "https://api.ca.aws.sigmacomputing.com" AWS_EUROPE = "https://api.eu.aws.sigmacomputing.com" @@ -15,6 +26,42 @@ class SigmaBaseUrl(str, Enum): GCP = "https://api.sigmacomputing.com" +@record +class SigmaWorkbook: + """Represents a Sigma workbook, a collection of visualizations and queries + for data exploration and analysis. + + https://help.sigmacomputing.com/docs/workbooks + """ + + properties: Dict[str, Any] + datasets: AbstractSet[str] + + +@record +class SigmaDataset: + """Represents a Sigma dataset, a centralized data definition which can + contain aggregations or other manipulations. + + https://help.sigmacomputing.com/docs/datasets + """ + + properties: Dict[str, Any] + columns: AbstractSet[str] + inputs: AbstractSet[str] + + +def _inode_from_url(url: str) -> str: + """Builds a Sigma internal inode value from a Sigma URL.""" + return f'inode-{url.split("/")[-1]}' + + +@record +class SigmaOrganizationData: + workbooks: List[SigmaWorkbook] + datasets: List[SigmaDataset] + + class SigmaOrganization(ConfigurableResource): """Represents a workspace in PowerBI and provides utilities to interact with the PowerBI API. @@ -38,6 +85,7 @@ def _fetch_api_token(self) -> str: headers={ "Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded", + **SIGMA_PARTNER_ID_TAG, }, data={ "grant_type": "client_credentials", @@ -59,7 +107,118 @@ def fetch_json(self, endpoint: str, method: str = "GET") -> Dict[str, Any]: response = requests.request( method=method, url=f"{self.base_url}/v2/{endpoint}", - headers={"Accept": "application/json", "Authorization": f"Bearer {self.api_token}"}, + headers={ + "Accept": "application/json", + "Authorization": f"Bearer {self.api_token}", + **SIGMA_PARTNER_ID_TAG, + }, ) response.raise_for_status() + return response.json() + + @cached_method + def fetch_workbooks(self) -> List[Dict[str, Any]]: + return self.fetch_json("workbooks")["entries"] + + @cached_method + def fetch_datasets(self) -> List[Dict[str, Any]]: + return self.fetch_json("datasets")["entries"] + + @cached_method + def fetch_pages_for_workbook(self, workbook_id: str) -> List[Dict[str, Any]]: + return self.fetch_json(f"workbooks/{workbook_id}/pages")["entries"] + + @cached_method + def fetch_elements_for_page(self, workbook_id: str, page_id: str) -> List[Dict[str, Any]]: + return self.fetch_json(f"workbooks/{workbook_id}/pages/{page_id}/elements")["entries"] + + @cached_method + def fetch_lineage_for_element(self, workbook_id: str, element_id: str) -> Dict[str, Any]: + return self.fetch_json(f"workbooks/{workbook_id}/lineage/elements/{element_id}") + + @cached_method + def fetch_columns_for_element(self, workbook_id: str, element_id: str) -> List[Dict[str, Any]]: + return self.fetch_json(f"workbooks/{workbook_id}/elements/{element_id}/columns")["entries"] + + @cached_method + def fetch_queries_for_workbook(self, workbook_id: str) -> List[Dict[str, Any]]: + return self.fetch_json(f"workbooks/{workbook_id}/queries")["entries"] + + @cached_method + def build_organization_data(self) -> SigmaOrganizationData: + """Retrieves all workbooks and datasets in the Sigma organization and builds a + SigmaOrganizationData object representing the organization's assets. + """ + raw_workbooks = self.fetch_workbooks() + + dataset_inode_to_name: Mapping[str, str] = {} + columns_by_dataset_inode = defaultdict(set) + deps_by_dataset_inode = defaultdict(set) + dataset_element_to_inode: Mapping[str, str] = {} + + workbooks: List[SigmaWorkbook] = [] + + # Unfortunately, Sigma's API does not nicely model the relationship between various assets. + # We have to do some manual work to infer these relationships ourselves. + for workbook in raw_workbooks: + workbook_deps = set() + pages = self.fetch_pages_for_workbook(workbook["workbookId"]) + for page in pages: + elements = self.fetch_elements_for_page(workbook["workbookId"], page["pageId"]) + for element in elements: + # We extract the list of dataset dependencies from the lineage of each workbook. + lineage = self.fetch_lineage_for_element( + workbook["workbookId"], element["elementId"] + ) + for inode, item in lineage["dependencies"].items(): + if item.get("type") == "dataset": + workbook_deps.add(item["nodeId"]) + dataset_inode_to_name[inode] = item["name"] + dataset_element_to_inode[element["elementId"]] = item["nodeId"] + + # We can't query the list of columns in a dataset directly, so we have to build a partial + # list from the columns which appear in any workbook. + columns = self.fetch_columns_for_element( + workbook["workbookId"], element["elementId"] + ) + for column in columns: + split = column["columnId"].split("/") + if len(split) == 2: + inode, column_name = split + columns_by_dataset_inode[inode].add(column_name) + + # Finally, we extract the list of tables used in each query in the workbook with + # the help of sqlglot. Each query is associated with an "element", or section of the + # workbook. We know which dataset each element is associated with, so we can infer + # the dataset for each query, and from there build a list of tables which the dataset + # depends on. + queries = self.fetch_queries_for_workbook(workbook["workbookId"]) + for query in queries: + element_id = query["elementId"] + table_deps = set( + [ + f"{table.catalog}.{table.db}.{table.this}" + for table in list(parse_one(query["sql"]).find_all(exp.Table)) + if table.catalog + ] + ) + + deps_by_dataset_inode[dataset_element_to_inode[element_id]] = deps_by_dataset_inode[ + dataset_element_to_inode[element_id] + ].union(table_deps) + + workbooks.append(SigmaWorkbook(properties=workbook, datasets=workbook_deps)) + + datasets: List[SigmaDataset] = [] + for dataset in self.fetch_datasets(): + inode = _inode_from_url(dataset["url"]) + datasets.append( + SigmaDataset( + properties=dataset, + columns=columns_by_dataset_inode.get(inode, set()), + inputs=deps_by_dataset_inode[inode], + ) + ) + + return SigmaOrganizationData(workbooks=workbooks, datasets=datasets) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/conftest.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/conftest.py index f39f2263f60de..4514b9807a0f5 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/conftest.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/conftest.py @@ -1,4 +1,5 @@ import uuid +from typing import Any, Dict, List import pytest import responses @@ -17,3 +18,196 @@ def sigma_auth_fixture() -> str: ) return fake_access_token + + +def _build_paginated_response(items: List[Dict[str, Any]]) -> Dict[str, Any]: + return { + "entries": items, + "hasMore": False, + "total": len(items), + "nextPage": None, + } + + +@pytest.fixture(name="sigma_sample_data") +def sigma_sample_data_fixture() -> None: + # Single workbook, dataset + responses.add( + method=responses.GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks", + json=_build_paginated_response( + [ + { + "workbookId": "4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e", + "workbookUrlId": "2opi6VLEne4BaPyj00US50", + "createdBy": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD", + "updatedBy": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD", + "createdAt": "2024-09-12T21:22:49.072Z", + "updatedAt": "2024-09-12T22:20:39.848Z", + "name": "Sample Workbook", + "url": "https://app.sigmacomputing.com/dagster-labs/workbook/2opi6VLEne4BaPyj00US50", + "path": "My Documents", + "latestVersion": 5, + "ownerId": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD", + } + ] + ), + status=200, + ) + responses.add( + method=responses.GET, + url="https://aws-api.sigmacomputing.com/v2/datasets", + json=_build_paginated_response( + [ + { + "datasetId": "178a6bb5-c0e7-4bef-a739-f12710492f16", + "createdBy": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD", + "updatedBy": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD", + "createdAt": "2024-09-12T21:16:17.830Z", + "updatedAt": "2024-09-12T21:19:31.000Z", + "name": "Orders Dataset", + "description": "", + "url": "https://app.sigmacomputing.com/dagster-labs/b/Iq557kfHN8KRu76HdGSWi", + } + ] + ), + status=200, + ) + + responses.add( + method=responses.GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/pages", + json=_build_paginated_response([{"pageId": "qwMyyHBCuC", "name": "Page 1"}]), + status=200, + ) + responses.add( + method=responses.GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/pages/qwMyyHBCuC/elements", + json=_build_paginated_response( + [ + { + "elementId": "_MuHPbskp0", + "type": "table", + "name": "sample elementl", + "columns": [ + "Order Id", + "Customer Id", + "workbook renamed date", + "Modified Date", + ], + "vizualizationType": "levelTable", + }, + { + "elementId": "V29pknzHb6", + "type": "visualization", + "name": "Count of Order Date by Status", + "columns": [ + "Order Id", + "Customer Id", + "Order Date", + "Status", + "Modified Date", + "Count of Order Date", + ], + "vizualizationType": "bar", + }, + ] + ), + status=200, + ) + responses.add( + method=responses.GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/lineage/elements/_MuHPbskp0", + json={ + "dependencies": { + "qy_ARjTKcT": { + "nodeId": "qy_ARjTKcT", + "type": "sheet", + "name": "sample elementl", + "elementId": "_MuHPbskp0", + }, + "inode-Iq557kfHN8KRu76HdGSWi": { + "nodeId": "inode-Iq557kfHN8KRu76HdGSWi", + "type": "dataset", + "name": "Orders Dataset", + }, + }, + "edges": [ + {"source": "inode-Iq557kfHN8KRu76HdGSWi", "target": "qy_ARjTKcT", "type": "source"} + ], + }, + status=200, + ) + responses.add( + method=responses.GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/elements/_MuHPbskp0/columns", + json=_build_paginated_response( + [ + {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Order Id", "label": "Order Id"}, + {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Customer Id", "label": "Customer Id"}, + { + "columnId": "inode-Iq557kfHN8KRu76HdGSWi/Order Date", + "label": "workbook renamed date", + }, + {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Modified Date", "label": "Modified Date"}, + ] + ), + status=200, + ) + responses.add( + method=responses.GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/lineage/elements/V29pknzHb6", + json={ + "dependencies": { + "SBvQYKT6ui": { + "nodeId": "SBvQYKT6ui", + "type": "sheet", + "name": "Count of Order Date by Status", + "elementId": "V29pknzHb6", + }, + "inode-Iq557kfHN8KRu76HdGSWi": { + "nodeId": "inode-Iq557kfHN8KRu76HdGSWi", + "type": "dataset", + "name": "Orders Dataset", + }, + }, + "edges": [ + {"source": "inode-Iq557kfHN8KRu76HdGSWi", "target": "SBvQYKT6ui", "type": "source"} + ], + }, + status=200, + ) + responses.add( + method=responses.GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/elements/V29pknzHb6/columns", + json=_build_paginated_response( + [ + {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Order Id", "label": "Order Id"}, + {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Customer Id", "label": "Customer Id"}, + {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Order Date", "label": "Order Date"}, + {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Status", "label": "Status"}, + {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Modified Date", "label": "Modified Date"}, + {"columnId": "VBKAHQgx58", "label": "Count of Order Date"}, + ] + ), + status=200, + ) + responses.add( + method=responses.GET, + url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/queries", + json=_build_paginated_response( + [ + { + "elementId": "_MuHPbskp0", + "name": "sample elementl", + "sql": 'select ORDER_ID_7 "Order Id", CUSTOMER_ID_8 "Customer Id", CAST_DATE_TO_DATETIME_9 "workbook renamed date", CAST_DATE_TO_DATETIME_11 "Modified Date" from (select ORDER_ID ORDER_ID_7, CUSTOMER_ID CUSTOMER_ID_8, ORDER_DATE::timestamp_ltz CAST_DATE_TO_DATETIME_9, ORDER_DATE::timestamp_ltz CAST_DATE_TO_DATETIME_11 from (select * from TESTDB.JAFFLE_SHOP.STG_ORDERS STG_ORDERS limit 1000) Q1) Q2 limit 1000\n\n-- Sigma \u03a3 {"request-id":"69ac9a35-64b3-4840-a53d-3aed43a575ec","email":"ben@dagsterlabs.com"}', + }, + { + "elementId": "V29pknzHb6", + "name": "Count of Order Date by Status", + "sql": 'with Q1 as (select ORDER_ID, CUSTOMER_ID, STATUS, ORDER_DATE::timestamp_ltz CAST_DATE_TO_DATETIME_5, ORDER_DATE::timestamp_ltz CAST_DATE_TO_DATETIME_6 from TESTDB.JAFFLE_SHOP.STG_ORDERS STG_ORDERS) select STATUS_10 "Status", COUNT_22 "Count of Order Date", ORDER_ID_7 "Order Id", CUSTOMER_ID_8 "Customer Id", CAST_DATE_TO_DATETIME_7 "Order Date", CAST_DATE_TO_DATETIME_8 "Modified Date" from (select Q3.ORDER_ID_7 ORDER_ID_7, Q3.CUSTOMER_ID_8 CUSTOMER_ID_8, Q3.CAST_DATE_TO_DATETIME_7 CAST_DATE_TO_DATETIME_7, Q3.STATUS_10 STATUS_10, Q3.CAST_DATE_TO_DATETIME_8 CAST_DATE_TO_DATETIME_8, Q6.COUNT_22 COUNT_22, Q6.STATUS_11 STATUS_11 from (select ORDER_ID ORDER_ID_7, CUSTOMER_ID CUSTOMER_ID_8, CAST_DATE_TO_DATETIME_6 CAST_DATE_TO_DATETIME_7, STATUS STATUS_10, CAST_DATE_TO_DATETIME_5 CAST_DATE_TO_DATETIME_8 from Q1 Q2 order by STATUS_10 asc limit 1000) Q3 left join (select count(CAST_DATE_TO_DATETIME_7) COUNT_22, STATUS_10 STATUS_11 from (select CAST_DATE_TO_DATETIME_6 CAST_DATE_TO_DATETIME_7, STATUS STATUS_10 from Q1 Q4) Q5 group by STATUS_10) Q6 on equal_null(Q3.STATUS_10, Q6.STATUS_11)) Q8 order by STATUS_10 asc limit 1000\n\n-- Sigma \u03a3 {"request-id":"69ac9a35-64b3-4840-a53d-3aed43a575ec","email":"ben@dagsterlabs.com"}', + }, + ] + ), + status=200, + ) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py index 1addf036dc2b5..edaac34e1240c 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_resource.py @@ -2,6 +2,7 @@ import responses from dagster_sigma import SigmaBaseUrl, SigmaOrganization +from dagster_sigma.resource import _inode_from_url @responses.activate @@ -53,3 +54,33 @@ def test_basic_fetch(sigma_auth_token: str) -> None: assert len(responses.calls) == 2 assert responses.calls[1].request.headers["Authorization"] == f"Bearer {sigma_auth_token}" + + +@responses.activate +def test_model_organization_data(sigma_auth_token: str, sigma_sample_data: None) -> None: + fake_client_id = uuid.uuid4().hex + fake_client_secret = uuid.uuid4().hex + + resource = SigmaOrganization( + base_url=SigmaBaseUrl.AWS_US, + client_id=fake_client_id, + client_secret=fake_client_secret, + ) + + data = resource.build_organization_data() + + assert len(data.workbooks) == 1 + assert data.workbooks[0].properties["name"] == "Sample Workbook" + + assert len(data.datasets) == 1 + assert data.workbooks[0].datasets == {_inode_from_url(data.datasets[0].properties["url"])} + + assert data.datasets[0].properties["name"] == "Orders Dataset" + assert data.datasets[0].columns == { + "Customer Id", + "Order Date", + "Order Id", + "Modified Date", + "Status", + } + assert data.datasets[0].inputs == {"TESTDB.JAFFLE_SHOP.STG_ORDERS"} diff --git a/python_modules/libraries/dagster-sigma/setup.py b/python_modules/libraries/dagster-sigma/setup.py index 66241fea45092..0b53c980cee75 100644 --- a/python_modules/libraries/dagster-sigma/setup.py +++ b/python_modules/libraries/dagster-sigma/setup.py @@ -37,6 +37,7 @@ def get_version() -> str: packages=find_packages(exclude=["dagster_sigma_tests*"]), install_requires=[ f"dagster{pin}", + "sqlglot", ], include_package_data=True, python_requires=">=3.8,<3.13",