Skip to content

Commit

Permalink
[dagster-sigma] Add support for direct workbook->table deps (#25991)
Browse files Browse the repository at this point in the history
## Summary

Sigma workbooks typically depend on datasets, which pass data through
from a database to structured tables that users can refresh or rename.
However, you can also point workbooks directly at tables in your
warehouse. This PR makes our integration aware of the latter,
creating upstream external assets for each table.

## Test Plan

Updated the unit tests and tested locally against a real Sigma instance.

## Changelog

> [dagster-sigma] Added support for direct workbook->warehouse table
dependencies.
  • Loading branch information
benpankow authored Nov 19, 2024
1 parent 5312639 commit 9367501
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 16 deletions.
43 changes: 33 additions & 10 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
DagsterSigmaTranslator,
SigmaDataset,
SigmaOrganizationData,
SigmaTable,
SigmaWorkbook,
_inode_from_url,
)
Expand Down Expand Up @@ -203,6 +204,12 @@ async def _fetch_workbooks(self) -> List[Dict[str, Any]]:
async def _fetch_datasets(self) -> List[Dict[str, Any]]:
return await self._fetch_json_async_paginated_entries("datasets")

@cached_method
async def _fetch_tables(self) -> List[Dict[str, Any]]:
    """Fetch the entries of the "files" endpoint, restricted to tables.

    Delegates to ``_fetch_json_async_paginated_entries`` and passes
    ``typeFilters=table`` as the only query parameter.
    """
    table_only_filter = {"typeFilters": "table"}
    return await self._fetch_json_async_paginated_entries(
        "files", query_params=table_only_filter
    )

@cached_method
async def _fetch_pages_for_workbook(self, workbook_id: str) -> List[Dict[str, Any]]:
    """Fetch the entries of the ``workbooks/{workbook_id}/pages`` endpoint.

    Delegates to ``_fetch_json_async_paginated_entries`` with no extra query
    parameters.
    """
    pages_endpoint = f"workbooks/{workbook_id}/pages"
    return await self._fetch_json_async_paginated_entries(pages_endpoint)
Expand Down Expand Up @@ -356,7 +363,7 @@ async def process_workbook(workbook: Dict[str, Any]) -> None:
split = column["columnId"].split("/")
if len(split) == 2:
inode, column_name = split
columns_by_dataset_inode[inode].add(column_name)
columns_by_dataset_inode[inode].add(column_name)

await asyncio.gather(*[process_workbook(workbook) for workbook in workbooks])

Expand All @@ -371,7 +378,8 @@ async def build_member_id_to_email_mapping(self) -> Mapping[str, str]:
return {member["memberId"]: member["email"] for member in members}

async def load_workbook_data(self, raw_workbook_data: Dict[str, Any]) -> SigmaWorkbook:
workbook_deps = set()
dataset_deps = set()
direct_table_deps = set()

logger.info("Fetching data for workbook %s", raw_workbook_data["workbookId"])

