diff --git a/hooli_data_eng/assets/powerbi_assets.py b/hooli_data_eng/assets/powerbi_assets.py index 524fe509..e0300922 100644 --- a/hooli_data_eng/assets/powerbi_assets.py +++ b/hooli_data_eng/assets/powerbi_assets.py @@ -1,5 +1,5 @@ -from dagster import AssetKey, AssetSpec, EnvVar -from dagster._core.definitions.asset_spec import replace_attributes +from dagster import AssetKey, AssetSpec, EnvVar, TableSchema, TableColumn +from dagster._core.definitions.asset_spec import replace_attributes, merge_attributes from dagster_powerbi import ( load_powerbi_asset_specs, DagsterPowerBITranslator, @@ -12,20 +12,29 @@ class MyCustomPowerBITranslator(DagsterPowerBITranslator): def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: spec = super().get_report_spec(data) - return replace_attributes( + specs_replaced = replace_attributes( spec, description=f"Report link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/reports/{data.properties["id"]}", group_name="BI", - tags={"core_kpis":"","dagster-powerbi/asset_type": "report"}) + ) + return merge_attributes(specs_replaced, tags={"core_kpis":""}) def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: spec = super().get_semantic_model_spec(data) - return replace_attributes( + spec_replaced = replace_attributes( spec, description=f"Semantic model link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/datasets/{data.properties["id"]}/details", group_name="BI", deps=[AssetKey(path=[dep.asset_key.path[1].upper(), dep.asset_key.path[2]]) for dep in spec.deps], - tags={"core_kpis":"","dagster-powerbi/asset_type": "semantic_model"}) + ) + return merge_attributes(spec_replaced, + metadata={"dagster/column_schema": TableSchema( + columns=[TableColumn( + name=col["name"], + type=col["dataType"]) + for col in data.properties["tables"][0]["columns"]])}, + tags={"core_kpis":""}, + ) def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: spec = super().get_dashboard_spec(data)