From 9ab65c3b67a65b6dfd268d5597525fd508c58cae Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 26 Dec 2024 14:49:30 -0500 Subject: [PATCH] Make PowerBITranslatorData a container class --- .../power-bi/customize-power-bi-asset-defs.py | 2 +- .../dashboard/definitions.py | 4 +- .../dagster_powerbi/resource.py | 3 +- .../dagster_powerbi/translator.py | 42 ++++++++++--------- .../dagster_powerbi_tests/test_asset_specs.py | 27 ++++++++++++ .../dagster_powerbi_tests/test_translator.py | 25 ++++------- 6 files changed, 63 insertions(+), 40 deletions(-) diff --git a/examples/docs_snippets/docs_snippets/integrations/power-bi/customize-power-bi-asset-defs.py b/examples/docs_snippets/docs_snippets/integrations/power-bi/customize-power-bi-asset-defs.py index 982c9796ea0cd..091121656e147 100644 --- a/examples/docs_snippets/docs_snippets/integrations/power-bi/customize-power-bi-asset-defs.py +++ b/examples/docs_snippets/docs_snippets/integrations/power-bi/customize-power-bi-asset-defs.py @@ -23,7 +23,7 @@ class MyCustomPowerBITranslator(DagsterPowerBITranslator): def get_asset_spec(self, data: PowerBIContentData) -> dg.AssetSpec: # We create the default asset spec using super() - default_spec = super().get_asset_spec(data) + default_spec = super().get_asset_spec(data) # type: ignore # We customize the team owner tag for all assets, # and we customize the asset key prefix only for dashboards. return default_spec.replace_attributes( diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/dashboard/definitions.py b/examples/project_atproto_dashboard/project_atproto_dashboard/dashboard/definitions.py index dba89ead146d3..e5414bee55a1e 100644 --- a/examples/project_atproto_dashboard/project_atproto_dashboard/dashboard/definitions.py +++ b/examples/project_atproto_dashboard/project_atproto_dashboard/dashboard/definitions.py @@ -21,7 +21,7 @@ class CustomDagsterPowerBITranslator(DagsterPowerBITranslator): def get_report_spec(self, data: PowerBIContentData) -> dg.AssetSpec: return ( super() - .get_report_spec(data) + .get_report_spec(data) # type: ignore .replace_attributes( group_name="reporting", ) @@ -33,7 +33,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> dg.AssetSpec: ] return ( super() - .get_semantic_model_spec(data) + .get_semantic_model_spec(data) # type: ignore .replace_attributes( group_name="reporting", deps=upsteam_table_deps, diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py index be3b83f1ed292..16d5f7bcaeb58 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py @@ -448,8 +448,7 @@ def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions: all_external_asset_specs = [ translator.get_asset_spec( PowerBITranslatorData( - content_type=content.content_type, - properties=content.properties, + content_data=content, workspace_data=state, ) ) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index 64185bcbda1a4..6b9c3d638a570 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -1,7 +1,7 @@ import re import urllib.parse from enum import Enum -from typing import Any, Dict, List, Literal, Optional, Sequence +from typing import Any, Dict, List, Literal, NamedTuple, Optional, Sequence from dagster import ( UrlMetadataValue, @@ -78,15 +78,22 @@ class PowerBIContentData: properties: Dict[str, Any] -@whitelist_for_serdes -@record -class PowerBITranslatorData(PowerBIContentData): +class PowerBITranslatorData(NamedTuple): """A record representing a piece of content in PowerBI and the PowerBI workspace data. Includes the content's type and data as returned from the API. """ + content_data: "PowerBIContentData" workspace_data: "PowerBIWorkspaceData" + @property + def content_type(self) -> PowerBIContentType: + return self.content_data.content_type + + @property + def properties(self) -> Dict[str, Any]: + return self.content_data.properties + @whitelist_for_serdes @record @@ -165,7 +172,7 @@ class DagsterPowerBITranslator: Subclass this class to implement custom logic for each type of PowerBI content. """ - def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_asset_spec(self, data: PowerBITranslatorData) -> AssetSpec: data = check.inst(data, PowerBITranslatorData) if data.content_type == PowerBIContentType.DASHBOARD: return self.get_dashboard_spec(data) @@ -182,11 +189,11 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: breaking_version="1.10", additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) - def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey: + def get_dashboard_asset_key(self, data: PowerBITranslatorData) -> AssetKey: data = check.inst(data, PowerBITranslatorData) return self.get_dashboard_spec(data).key - def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_dashboard_spec(self, data: PowerBITranslatorData) -> AssetSpec: data = check.inst(data, PowerBITranslatorData) dashboard_id = data.properties["id"] tile_report_ids = [ @@ -195,8 +202,7 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: report_keys = [ self.get_report_spec( PowerBITranslatorData( - content_type=data.workspace_data.reports_by_id[report_id].content_type, - properties=data.workspace_data.reports_by_id[report_id].properties, + content_data=data.workspace_data.reports_by_id[report_id], workspace_data=data.workspace_data, ) ).key @@ -224,11 +230,11 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: breaking_version="1.10", additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) - def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey: + def get_report_asset_key(self, data: PowerBITranslatorData) -> AssetKey: data = check.inst(data, PowerBITranslatorData) return self.get_report_spec(data).key - def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_report_spec(self, data: PowerBITranslatorData) -> AssetSpec: data = check.inst(data, PowerBITranslatorData) report_id = data.properties["id"] dataset_id = data.properties.get("datasetId") @@ -238,8 +244,7 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: dataset_key = ( self.get_semantic_model_spec( PowerBITranslatorData( - content_type=dataset_data.content_type, - properties=dataset_data.properties, + content_data=dataset_data, workspace_data=data.workspace_data, ) ).key @@ -266,19 +271,18 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: breaking_version="1.10", additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) - def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey: + def get_semantic_model_asset_key(self, data: PowerBITranslatorData) -> AssetKey: data = check.inst(data, PowerBITranslatorData) return self.get_semantic_model_spec(data).key - def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_semantic_model_spec(self, data: PowerBITranslatorData) -> AssetSpec: data = check.inst(data, PowerBITranslatorData) dataset_id = data.properties["id"] source_ids = data.properties.get("sources", []) source_keys = [ self.get_data_source_spec( PowerBITranslatorData( - content_type=data.workspace_data.data_sources_by_id[source_id].content_type, - properties=data.workspace_data.data_sources_by_id[source_id].properties, + content_data=data.workspace_data.data_sources_by_id[source_id], workspace_data=data.workspace_data, ) ).key @@ -328,11 +332,11 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: breaking_version="1.10", additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) - def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey: + def get_data_source_asset_key(self, data: PowerBITranslatorData) -> AssetKey: data = check.inst(data, PowerBITranslatorData) return self.get_data_source_spec(data).key - def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_data_source_spec(self, data: PowerBITranslatorData) -> AssetSpec: data = check.inst(data, PowerBITranslatorData) connection_name = ( data.properties["connectionDetails"].get("path") diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py index 1c3ce099ebe8d..9f9bd5891a4cd 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py @@ -6,6 +6,7 @@ from dagster import materialize from dagster._config.field_utils import EnvVar from dagster._core.code_pointer import CodePointer +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.decorators.asset_decorator import asset from dagster._core.definitions.definitions_class import Definitions @@ -24,6 +25,7 @@ from dagster_powerbi import PowerBIWorkspace from dagster_powerbi.assets import build_semantic_model_refresh_asset_definition from dagster_powerbi.resource import BASE_API_URL, PowerBIToken, load_powerbi_asset_specs +from dagster_powerbi.translator import DagsterPowerBITranslator, PowerBIContentData from dagster_powerbi_tests.conftest import SAMPLE_SEMANTIC_MODEL @@ -82,6 +84,31 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"] +class MyCustomTranslator(DagsterPowerBITranslator): + def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: + default_spec = super().get_asset_spec(data) # type: ignore + return default_spec.replace_attributes( + key=default_spec.key.with_prefix("prefix"), + ).merge_attributes(metadata={"custom": "metadata"}) + + +def test_translator_custom_metadata(workspace_data_api_mocks: None, workspace_id: str) -> None: + fake_token = uuid.uuid4().hex + resource = PowerBIWorkspace( + credentials=PowerBIToken(api_token=fake_token), + workspace_id=workspace_id, + ) + all_asset_specs = load_powerbi_asset_specs( + workspace=resource, dagster_powerbi_translator=MyCustomTranslator, use_workspace_scan=False + ) + asset_spec = next(spec for spec in all_asset_specs) + + assert "custom" in asset_spec.metadata + assert asset_spec.metadata["custom"] == "metadata" + assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"] + assert "dagster/kind/powerbi" in asset_spec.tags + + @lazy_definitions def state_derived_defs_two_workspaces() -> Definitions: resource = PowerBIWorkspace( diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py index 99b99a57badd3..4a5264f445556 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py @@ -18,8 +18,7 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None translator = DagsterPowerBITranslator() asset_spec = translator.get_asset_spec( PowerBITranslatorData( - content_type=dashboard.content_type, - properties=dashboard.properties, + content_data=dashboard, workspace_data=workspace_data, ) ) @@ -46,8 +45,7 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None: translator = DagsterPowerBITranslator() asset_spec = translator.get_asset_spec( PowerBITranslatorData( - content_type=report.content_type, - properties=report.properties, + content_data=report, workspace_data=workspace_data, ) ) @@ -75,8 +73,7 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None translator = DagsterPowerBITranslator() asset_spec = translator.get_asset_spec( PowerBITranslatorData( - content_type=semantic_model.content_type, - properties=semantic_model.properties, + content_data=semantic_model, workspace_data=workspace_data, ) ) @@ -116,8 +113,7 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor translator = DagsterPowerBITranslator() asset_spec = translator.get_asset_spec( PowerBITranslatorData( - content_type=semantic_model.content_type, - properties=semantic_model.properties, + content_data=semantic_model, workspace_data=second_workspace_data, ) ) @@ -147,11 +143,10 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor class MyCustomTranslator(DagsterPowerBITranslator): def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: - default_spec = super().get_asset_spec(data) + default_spec = super().get_asset_spec(data) # type: ignore return default_spec.replace_attributes( key=default_spec.key.with_prefix("prefix"), - metadata={**default_spec.metadata, "custom": "metadata"}, - ) + ).merge_attributes(metadata={"custom": "metadata"}) def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> None: @@ -159,9 +154,8 @@ def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> Non translator = MyCustomTranslator() asset_spec = translator.get_asset_spec( - PowerBITranslatorData( - content_type=dashboard.content_type, - properties=dashboard.properties, + PowerBITranslatorData( # type: ignore + content_data=dashboard, workspace_data=workspace_data, ) ) @@ -191,8 +185,7 @@ def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData) translator = DagsterPowerBITranslator() asset_spec = translator.get_asset_spec( PowerBITranslatorData( - content_type=report_no_dataset.content_type, - properties=report_no_dataset.properties, + content_data=report_no_dataset, workspace_data=workspace_data, ) )