Expand Down Expand Up @@ -409,11 +417,14 @@ async def safe_fetch_lineage_for_element(
for lineage in lineages:
for item in lineage["dependencies"].values():
if item.get("type") == "dataset":
workbook_deps.add(item["nodeId"])
dataset_deps.add(item["nodeId"])
if item.get("type") == "table":
direct_table_deps.add(item["nodeId"])

return SigmaWorkbook(
properties=raw_workbook_data,
datasets=workbook_deps,
datasets=dataset_deps,
direct_table_deps=direct_table_deps,
owner_email=None,
)

Expand Down Expand Up @@ -453,16 +464,17 @@ async def build_organization_data(
await self._fetch_dataset_columns_by_inode() if fetch_column_data else {}
)

used_datasets = None
if _sigma_filter and not _sigma_filter.include_unused_datasets:
used_datasets = set()
for workbook in workbooks:
used_datasets = set()
used_tables = set()
for workbook in workbooks:
if _sigma_filter and not _sigma_filter.include_unused_datasets:
used_datasets.update(workbook.datasets)
used_tables.update(workbook.direct_table_deps)

logger.info("Fetching dataset data")
for dataset in await self._fetch_datasets():
inode = _inode_from_url(dataset["url"])
if used_datasets is None or inode in used_datasets:
if _sigma_filter.include_unused_datasets or inode in used_datasets:
datasets.append(
SigmaDataset(
properties=dataset,
Expand All @@ -471,7 +483,18 @@ async def build_organization_data(
)
)

return SigmaOrganizationData(workbooks=workbooks, datasets=datasets)
tables: List[SigmaTable] = []
logger.info("Fetching table data")
for table in await self._fetch_tables():
inode = _inode_from_url(table["urlId"])
if inode in used_tables:
tables.append(
SigmaTable(
properties=table,
)
)

return SigmaOrganizationData(workbooks=workbooks, datasets=datasets, tables=tables)

@public
@deprecated(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class SigmaWorkbook:

properties: Dict[str, Any]
datasets: AbstractSet[str]
direct_table_deps: AbstractSet[str]
owner_email: Optional[str]


Expand All @@ -53,16 +54,35 @@ class SigmaDataset:
inputs: AbstractSet[str]


@whitelist_for_serdes
@record
class SigmaTable:
    """Represents a table loaded into Sigma."""

    # Raw table entry; this class reads its "path" and "name" keys.
    properties: Dict[str, Any]

    def get_table_path(self) -> List[str]:
        """Build the qualified table path from the "path" and "name" properties.

        The first "/"-separated segment of "path" is dropped and "name" is
        appended, e.g. ["MY_DB", "MY_SCHEMA", "MY_TABLE"].
        """
        # str.split always yields at least one element, so the leading
        # segment can be discarded by unpacking.
        _, *parent_segments = self.properties["path"].split("/")
        return [*parent_segments, self.properties["name"]]


@whitelist_for_serdes
@record
class SigmaOrganizationData:
    """Container for the workbooks, datasets and tables of a Sigma organization."""

    workbooks: List[SigmaWorkbook]
    datasets: List[SigmaDataset]
    tables: List[SigmaTable]

    @cached_method
    def get_datasets_by_inode(self) -> Dict[str, SigmaDataset]:
        """Map each dataset's inode (derived from its "url" property) to the dataset."""
        datasets_by_inode: Dict[str, SigmaDataset] = {}
        for dataset in self.datasets:
            datasets_by_inode[_inode_from_url(dataset.properties["url"])] = dataset
        return datasets_by_inode

    @cached_method
    def get_tables_by_inode(self) -> Dict[str, SigmaTable]:
        """Map each table's inode (derived from its "urlId" property) to the table."""
        tables_by_inode: Dict[str, SigmaTable] = {}
        for table in self.tables:
            tables_by_inode[_inode_from_url(table.properties["urlId"])] = table
        return tables_by_inode


class DagsterSigmaTranslator:
"""Translator class which converts raw response data from the Sigma API into AssetSpecs.
Expand Down Expand Up @@ -91,11 +111,21 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
),
}
datasets = [self._context.get_datasets_by_inode()[inode] for inode in data.datasets]
tables = [
self._context.get_tables_by_inode()[inode] for inode in data.direct_table_deps
]

return AssetSpec(
key=self.get_asset_key(data),
metadata=metadata,
kinds={"sigma"},
deps={self.get_asset_key(dataset) for dataset in datasets},
deps={
*[self.get_asset_key(dataset) for dataset in datasets],
*[
asset_key_from_table_name(".".join(table.get_table_path()).lower())
for table in tables
],
},
owners=[data.owner_email] if data.owner_email else None,
)
elif isinstance(data, SigmaDataset):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ def _build_paginated_response(
"url": "https://app.sigmacomputing.com/dagster-labs/b/Iq557kfHN8KRu76HdGSWi",
}

# Inode id under which the mocked lineage response references the sample table.
SAMPLE_TABLE_INODE = "inode-C5ZiVq1GkaANMaD334AnA"

# Sample table entry returned by the mocked "files" endpoint. Its "urlId" is
# the part of the inode id after the last "-".
SAMPLE_TABLE_DATA = {
    "path": "Content Root/My Database/My Schema",
    "urlId": SAMPLE_TABLE_INODE.rpartition("-")[2],
    "name": "Payments",
}


@pytest.fixture(name="lineage_warn")
def lineage_warn_fixture(responses: aioresponses) -> None:
Expand Down Expand Up @@ -113,6 +120,12 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None:
body=json.dumps(_build_paginated_response([SAMPLE_DATASET_DATA])),
status=200,
)
responses.add(
method=hdrs.METH_GET,
url="https://aws-api.sigmacomputing.com/v2/files?limit=1000&typeFilters=table",
body=json.dumps(_build_paginated_response([SAMPLE_TABLE_DATA])),
status=200,
)

responses.add(
method=hdrs.METH_GET,
Expand Down Expand Up @@ -147,6 +160,13 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None:
],
"vizualizationType": "bar",
},
{
"elementId": "A49pknzHb1",
"type": "table",
"name": "Payments",
"columns": [],
"vizualizationType": "levelTable",
},
]
responses.add(
method=hdrs.METH_GET,
Expand All @@ -157,7 +177,7 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None:
responses.add(
method=hdrs.METH_GET,
url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/pages/qwMyyHBCuC/elements?limit=1000&page=1",
body=json.dumps(_build_paginated_response(elements, 1, 2)),
body=json.dumps(_build_paginated_response(elements, 1, 3)),
status=200,
)
responses.add(
Expand Down Expand Up @@ -210,6 +230,29 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None:
),
status=200,
)
responses.add(
method=hdrs.METH_GET,
url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/lineage/elements/A49pknzHb1",
body=json.dumps(
{
"dependencies": {
SAMPLE_TABLE_INODE: {
"nodeId": SAMPLE_TABLE_INODE,
"type": "table",
"name": "Payments",
},
},
}
),
status=200,
)
responses.add(
method=hdrs.METH_GET,
url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/elements/A49pknzHb1/columns",
body=json.dumps(_build_paginated_response([])),
status=200,
)

responses.add(
method=hdrs.METH_GET,
url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/lineage/elements/V29pknzHb6",
Expand Down Expand Up @@ -239,6 +282,7 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None:
),
status=200,
)

responses.add(
method=hdrs.METH_GET,
url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/elements/V29pknzHb6/columns",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
DagsterSigmaTranslator,
SigmaDataset,
SigmaOrganizationData,
SigmaTable,
SigmaWorkbook,
)

from dagster_sigma_tests.conftest import (
SAMPLE_DATASET_DATA,
SAMPLE_DATASET_INODE,
SAMPLE_TABLE_DATA,
SAMPLE_TABLE_INODE,
SAMPLE_WORKBOOK_DATA,
)

Expand All @@ -21,12 +24,17 @@ def test_workbook_translation() -> None:
properties=SAMPLE_WORKBOOK_DATA,
datasets={SAMPLE_DATASET_INODE},
owner_email="[email protected]",
direct_table_deps={SAMPLE_TABLE_INODE},
)

sample_dataset = SigmaDataset(properties=SAMPLE_DATASET_DATA, columns=set(), inputs=set())

translator = DagsterSigmaTranslator(
SigmaOrganizationData(workbooks=[sample_workbook], datasets=[sample_dataset])
SigmaOrganizationData(
workbooks=[sample_workbook],
datasets=[sample_dataset],
tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)],
)
)

asset_spec = translator.get_asset_spec(sample_workbook)
Expand All @@ -36,8 +44,11 @@ def test_workbook_translation() -> None:
assert asset_spec.metadata["dagster_sigma/version"] == 5
assert asset_spec.metadata["dagster_sigma/created_at"].value == 1726176169.072
assert build_kind_tag_key("sigma") in asset_spec.tags
assert {dep.asset_key for dep in asset_spec.deps} == {AssetKey(["Orders_Dataset"])}
assert asset_spec.owners == ["[email protected]"]
assert {dep.asset_key for dep in asset_spec.deps} == {
AssetKey(["Orders_Dataset"]),
AssetKey(["my_database", "my_schema", "payments"]),
}


def test_dataset_translation() -> None:
Expand All @@ -48,7 +59,7 @@ def test_dataset_translation() -> None:
)

translator = DagsterSigmaTranslator(
SigmaOrganizationData(workbooks=[], datasets=[sample_dataset])
SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[])
)

asset_spec = translator.get_asset_spec(sample_dataset)
Expand Down Expand Up @@ -89,7 +100,9 @@ def get_asset_spec(self, data: SigmaDataset) -> AssetSpec:
inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"},
)

translator = MyCustomTranslator(SigmaOrganizationData(workbooks=[], datasets=[sample_dataset]))
translator = MyCustomTranslator(
SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[])
)

asset_spec = translator.get_asset_spec(sample_dataset)

Expand Down

0 comments on commit 9367501

Please sign in to comment.