Skip to content

Commit

Permalink
[3/n][dagster-sigma] Build out organization parsing logic (#24477)
Browse files Browse the repository at this point in the history
## Summary

Introduces  `build_organization_data`, which does the under-the-hood heavy lifting of building the Sigma object model.

There are two main object types we care about in Sigma's ontology: [datasets](https://help.sigmacomputing.com/docs/datasets) and [workbooks](https://help.sigmacomputing.com/docs/workbooks-overview).

Datasets are the Sigma equivalent of Power BI Semantic Models: they are views (optionally materializable, which we might want to handle later) which sit on top of tables or other data sources.

Workbooks consist of several pages composed of elements, each of which is a table or other visualization built from a dataset.

Sigma's APIs are a little cumbersome here - we'd like to build a relationship of dependencies from workbooks to datasets and datasets to raw source data, but few APIs exist for datasets. It sounds like these will be available in the future once the migration to [data models](https://help.sigmacomputing.com/docs/intro-to-data-models) is complete, but these don't have APIs yet.

For now, our procedure is:

- Build linkages from workbooks to the datasets they depend on
- Inspect the data columns used in workbooks to assemble a partial view of datasets' column schema (which is not directly available via API)
- Inspect the SQL queries used in each "element" in a workbook using `sqlglot`. Each source table referred to by the SQL query is a source table used by the dataset linked to that element.

## Test Plan

Unit test validating that our model of organization data matches expectations. Plan to add more complex unit tests.


## Changelog

> NOCHANGELOG
  • Loading branch information
benpankow authored Sep 19, 2024
1 parent 6fdd8d4 commit fa475a1
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 2 deletions.
163 changes: 161 additions & 2 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
from collections import defaultdict
from enum import Enum
from typing import Any, Dict, Optional
from typing import AbstractSet, Any, Dict, List, Mapping, Optional

import requests
from dagster import ConfigurableResource
from dagster._record import record
from dagster._utils.cached_method import cached_method
from pydantic import Field, PrivateAttr
from sqlglot import exp, parse_one

SIGMA_PARTNER_ID_TAG = {"X-Sigma-Partner-Id": "dagster"}


class SigmaBaseUrl(str, Enum):
"""Enumeration of Sigma API base URLs for different cloud providers.
https://help.sigmacomputing.com/reference/get-started-sigma-api#identify-your-api-request-url
"""

AWS_US = "https://aws-api.sigmacomputing.com"
AWS_CANADA = "https://api.ca.aws.sigmacomputing.com"
AWS_EUROPE = "https://api.eu.aws.sigmacomputing.com"
Expand All @@ -15,6 +26,42 @@ class SigmaBaseUrl(str, Enum):
GCP = "https://api.sigmacomputing.com"


@record
class SigmaWorkbook:
    """A Sigma workbook: a collection of visualizations and queries used for
    data exploration and analysis.

    https://help.sigmacomputing.com/docs/workbooks
    """

    # Workbook properties; build_organization_data passes the workbook's API entry here.
    properties: Dict[str, Any]
    # Node ids of the datasets that the workbook's elements depend on.
    datasets: AbstractSet[str]


@record
class SigmaDataset:
    """A Sigma dataset: a centralized data definition which may contain
    aggregations or other manipulations.

    https://help.sigmacomputing.com/docs/datasets
    """

    # Dataset properties; build_organization_data passes the dataset's API entry here.
    properties: Dict[str, Any]
    # Column names seen for this dataset (a partial list, collected from workbook elements).
    columns: AbstractSet[str]
    # Source tables the dataset reads from, as "catalog.db.table" strings.
    inputs: AbstractSet[str]


def _inode_from_url(url: str) -> str:
"""Builds a Sigma internal inode value from a Sigma URL."""
return f'inode-{url.split("/")[-1]}'


@record
class SigmaOrganizationData:
    """Container for the workbooks and datasets found in a Sigma organization."""

    # One entry per workbook.
    workbooks: List[SigmaWorkbook]
    # One entry per dataset.
    datasets: List[SigmaDataset]


class SigmaOrganization(ConfigurableResource):
"""Represents a workspace in Sigma and provides utilities
to interact with the Sigma API.
Expand All @@ -38,6 +85,7 @@ def _fetch_api_token(self) -> str:
headers={
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
**SIGMA_PARTNER_ID_TAG,
},
data={
"grant_type": "client_credentials",
Expand All @@ -59,7 +107,118 @@ def fetch_json(self, endpoint: str, method: str = "GET") -> Dict[str, Any]:
response = requests.request(
method=method,
url=f"{self.base_url}/v2/{endpoint}",
headers={"Accept": "application/json", "Authorization": f"Bearer {self.api_token}"},
headers={
"Accept": "application/json",
"Authorization": f"Bearer {self.api_token}",
**SIGMA_PARTNER_ID_TAG,
},
)
response.raise_for_status()

return response.json()

@cached_method
def fetch_workbooks(self) -> List[Dict[str, Any]]:
    """Returns the ``entries`` list of the ``workbooks`` endpoint response."""
    # NOTE(review): a single response is read; if the API paginates, later pages are
    # not fetched - confirm.
    payload = self.fetch_json("workbooks")
    return payload["entries"]

@cached_method
def fetch_datasets(self) -> List[Dict[str, Any]]:
    """Returns the ``entries`` list of the ``datasets`` endpoint response."""
    # NOTE(review): a single response is read; if the API paginates, later pages are
    # not fetched - confirm.
    payload = self.fetch_json("datasets")
    return payload["entries"]

@cached_method
def fetch_pages_for_workbook(self, workbook_id: str) -> List[Dict[str, Any]]:
    """Returns the ``entries`` list of the pages endpoint for the given workbook."""
    # NOTE(review): a single response is read; if the API paginates, later pages are
    # not fetched - confirm.
    endpoint = f"workbooks/{workbook_id}/pages"
    payload = self.fetch_json(endpoint)
    return payload["entries"]

@cached_method
def fetch_elements_for_page(self, workbook_id: str, page_id: str) -> List[Dict[str, Any]]:
    """Returns the ``entries`` list of the elements endpoint for one page of a workbook."""
    # NOTE(review): a single response is read; if the API paginates, later pages are
    # not fetched - confirm.
    endpoint = f"workbooks/{workbook_id}/pages/{page_id}/elements"
    payload = self.fetch_json(endpoint)
    return payload["entries"]

@cached_method
def fetch_lineage_for_element(self, workbook_id: str, element_id: str) -> Dict[str, Any]:
    """Returns the full JSON response of the lineage endpoint for one workbook element."""
    endpoint = f"workbooks/{workbook_id}/lineage/elements/{element_id}"
    lineage = self.fetch_json(endpoint)
    return lineage

@cached_method
def fetch_columns_for_element(self, workbook_id: str, element_id: str) -> List[Dict[str, Any]]:
    """Returns the ``entries`` list of the columns endpoint for one workbook element."""
    # NOTE(review): a single response is read; if the API paginates, later pages are
    # not fetched - confirm.
    endpoint = f"workbooks/{workbook_id}/elements/{element_id}/columns"
    payload = self.fetch_json(endpoint)
    return payload["entries"]

@cached_method
def fetch_queries_for_workbook(self, workbook_id: str) -> List[Dict[str, Any]]:
    """Returns the ``entries`` list of the queries endpoint for the given workbook."""
    # NOTE(review): a single response is read; if the API paginates, later pages are
    # not fetched - confirm.
    endpoint = f"workbooks/{workbook_id}/queries"
    payload = self.fetch_json(endpoint)
    return payload["entries"]

@cached_method
def build_organization_data(self) -> SigmaOrganizationData:
    """Retrieves all workbooks and datasets in the Sigma organization and builds a
    SigmaOrganizationData object representing the organization's assets.

    For every workbook, each element on each page is inspected to collect:

    - the datasets the workbook depends on (from the element's lineage),
    - a partial list of each dataset's columns (from the element's columns),
    - the source tables each dataset reads from (from the workbook's SQL queries,
      parsed with sqlglot and attributed to the datasets of the query's element).

    Returns:
        SigmaOrganizationData: one SigmaWorkbook per fetched workbook and one
        SigmaDataset per fetched dataset.

    Raises:
        KeyError: if a response is missing one of the keys read here
            (e.g. ``workbookId``, ``elementId``, ``dependencies``, ``sql``).
    """
    columns_by_dataset_inode = defaultdict(set)
    deps_by_dataset_inode = defaultdict(set)
    # An element's lineage may list several datasets, or none at all, so keep a
    # (possibly empty) set of dataset inodes per element rather than a single inode.
    dataset_inodes_by_element = defaultdict(set)

    workbooks: List[SigmaWorkbook] = []

    # Unfortunately, Sigma's API does not nicely model the relationship between various assets.
    # We have to do some manual work to infer these relationships ourselves.
    for workbook in self.fetch_workbooks():
        workbook_id = workbook["workbookId"]
        workbook_deps = set()

        for page in self.fetch_pages_for_workbook(workbook_id):
            for element in self.fetch_elements_for_page(workbook_id, page["pageId"]):
                element_id = element["elementId"]

                # We extract the list of dataset dependencies from the lineage of each element.
                lineage = self.fetch_lineage_for_element(workbook_id, element_id)
                for item in lineage["dependencies"].values():
                    if item.get("type") == "dataset":
                        workbook_deps.add(item["nodeId"])
                        dataset_inodes_by_element[element_id].add(item["nodeId"])

                # We can't query the list of columns in a dataset directly, so we have to
                # build a partial list from the columns which appear in any workbook.
                for column in self.fetch_columns_for_element(workbook_id, element_id):
                    split = column["columnId"].split("/")
                    # Only ids of the form "<dataset inode>/<column name>" are attributed
                    # to a dataset; any other id shape is skipped.
                    if len(split) == 2:
                        inode, column_name = split
                        columns_by_dataset_inode[inode].add(column_name)

        # Finally, we extract the list of tables used in each query in the workbook with
        # the help of sqlglot. Each query is associated with an "element", or section of the
        # workbook. We know which datasets each element is associated with, so we can infer
        # the datasets for each query, and from there build a list of tables which each
        # dataset depends on.
        for query in self.fetch_queries_for_workbook(workbook_id):
            # Table references without a catalog are skipped.
            table_deps = {
                f"{table.catalog}.{table.db}.{table.this}"
                for table in parse_one(query["sql"]).find_all(exp.Table)
                if table.catalog
            }
            # .get() avoids a KeyError (and avoids inserting a key) for queries whose
            # element has no dataset in its lineage.
            for inode in dataset_inodes_by_element.get(query["elementId"], ()):
                deps_by_dataset_inode[inode].update(table_deps)

        workbooks.append(SigmaWorkbook(properties=workbook, datasets=workbook_deps))

    datasets: List[SigmaDataset] = []
    for dataset in self.fetch_datasets():
        inode = _inode_from_url(dataset["url"])
        datasets.append(
            SigmaDataset(
                properties=dataset,
                columns=columns_by_dataset_inode.get(inode, set()),
                inputs=deps_by_dataset_inode.get(inode, set()),
            )
        )

    return SigmaOrganizationData(workbooks=workbooks, datasets=datasets)
194 changes: 194 additions & 0 deletions python_modules/libraries/dagster-sigma/dagster_sigma_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import uuid
from typing import Any, Dict, List

import pytest
import responses
Expand All @@ -17,3 +18,196 @@ def sigma_auth_fixture() -> str:
)

return fake_access_token


def _build_paginated_response(items: List[Dict[str, Any]]) -> Dict[str, Any]:
return {
"entries": items,
"hasMore": False,
"total": len(items),
"nextPage": None,
}


@pytest.fixture(name="sigma_sample_data")
def sigma_sample_data_fixture() -> None:
    """Registers mocked Sigma API GET responses describing a small sample organization.

    The registered endpoints (all under ``https://aws-api.sigmacomputing.com/v2``) cover
    one workbook, one dataset, the workbook's single page with two elements, the lineage
    and columns of each element, and the workbook's SQL queries.
    """
    # NOTE(review): presumably the consuming test must have ``responses`` activated for
    # these registrations to take effect - confirm against the tests.
    # Single workbook, dataset
    responses.add(
        method=responses.GET,
        url="https://aws-api.sigmacomputing.com/v2/workbooks",
        json=_build_paginated_response(
            [
                {
                    "workbookId": "4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e",
                    "workbookUrlId": "2opi6VLEne4BaPyj00US50",
                    "createdBy": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD",
                    "updatedBy": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD",
                    "createdAt": "2024-09-12T21:22:49.072Z",
                    "updatedAt": "2024-09-12T22:20:39.848Z",
                    "name": "Sample Workbook",
                    "url": "https://app.sigmacomputing.com/dagster-labs/workbook/2opi6VLEne4BaPyj00US50",
                    "path": "My Documents",
                    "latestVersion": 5,
                    "ownerId": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD",
                }
            ]
        ),
        status=200,
    )
    responses.add(
        method=responses.GET,
        url="https://aws-api.sigmacomputing.com/v2/datasets",
        json=_build_paginated_response(
            [
                {
                    "datasetId": "178a6bb5-c0e7-4bef-a739-f12710492f16",
                    "createdBy": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD",
                    "updatedBy": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD",
                    "createdAt": "2024-09-12T21:16:17.830Z",
                    "updatedAt": "2024-09-12T21:19:31.000Z",
                    "name": "Orders Dataset",
                    "description": "",
                    "url": "https://app.sigmacomputing.com/dagster-labs/b/Iq557kfHN8KRu76HdGSWi",
                }
            ]
        ),
        status=200,
    )

    # Pages of the workbook, and the elements on its single page.
    responses.add(
        method=responses.GET,
        url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/pages",
        json=_build_paginated_response([{"pageId": "qwMyyHBCuC", "name": "Page 1"}]),
        status=200,
    )
    responses.add(
        method=responses.GET,
        url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/pages/qwMyyHBCuC/elements",
        json=_build_paginated_response(
            [
                {
                    "elementId": "_MuHPbskp0",
                    "type": "table",
                    "name": "sample elementl",
                    "columns": [
                        "Order Id",
                        "Customer Id",
                        "workbook renamed date",
                        "Modified Date",
                    ],
                    "vizualizationType": "levelTable",
                },
                {
                    "elementId": "V29pknzHb6",
                    "type": "visualization",
                    "name": "Count of Order Date by Status",
                    "columns": [
                        "Order Id",
                        "Customer Id",
                        "Order Date",
                        "Status",
                        "Modified Date",
                        "Count of Order Date",
                    ],
                    "vizualizationType": "bar",
                },
            ]
        ),
        status=200,
    )
    # Lineage and columns for element _MuHPbskp0; its lineage lists the dataset as a dependency.
    responses.add(
        method=responses.GET,
        url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/lineage/elements/_MuHPbskp0",
        json={
            "dependencies": {
                "qy_ARjTKcT": {
                    "nodeId": "qy_ARjTKcT",
                    "type": "sheet",
                    "name": "sample elementl",
                    "elementId": "_MuHPbskp0",
                },
                "inode-Iq557kfHN8KRu76HdGSWi": {
                    "nodeId": "inode-Iq557kfHN8KRu76HdGSWi",
                    "type": "dataset",
                    "name": "Orders Dataset",
                },
            },
            "edges": [
                {"source": "inode-Iq557kfHN8KRu76HdGSWi", "target": "qy_ARjTKcT", "type": "source"}
            ],
        },
        status=200,
    )
    responses.add(
        method=responses.GET,
        url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/elements/_MuHPbskp0/columns",
        json=_build_paginated_response(
            [
                {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Order Id", "label": "Order Id"},
                {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Customer Id", "label": "Customer Id"},
                {
                    "columnId": "inode-Iq557kfHN8KRu76HdGSWi/Order Date",
                    "label": "workbook renamed date",
                },
                {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Modified Date", "label": "Modified Date"},
            ]
        ),
        status=200,
    )
    # Lineage and columns for element V29pknzHb6; one of its columns has an id with no
    # "<inode>/<name>" form.
    responses.add(
        method=responses.GET,
        url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/lineage/elements/V29pknzHb6",
        json={
            "dependencies": {
                "SBvQYKT6ui": {
                    "nodeId": "SBvQYKT6ui",
                    "type": "sheet",
                    "name": "Count of Order Date by Status",
                    "elementId": "V29pknzHb6",
                },
                "inode-Iq557kfHN8KRu76HdGSWi": {
                    "nodeId": "inode-Iq557kfHN8KRu76HdGSWi",
                    "type": "dataset",
                    "name": "Orders Dataset",
                },
            },
            "edges": [
                {"source": "inode-Iq557kfHN8KRu76HdGSWi", "target": "SBvQYKT6ui", "type": "source"}
            ],
        },
        status=200,
    )
    responses.add(
        method=responses.GET,
        url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/elements/V29pknzHb6/columns",
        json=_build_paginated_response(
            [
                {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Order Id", "label": "Order Id"},
                {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Customer Id", "label": "Customer Id"},
                {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Order Date", "label": "Order Date"},
                {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Status", "label": "Status"},
                {"columnId": "inode-Iq557kfHN8KRu76HdGSWi/Modified Date", "label": "Modified Date"},
                {"columnId": "VBKAHQgx58", "label": "Count of Order Date"},
            ]
        ),
        status=200,
    )
    # SQL queries of the workbook, one per element; both select from
    # TESTDB.JAFFLE_SHOP.STG_ORDERS.
    responses.add(
        method=responses.GET,
        url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/queries",
        json=_build_paginated_response(
            [
                {
                    "elementId": "_MuHPbskp0",
                    "name": "sample elementl",
                    "sql": 'select ORDER_ID_7 "Order Id", CUSTOMER_ID_8 "Customer Id", CAST_DATE_TO_DATETIME_9 "workbook renamed date", CAST_DATE_TO_DATETIME_11 "Modified Date" from (select ORDER_ID ORDER_ID_7, CUSTOMER_ID CUSTOMER_ID_8, ORDER_DATE::timestamp_ltz CAST_DATE_TO_DATETIME_9, ORDER_DATE::timestamp_ltz CAST_DATE_TO_DATETIME_11 from (select * from TESTDB.JAFFLE_SHOP.STG_ORDERS STG_ORDERS limit 1000) Q1) Q2 limit 1000\n\n-- Sigma \u03a3 {"request-id":"69ac9a35-64b3-4840-a53d-3aed43a575ec","email":"[email protected]"}',
                },
                {
                    "elementId": "V29pknzHb6",
                    "name": "Count of Order Date by Status",
                    "sql": 'with Q1 as (select ORDER_ID, CUSTOMER_ID, STATUS, ORDER_DATE::timestamp_ltz CAST_DATE_TO_DATETIME_5, ORDER_DATE::timestamp_ltz CAST_DATE_TO_DATETIME_6 from TESTDB.JAFFLE_SHOP.STG_ORDERS STG_ORDERS) select STATUS_10 "Status", COUNT_22 "Count of Order Date", ORDER_ID_7 "Order Id", CUSTOMER_ID_8 "Customer Id", CAST_DATE_TO_DATETIME_7 "Order Date", CAST_DATE_TO_DATETIME_8 "Modified Date" from (select Q3.ORDER_ID_7 ORDER_ID_7, Q3.CUSTOMER_ID_8 CUSTOMER_ID_8, Q3.CAST_DATE_TO_DATETIME_7 CAST_DATE_TO_DATETIME_7, Q3.STATUS_10 STATUS_10, Q3.CAST_DATE_TO_DATETIME_8 CAST_DATE_TO_DATETIME_8, Q6.COUNT_22 COUNT_22, Q6.STATUS_11 STATUS_11 from (select ORDER_ID ORDER_ID_7, CUSTOMER_ID CUSTOMER_ID_8, CAST_DATE_TO_DATETIME_6 CAST_DATE_TO_DATETIME_7, STATUS STATUS_10, CAST_DATE_TO_DATETIME_5 CAST_DATE_TO_DATETIME_8 from Q1 Q2 order by STATUS_10 asc limit 1000) Q3 left join (select count(CAST_DATE_TO_DATETIME_7) COUNT_22, STATUS_10 STATUS_11 from (select CAST_DATE_TO_DATETIME_6 CAST_DATE_TO_DATETIME_7, STATUS STATUS_10 from Q1 Q4) Q5 group by STATUS_10) Q6 on equal_null(Q3.STATUS_10, Q6.STATUS_11)) Q8 order by STATUS_10 asc limit 1000\n\n-- Sigma \u03a3 {"request-id":"69ac9a35-64b3-4840-a53d-3aed43a575ec","email":"[email protected]"}',
                },
            ]
        ),
        status=200,
    )
Loading

0 comments on commit fa475a1

Please sign in to comment.