Skip to content

Commit

Permalink
add tableschema to semantic models use replace and merge
Browse files Browse the repository at this point in the history
  • Loading branch information
cnolanminich committed Oct 31, 2024
1 parent cd1b656 commit a9698c0
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions hooli_data_eng/assets/powerbi_assets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dagster import AssetKey, AssetSpec, EnvVar
from dagster._core.definitions.asset_spec import replace_attributes
from dagster import AssetKey, AssetSpec, EnvVar, TableSchema, TableColumn
from dagster._core.definitions.asset_spec import replace_attributes, merge_attributes
from dagster_powerbi import (
load_powerbi_asset_specs,
DagsterPowerBITranslator,
Expand All @@ -12,20 +12,29 @@
class MyCustomPowerBITranslator(DagsterPowerBITranslator):
def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_report_spec(data)
return replace_attributes(
specs_replaced = replace_attributes(
spec,
description=f"Report link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/reports/{data.properties["id"]}",
group_name="BI",
tags={"core_kpis":"","dagster-powerbi/asset_type": "report"})
)
return merge_attributes(specs_replaced, tags={"core_kpis":""})

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_semantic_model_spec(data)
return replace_attributes(
spec_replaced = replace_attributes(
spec,
description=f"Semantic model link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/datasets/{data.properties["id"]}/details",
group_name="BI",
deps=[AssetKey(path=[dep.asset_key.path[1].upper(), dep.asset_key.path[2]]) for dep in spec.deps],
tags={"core_kpis":"","dagster-powerbi/asset_type": "semantic_model"})
)
return merge_attributes(spec_replaced,
metadata={"dagster/column_schema": TableSchema(
columns=[TableColumn(
name=col["name"],
type=col["dataType"])
for col in data.properties["tables"][0]["columns"]])},
tags={"core_kpis":""},
)

def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_dashboard_spec(data)
Expand Down

0 comments on commit a9698c0

Please sign in to comment.