diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py index 96bd91c0748e2..91ff38e499a37 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py @@ -1,4 +1,5 @@ import abc +import json import re import time from dataclasses import dataclass @@ -13,6 +14,7 @@ from dagster._core.definitions.events import Failure from dagster._core.execution.context.asset_execution_context import AssetExecutionContext from dagster._utils.cached_method import cached_method +from dagster._utils.security import non_secure_md5_hash_str from pydantic import Field, PrivateAttr from dagster_powerbi.translator import ( @@ -31,6 +33,14 @@ def _clean_op_name(name: str) -> str: return re.sub(r"[^a-z0-9A-Z]+", "_", name) +def generate_data_source_id(data_source: Dict[str, Any]) -> str: + """Generates a unique ID for a data source based on its properties. + We use this for cases where the API does not provide a unique ID for a data source. + This ID is never surfaced to the user and is only used internally to track dependencies. + """ + return non_secure_md5_hash_str(json.dumps(data_source, sort_keys=True).encode()) + + class PowerBICredentials(ConfigurableResource, abc.ABC): @property def api_token(self) -> str: ... @@ -224,13 +234,22 @@ def _fetch_powerbi_workspace_data(self) -> PowerBIWorkspaceData: for data in self._get_reports()["value"] ] semantic_models_data = self._get_semantic_models()["value"] - data_sources_by_id = {} + data_sources = [] for dataset in semantic_models_data: dataset_sources = self._get_semantic_model_sources(dataset["id"])["value"] - dataset["sources"] = [source["datasourceId"] for source in dataset_sources] - for data_source in dataset_sources: - data_sources_by_id[data_source["datasourceId"]] = PowerBIContentData( - content_type=PowerBIContentType.DATA_SOURCE, properties=data_source + + dataset_sources_with_id = [ + source + if "datasourceId" in source + else {"datasourceId": generate_data_source_id(source), **source} + for source in dataset_sources + ] + dataset["sources"] = [source["datasourceId"] for source in dataset_sources_with_id] + for data_source in dataset_sources_with_id: + data_sources.append( + PowerBIContentData( + content_type=PowerBIContentType.DATA_SOURCE, properties=data_source + ) ) semantic_models = [ PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset) @@ -238,7 +257,7 @@ def _fetch_powerbi_workspace_data(self) -> PowerBIWorkspaceData: ] return PowerBIWorkspaceData.from_content_data( self.workspace_id, - dashboards + reports + semantic_models + list(data_sources_by_id.values()), + dashboards + reports + semantic_models + data_sources, ) @public diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index 20822c9557c09..28139b8ef9393 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -11,6 +11,7 @@ from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet from dagster._core.definitions.metadata.metadata_value import MetadataValue +from dagster._core.definitions.tags import build_kind_tag from dagster._core.definitions.tags.tag_set import NamespacedTagSet from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes @@ -157,7 +158,7 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: key=self.get_dashboard_asset_key(data), deps=report_keys, metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)}, - tags={**PowerBITagSet(asset_type="dashboard")}, + tags={**PowerBITagSet(asset_type="dashboard"), **build_kind_tag("powerbi")}, ) def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey: @@ -173,7 +174,7 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: key=self.get_report_asset_key(data), deps=[dataset_key] if dataset_key else None, metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)}, - tags={**PowerBITagSet(asset_type="report")}, + tags={**PowerBITagSet(asset_type="report"), **build_kind_tag("powerbi")}, ) def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey: @@ -191,7 +192,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: key=self.get_semantic_model_asset_key(data), deps=source_keys, metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)}, - tags={**PowerBITagSet(asset_type="semantic_model")}, + tags={**PowerBITagSet(asset_type="semantic_model"), **build_kind_tag("powerbi")}, ) def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey: @@ -209,5 +210,5 @@ def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey: def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec: return AssetSpec( key=self.get_data_source_asset_key(data), - tags={**PowerBITagSet(asset_type="data_source")}, + tags={**PowerBITagSet(asset_type="data_source"), **build_kind_tag("powerbi")}, ) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py index 7e02f00013cc4..509020ab2a5f0 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py @@ -2,7 +2,7 @@ import pytest import responses -from dagster_powerbi.resource import BASE_API_URL +from dagster_powerbi.resource import BASE_API_URL, generate_data_source_id from dagster_powerbi.translator import PowerBIContentData, PowerBIContentType, PowerBIWorkspaceData SAMPLE_DASH = { @@ -91,7 +91,6 @@ { "datasourceType": "File", "connectionDetails": {"path": "c:\\users\\mimyersm\\desktop\\sales & marketing datas.xlsx"}, - "datasourceId": "46c83f90-3eaa-4658-b716-2307bc56e74d", "gatewayId": "40800873-8e0d-4152-86e3-e6edeb2a738c", }, ] @@ -118,7 +117,11 @@ def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData: sample_semantic_model = SAMPLE_SEMANTIC_MODEL.copy() sample_data_sources = SAMPLE_DATA_SOURCES - sample_semantic_model["sources"] = [ds["datasourceId"] for ds in sample_data_sources] + data_sources = [ + ds if "datasourceId" in ds else {"datasourceId": generate_data_source_id(ds), **ds} + for ds in sample_data_sources + ] + sample_semantic_model["sources"] = [ds["datasourceId"] for ds in data_sources] return PowerBIWorkspaceData( workspace_id=workspace_id, @@ -141,7 +144,7 @@ def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData: ds["datasourceId"]: PowerBIContentData( content_type=PowerBIContentType.DATA_SOURCE, properties=ds ) - for ds in sample_data_sources + for ds in data_sources }, ) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py index de9a1e83127af..80a3cc3d0e74b 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py @@ -1,6 +1,7 @@ from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata.metadata_value import MetadataValue +from dagster._core.definitions.tags import build_kind_tag from dagster_powerbi import DagsterPowerBITranslator from dagster_powerbi.translator import PowerBIContentData, PowerBIWorkspaceData @@ -20,7 +21,10 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/dashboards/efee0b80-4511-42e1-8ee0-2544fd44e122" ) } - assert asset_spec.tags == {"dagster-powerbi/asset_type": "dashboard"} + assert asset_spec.tags == { + "dagster-powerbi/asset_type": "dashboard", + **build_kind_tag("powerbi"), + } def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None: @@ -38,7 +42,7 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None: "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/reports/8b7f815d-4e64-40dd-993c-cfa4fb12edee" ) } - assert asset_spec.tags == {"dagster-powerbi/asset_type": "report"} + assert asset_spec.tags == {"dagster-powerbi/asset_type": "report", **build_kind_tag("powerbi")} def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None: @@ -57,7 +61,10 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20" ) } - assert asset_spec.tags == {"dagster-powerbi/asset_type": "semantic_model"} + assert asset_spec.tags == { + "dagster-powerbi/asset_type": "semantic_model", + **build_kind_tag("powerbi"), + } class MyCustomTranslator(DagsterPowerBITranslator):