Skip to content

Commit

Permalink
Make PowerBITranslatorData a container class
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 26, 2024
1 parent e018d8f commit 9ab65c3
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
class MyCustomPowerBITranslator(DagsterPowerBITranslator):
def get_asset_spec(self, data: PowerBIContentData) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(data)
default_spec = super().get_asset_spec(data) # type: ignore
# We customize the team owner tag for all assets,
# and we customize the asset key prefix only for dashboards.
return default_spec.replace_attributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class CustomDagsterPowerBITranslator(DagsterPowerBITranslator):
def get_report_spec(self, data: PowerBIContentData) -> dg.AssetSpec:
return (
super()
.get_report_spec(data)
.get_report_spec(data) # type: ignore
.replace_attributes(
group_name="reporting",
)
Expand All @@ -33,7 +33,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> dg.AssetSpec:
]
return (
super()
.get_semantic_model_spec(data)
.get_semantic_model_spec(data) # type: ignore
.replace_attributes(
group_name="reporting",
deps=upsteam_table_deps,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,7 @@ def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions:
all_external_asset_specs = [
translator.get_asset_spec(
PowerBITranslatorData(
content_type=content.content_type,
properties=content.properties,
content_data=content,
workspace_data=state,
)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
import urllib.parse
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Sequence
from typing import Any, Dict, List, Literal, NamedTuple, Optional, Sequence

from dagster import (
UrlMetadataValue,
Expand Down Expand Up @@ -78,15 +78,22 @@ class PowerBIContentData:
properties: Dict[str, Any]


@whitelist_for_serdes
@record
class PowerBITranslatorData(PowerBIContentData):
class PowerBITranslatorData(NamedTuple):
"""A record representing a piece of content in PowerBI and the PowerBI workspace data.
Includes the content's type and data as returned from the API.
"""

content_data: "PowerBIContentData"
workspace_data: "PowerBIWorkspaceData"

@property
def content_type(self) -> PowerBIContentType:
return self.content_data.content_type

@property
def properties(self) -> Dict[str, Any]:
return self.content_data.properties


@whitelist_for_serdes
@record
Expand Down Expand Up @@ -165,7 +172,7 @@ class DagsterPowerBITranslator:
Subclass this class to implement custom logic for each type of PowerBI content.
"""

def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_asset_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
if data.content_type == PowerBIContentType.DASHBOARD:
return self.get_dashboard_spec(data)
Expand All @@ -182,11 +189,11 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey:
def get_dashboard_asset_key(self, data: PowerBITranslatorData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_dashboard_spec(data).key

def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_dashboard_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
dashboard_id = data.properties["id"]
tile_report_ids = [
Expand All @@ -195,8 +202,7 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
report_keys = [
self.get_report_spec(
PowerBITranslatorData(
content_type=data.workspace_data.reports_by_id[report_id].content_type,
properties=data.workspace_data.reports_by_id[report_id].properties,
content_data=data.workspace_data.reports_by_id[report_id],
workspace_data=data.workspace_data,
)
).key
Expand Down Expand Up @@ -224,11 +230,11 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
def get_report_asset_key(self, data: PowerBITranslatorData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_report_spec(data).key

def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_report_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
report_id = data.properties["id"]
dataset_id = data.properties.get("datasetId")
Expand All @@ -238,8 +244,7 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
dataset_key = (
self.get_semantic_model_spec(
PowerBITranslatorData(
content_type=dataset_data.content_type,
properties=dataset_data.properties,
content_data=dataset_data,
workspace_data=data.workspace_data,
)
).key
Expand All @@ -266,19 +271,18 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
def get_semantic_model_asset_key(self, data: PowerBITranslatorData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_semantic_model_spec(data).key

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_semantic_model_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
dataset_id = data.properties["id"]
source_ids = data.properties.get("sources", [])
source_keys = [
self.get_data_source_spec(
PowerBITranslatorData(
content_type=data.workspace_data.data_sources_by_id[source_id].content_type,
properties=data.workspace_data.data_sources_by_id[source_id].properties,
content_data=data.workspace_data.data_sources_by_id[source_id],
workspace_data=data.workspace_data,
)
).key
Expand Down Expand Up @@ -328,11 +332,11 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
def get_data_source_asset_key(self, data: PowerBITranslatorData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_data_source_spec(data).key

def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_data_source_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
connection_name = (
data.properties["connectionDetails"].get("path")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster import materialize
from dagster._config.field_utils import EnvVar
from dagster._core.code_pointer import CodePointer
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import asset
from dagster._core.definitions.definitions_class import Definitions
Expand All @@ -24,6 +25,7 @@
from dagster_powerbi import PowerBIWorkspace
from dagster_powerbi.assets import build_semantic_model_refresh_asset_definition
from dagster_powerbi.resource import BASE_API_URL, PowerBIToken, load_powerbi_asset_specs
from dagster_powerbi.translator import DagsterPowerBITranslator, PowerBIContentData

from dagster_powerbi_tests.conftest import SAMPLE_SEMANTIC_MODEL

Expand Down Expand Up @@ -82,6 +84,31 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id:
assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]


class MyCustomTranslator(DagsterPowerBITranslator):
def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
default_spec = super().get_asset_spec(data) # type: ignore
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
).merge_attributes(metadata={"custom": "metadata"})


def test_translator_custom_metadata(workspace_data_api_mocks: None, workspace_id: str) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
credentials=PowerBIToken(api_token=fake_token),
workspace_id=workspace_id,
)
all_asset_specs = load_powerbi_asset_specs(
workspace=resource, dagster_powerbi_translator=MyCustomTranslator, use_workspace_scan=False
)
asset_spec = next(spec for spec in all_asset_specs)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"]
assert "dagster/kind/powerbi" in asset_spec.tags


@lazy_definitions
def state_derived_defs_two_workspaces() -> Definitions:
resource = PowerBIWorkspace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=dashboard.content_type,
properties=dashboard.properties,
content_data=dashboard,
workspace_data=workspace_data,
)
)
Expand All @@ -46,8 +45,7 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=report.content_type,
properties=report.properties,
content_data=report,
workspace_data=workspace_data,
)
)
Expand Down Expand Up @@ -75,8 +73,7 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=semantic_model.content_type,
properties=semantic_model.properties,
content_data=semantic_model,
workspace_data=workspace_data,
)
)
Expand Down Expand Up @@ -116,8 +113,7 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=semantic_model.content_type,
properties=semantic_model.properties,
content_data=semantic_model,
workspace_data=second_workspace_data,
)
)
Expand Down Expand Up @@ -147,21 +143,19 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor

class MyCustomTranslator(DagsterPowerBITranslator):
def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
default_spec = super().get_asset_spec(data)
default_spec = super().get_asset_spec(data) # type: ignore
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
metadata={**default_spec.metadata, "custom": "metadata"},
)
).merge_attributes(metadata={"custom": "metadata"})


def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> None:
dashboard = next(iter(workspace_data.dashboards_by_id.values()))

translator = MyCustomTranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=dashboard.content_type,
properties=dashboard.properties,
PowerBITranslatorData( # type: ignore
content_data=dashboard,
workspace_data=workspace_data,
)
)
Expand Down Expand Up @@ -191,8 +185,7 @@ def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=report_no_dataset.content_type,
properties=report_no_dataset.properties,
content_data=report_no_dataset,
workspace_data=workspace_data,
)
)
Expand Down

0 comments on commit 9ab65c3

Please sign in to comment.