Skip to content

Commit

Permalink
[dagster-powerbi] Gracefully handle datasets without datasource IDs (#…
Browse files Browse the repository at this point in the history
…25136)

## Summary

Some semantic model types hold references to data sources that lack an
ID - add some code to gracefully handle this case.

## Test Plan

Updated unit test to include sample data source without an ID.
  • Loading branch information
benpankow authored Oct 8, 2024
1 parent 66ff88c commit 4fe6fda
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import json
import re
import time
from dataclasses import dataclass
Expand All @@ -13,6 +14,7 @@
from dagster._core.definitions.events import Failure
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._utils.cached_method import cached_method
from dagster._utils.security import non_secure_md5_hash_str
from pydantic import Field, PrivateAttr

from dagster_powerbi.translator import (
Expand All @@ -31,6 +33,14 @@ def _clean_op_name(name: str) -> str:
return re.sub(r"[^a-z0-9A-Z]+", "_", name)


def generate_data_source_id(data_source: Dict[str, Any]) -> str:
"""Generates a unique ID for a data source based on its properties.
We use this for cases where the API does not provide a unique ID for a data source.
This ID is never surfaced to the user and is only used internally to track dependencies.
"""
return non_secure_md5_hash_str(json.dumps(data_source, sort_keys=True).encode())


class PowerBICredentials(ConfigurableResource, abc.ABC):
@property
def api_token(self) -> str: ...
Expand Down Expand Up @@ -224,21 +234,30 @@ def _fetch_powerbi_workspace_data(self) -> PowerBIWorkspaceData:
for data in self._get_reports()["value"]
]
semantic_models_data = self._get_semantic_models()["value"]
data_sources_by_id = {}
data_sources = []
for dataset in semantic_models_data:
dataset_sources = self._get_semantic_model_sources(dataset["id"])["value"]
dataset["sources"] = [source["datasourceId"] for source in dataset_sources]
for data_source in dataset_sources:
data_sources_by_id[data_source["datasourceId"]] = PowerBIContentData(
content_type=PowerBIContentType.DATA_SOURCE, properties=data_source

dataset_sources_with_id = [
source
if "datasourceId" in source
else {"datasourceId": generate_data_source_id(source), **source}
for source in dataset_sources
]
dataset["sources"] = [source["datasourceId"] for source in dataset_sources_with_id]
for data_source in dataset_sources_with_id:
data_sources.append(
PowerBIContentData(
content_type=PowerBIContentType.DATA_SOURCE, properties=data_source
)
)
semantic_models = [
PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset)
for dataset in semantic_models_data
]
return PowerBIWorkspaceData.from_content_data(
self.workspace_id,
dashboards + reports + semantic_models + list(data_sources_by_id.values()),
dashboards + reports + semantic_models + data_sources,
)

@public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.tags import build_kind_tag
from dagster._core.definitions.tags.tag_set import NamespacedTagSet
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
Expand Down Expand Up @@ -157,7 +158,7 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
key=self.get_dashboard_asset_key(data),
deps=report_keys,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="dashboard")},
tags={**PowerBITagSet(asset_type="dashboard"), **build_kind_tag("powerbi")},
)

def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
Expand All @@ -173,7 +174,7 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
key=self.get_report_asset_key(data),
deps=[dataset_key] if dataset_key else None,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="report")},
tags={**PowerBITagSet(asset_type="report"), **build_kind_tag("powerbi")},
)

def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
Expand All @@ -191,7 +192,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
key=self.get_semantic_model_asset_key(data),
deps=source_keys,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="semantic_model")},
tags={**PowerBITagSet(asset_type="semantic_model"), **build_kind_tag("powerbi")},
)

def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
Expand All @@ -209,5 +210,5 @@ def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
return AssetSpec(
key=self.get_data_source_asset_key(data),
tags={**PowerBITagSet(asset_type="data_source")},
tags={**PowerBITagSet(asset_type="data_source"), **build_kind_tag("powerbi")},
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest
import responses
from dagster_powerbi.resource import BASE_API_URL
from dagster_powerbi.resource import BASE_API_URL, generate_data_source_id
from dagster_powerbi.translator import PowerBIContentData, PowerBIContentType, PowerBIWorkspaceData

SAMPLE_DASH = {
Expand Down Expand Up @@ -91,7 +91,6 @@
{
"datasourceType": "File",
"connectionDetails": {"path": "c:\\users\\mimyersm\\desktop\\sales & marketing datas.xlsx"},
"datasourceId": "46c83f90-3eaa-4658-b716-2307bc56e74d",
"gatewayId": "40800873-8e0d-4152-86e3-e6edeb2a738c",
},
]
Expand All @@ -118,7 +117,11 @@ def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData:
sample_semantic_model = SAMPLE_SEMANTIC_MODEL.copy()

sample_data_sources = SAMPLE_DATA_SOURCES
sample_semantic_model["sources"] = [ds["datasourceId"] for ds in sample_data_sources]
data_sources = [
ds if "datasourceId" in ds else {"datasourceId": generate_data_source_id(ds), **ds}
for ds in sample_data_sources
]
sample_semantic_model["sources"] = [ds["datasourceId"] for ds in data_sources]

return PowerBIWorkspaceData(
workspace_id=workspace_id,
Expand All @@ -141,7 +144,7 @@ def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData:
ds["datasourceId"]: PowerBIContentData(
content_type=PowerBIContentType.DATA_SOURCE, properties=ds
)
for ds in sample_data_sources
for ds in data_sources
},
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.tags import build_kind_tag
from dagster_powerbi import DagsterPowerBITranslator
from dagster_powerbi.translator import PowerBIContentData, PowerBIWorkspaceData

Expand All @@ -20,7 +21,10 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None
"https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/dashboards/efee0b80-4511-42e1-8ee0-2544fd44e122"
)
}
assert asset_spec.tags == {"dagster-powerbi/asset_type": "dashboard"}
assert asset_spec.tags == {
"dagster-powerbi/asset_type": "dashboard",
**build_kind_tag("powerbi"),
}


def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
Expand All @@ -38,7 +42,7 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
"https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/reports/8b7f815d-4e64-40dd-993c-cfa4fb12edee"
)
}
assert asset_spec.tags == {"dagster-powerbi/asset_type": "report"}
assert asset_spec.tags == {"dagster-powerbi/asset_type": "report", **build_kind_tag("powerbi")}


def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None:
Expand All @@ -57,7 +61,10 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None
"https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20"
)
}
assert asset_spec.tags == {"dagster-powerbi/asset_type": "semantic_model"}
assert asset_spec.tags == {
"dagster-powerbi/asset_type": "semantic_model",
**build_kind_tag("powerbi"),
}


class MyCustomTranslator(DagsterPowerBITranslator):
Expand Down

0 comments on commit 4fe6fda

Please sign in to comment.