Skip to content

Commit

Permalink
[dagster-powerbi] Move contextual data from DagsterPowerBITranslator …
Browse files Browse the repository at this point in the history
…to PowerBITranslatorData
  • Loading branch information
maximearmstrong committed Dec 26, 2024
1 parent 78c3c38 commit e018d8f
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
PowerBIContentData,
PowerBIContentType,
PowerBITagSet,
PowerBITranslatorData,
PowerBIWorkspaceData,
)

Expand Down Expand Up @@ -437,15 +438,22 @@ def fetch_state(self) -> PowerBIWorkspaceData:
)

def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions:
translator = self.translator_cls(context=state)
translator = self.translator_cls()

all_external_data = [
*state.dashboards_by_id.values(),
*state.reports_by_id.values(),
*state.semantic_models_by_id.values(),
]
all_external_asset_specs = [
translator.get_asset_spec(content) for content in all_external_data
translator.get_asset_spec(
PowerBITranslatorData(
content_type=content.content_type,
properties=content.properties,
workspace_data=state,
)
)
for content in all_external_data
]

return Definitions(assets=[*all_external_asset_specs])
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ class PowerBIContentData:
properties: Dict[str, Any]


@whitelist_for_serdes
@record
class PowerBITranslatorData(PowerBIContentData):
"""A record representing a piece of content in PowerBI and the PowerBI workspace data.
Includes the content's type and data as returned from the API.
"""

workspace_data: "PowerBIWorkspaceData"


@whitelist_for_serdes
@record
class PowerBIWorkspaceData:
Expand Down Expand Up @@ -155,14 +165,8 @@ class DagsterPowerBITranslator:
Subclass this class to implement custom logic for each type of PowerBI content.
"""

def __init__(self, context: PowerBIWorkspaceData):
self._context = context

@property
def workspace_data(self) -> PowerBIWorkspaceData:
return self._context

def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
if data.content_type == PowerBIContentType.DASHBOARD:
return self.get_dashboard_spec(data)
elif data.content_type == PowerBIContentType.REPORT:
Expand All @@ -179,20 +183,28 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_dashboard_spec(data).key

def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
dashboard_id = data.properties["id"]
tile_report_ids = [
tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile
]
report_keys = [
self.get_report_spec(self.workspace_data.reports_by_id[report_id]).key
self.get_report_spec(
PowerBITranslatorData(
content_type=data.workspace_data.reports_by_id[report_id].content_type,
properties=data.workspace_data.reports_by_id[report_id].properties,
workspace_data=data.workspace_data,
)
).key
for report_id in tile_report_ids
]
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/dashboards/{dashboard_id}"
or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/dashboards/{dashboard_id}"
)

return AssetSpec(
Expand All @@ -213,18 +225,30 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_report_spec(data).key

def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
report_id = data.properties["id"]
dataset_id = data.properties.get("datasetId")
dataset_data = (
self.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None
data.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None
)
dataset_key = (
self.get_semantic_model_spec(
PowerBITranslatorData(
content_type=dataset_data.content_type,
properties=dataset_data.properties,
workspace_data=data.workspace_data,
)
).key
if dataset_data
else None
)
dataset_key = self.get_semantic_model_spec(dataset_data).key if dataset_data else None
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}"
or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/reports/{report_id}"
)

owner = data.properties.get("createdBy")
Expand All @@ -243,18 +267,26 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_semantic_model_spec(data).key

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
dataset_id = data.properties["id"]
source_ids = data.properties.get("sources", [])
source_keys = [
self.get_data_source_spec(self.workspace_data.data_sources_by_id[source_id]).key
self.get_data_source_spec(
PowerBITranslatorData(
content_type=data.workspace_data.data_sources_by_id[source_id].content_type,
properties=data.workspace_data.data_sources_by_id[source_id].properties,
workspace_data=data.workspace_data,
)
).key
for source_id in source_ids
]
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/datasets/{dataset_id}"
or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/datasets/{dataset_id}"
)

for table in data.properties.get("tables", []):
Expand Down Expand Up @@ -297,9 +329,11 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_data_source_spec(data).key

def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
connection_name = (
data.properties["connectionDetails"].get("path")
or data.properties["connectionDetails"].get("url")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,25 @@
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.tags import build_kind_tag
from dagster_powerbi import DagsterPowerBITranslator
from dagster_powerbi.translator import PowerBIContentData, PowerBIContentType, PowerBIWorkspaceData
from dagster_powerbi.translator import (
PowerBIContentData,
PowerBIContentType,
PowerBITranslatorData,
PowerBIWorkspaceData,
)


def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None:
dashboard = next(iter(workspace_data.dashboards_by_id.values()))

translator = DagsterPowerBITranslator(workspace_data)
asset_spec = translator.get_asset_spec(dashboard)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=dashboard.content_type,
properties=dashboard.properties,
workspace_data=workspace_data,
)
)

assert asset_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]
deps = list(asset_spec.deps)
Expand All @@ -32,8 +43,14 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None
def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
report = next(iter(workspace_data.reports_by_id.values()))

translator = DagsterPowerBITranslator(workspace_data)
asset_spec = translator.get_asset_spec(report)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=report.content_type,
properties=report.properties,
workspace_data=workspace_data,
)
)

assert asset_spec.key.path == ["report", "Sales_Returns_Sample_v201912"]
deps = list(asset_spec.deps)
Expand All @@ -55,8 +72,14 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None:
semantic_model = next(iter(workspace_data.semantic_models_by_id.values()))

translator = DagsterPowerBITranslator(workspace_data)
asset_spec = translator.get_asset_spec(semantic_model)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=semantic_model.content_type,
properties=semantic_model.properties,
workspace_data=workspace_data,
)
)

assert asset_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]
deps = list(asset_spec.deps)
Expand Down Expand Up @@ -90,8 +113,14 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None
def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWorkspaceData) -> None:
semantic_model = next(iter(second_workspace_data.semantic_models_by_id.values()))

translator = DagsterPowerBITranslator(second_workspace_data)
asset_spec = translator.get_asset_spec(semantic_model)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=semantic_model.content_type,
properties=semantic_model.properties,
workspace_data=second_workspace_data,
)
)
assert asset_spec.metadata == {
"dagster-powerbi/web_url": MetadataValue.url(
"https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20"
Expand Down Expand Up @@ -128,8 +157,14 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> None:
dashboard = next(iter(workspace_data.dashboards_by_id.values()))

translator = MyCustomTranslator(workspace_data)
asset_spec = translator.get_asset_spec(dashboard)
translator = MyCustomTranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=dashboard.content_type,
properties=dashboard.properties,
workspace_data=workspace_data,
)
)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
Expand All @@ -153,8 +188,14 @@ def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData)
},
)

translator = DagsterPowerBITranslator(workspace_data)
asset_spec = translator.get_asset_spec(report_no_dataset)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_type=report_no_dataset.content_type,
properties=report_no_dataset.properties,
workspace_data=workspace_data,
)
)

assert asset_spec.key.path == ["report", "Sales_Returns_Sample_v201912"]
deps = list(asset_spec.deps)
Expand Down

0 comments on commit e018d8f

Please sign in to comment.