add initial powerbi assets #127

Merged · 18 commits · Nov 6, 2024
Changes from 6 commits
36 changes: 36 additions & 0 deletions hooli_data_eng/assets/powerbi_assets.py
@@ -0,0 +1,36 @@
from dagster import AssetKey, AssetSpec
from dagster._core.definitions.asset_spec import replace_attributes
from dagster_powerbi import (
    load_powerbi_asset_specs,
    DagsterPowerBITranslator,
    build_semantic_model_refresh_asset_definition,
)
from dagster_powerbi.translator import PowerBIContentData
from hooli_data_eng.powerbi_workspace import power_bi_workspace


class MyCustomPowerBITranslator(DagsterPowerBITranslator):
    # Put all Power BI assets in the "BI" group and remap semantic-model
    # dependencies onto the two-part asset keys used by the dbt assets.
    def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
        return super().get_report_spec(data)._replace(group_name="BI")

    def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
        spec = super().get_semantic_model_spec(data)
        return replace_attributes(
            spec,
            group_name="BI",
            deps=[
                AssetKey(path=[dep.asset_key.path[1].upper(), dep.asset_key.path[2]])
                for dep in spec.deps
            ],
            tags={"core_kpis": "", "dagster-powerbi/asset_type": "semantic_model"},
        )

    def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
        return super().get_dashboard_spec(data)._replace(group_name="BI")

    def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
        return super().get_data_source_spec(data)._replace(group_name="BI")


# Semantic models become refreshable asset definitions; everything else stays a plain spec.
powerbi_assets = [
    build_semantic_model_refresh_asset_definition(resource_key="power_bi", spec=spec)
    if spec.tags.get("dagster-powerbi/asset_type") == "semantic_model"
    else spec
    for spec in load_powerbi_asset_specs(
        power_bi_workspace,
        dagster_powerbi_translator=MyCustomPowerBITranslator,
        use_workspace_scan=True,
    )
]
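For reference, the dependency remapping in get_semantic_model_spec collapses the three-part key Power BI reports for each upstream table into the two-part keys used by the dbt assets in this repo. A minimal sketch of that transform, with hypothetical key values:

# Hypothetical illustration of the dep-key remapping above; the concrete key
# values are assumed, but the transform matches the list comprehension in the
# translator.
from dagster import AssetKey

reported = AssetKey(["analytics_db", "analytics", "orders_augmented"])  # assumed 3-part key from Power BI
remapped = AssetKey([reported.path[1].upper(), reported.path[2]])
assert remapped == AssetKey(["ANALYTICS", "orders_augmented"])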
9 changes: 6 additions & 3 deletions hooli_data_eng/definitions.py
@@ -1,5 +1,4 @@
from pathlib import Path

from dagster import (
    AnchorBasedFilePathMapping,
    Definitions,
@@ -12,6 +11,7 @@
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud

from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets
from hooli_data_eng.assets.powerbi_assets import powerbi_assets
from hooli_data_eng.assets.dbt_assets import dbt_slim_ci_job
from hooli_data_eng.assets.marketing import check_avg_orders
from hooli_data_eng.assets.raw_data import check_users, raw_data_schema_checks
@@ -22,7 +22,6 @@
from hooli_data_eng.sensors.watch_s3 import watch_s3_sensor
from hooli_data_eng.assets.marketing import avg_orders_freshness_check, min_order_freshness_check, min_order_freshness_check_sensor, check_avg_orders, avg_orders_freshness_check_schedule
from hooli_data_eng.assets.dbt_assets import weekly_freshness_check, weekly_freshness_check_sensor

# ---------------------------------------------------
# Assets

@@ -59,6 +58,7 @@

marketing_assets = load_assets_from_package_module(marketing, group_name="MARKETING")


# ---------------------------------------------------
# Definitions

@@ -70,7 +70,7 @@
        {"max_concurrent": 3}
    ),
    assets=link_code_references_to_git_if_cloud(
        with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]),
        with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets, *powerbi_assets]),
        file_path_mapping=AnchorBasedFilePathMapping(
            local_file_anchor=Path(__file__),
            file_anchor_path_in_repository="hooli_data_eng/definitions.py"
@@ -89,3 +89,6 @@
    ],
    jobs=[analytics_job, predict_job, dbt_slim_ci_job],
)


#defs = Definitions.merge(static_defs, powerbi_assets.build_defs(dagster_powerbi_translator=MyCustomPowerBITranslator, enable_refresh_semantic_models=True))
12 changes: 12 additions & 0 deletions hooli_data_eng/powerbi_workspace.py
@@ -0,0 +1,12 @@
from dagster import EnvVar
from dagster_powerbi import PowerBIWorkspace, PowerBIServicePrincipal

# Connect using a service principal
power_bi_workspace = PowerBIWorkspace(
    credentials=PowerBIServicePrincipal(
        client_id=EnvVar("AZURE_POWERBI_CLIENT_ID"),
        client_secret=EnvVar("AZURE_POWERBI_CLIENT_SECRET"),
        tenant_id=EnvVar("AZURE_POWERBI_TENANT_ID"),
    ),
    workspace_id=EnvVar("AZURE_POWERBI_WORKSPACE_ID"),
)
4 changes: 4 additions & 0 deletions hooli_data_eng/resources/__init__.py
@@ -22,6 +22,7 @@
# from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager
from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem
from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert
from hooli_data_eng.powerbi_workspace import power_bi_workspace

from databricks.sdk import WorkspaceClient
from dagster_databricks import PipesDatabricksClient
@@ -58,6 +59,7 @@

resource_def = {
    "LOCAL": {
        "power_bi": power_bi_workspace,
Contributor

I wonder if there is something we can do here to mock it out, so that the project still works for folks who clone and run this locally without the Power BI creds?

(Speaking as someone who will want to be able to run the demo locally without worrying whether I have sourced the Power BI creds correctly.)

Contributor Author

That 100% makes sense -- I'll look into mocking it. The only question is whether we want to mock the Power BI assets locally; I'd lean towards leaving them off local dev entirely if possible, but I'm not exactly sure how that would work. Maybe @benpankow you have some ideas?

"io_manager": DuckDBPandasIOManager(
database=os.path.join(DBT_PROJECT_DIR, "example.duckdb")
),
@@ -77,6 +79,7 @@
        "pipes_k8s_client": ResourceDefinition.none_resource(),
    },
    "BRANCH": {
        "power_bi": power_bi_workspace,
        "io_manager": SnowflakePandasIOManager(
            database="DEMO_DB2_BRANCH",
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
@@ -102,6 +105,7 @@
        "pipes_k8s_client": PipesK8sClient(),
    },
    "PROD": {
        "power_bi": power_bi_workspace,
        "io_manager": SnowflakePandasIOManager(
            database="DEMO_DB2",
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
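Regarding the review thread above about running locally without Power BI credentials: one minimal sketch of the "leave it off local dev" approach the author leans toward is to gate both the asset specs and the resource on the deployment environment. The get_env helper and the Dagster Cloud environment variables below are assumptions for illustration, not part of this PR:

# Hypothetical sketch (not in this PR): skip Power BI entirely outside of
# branch/prod deployments so local clones load without Azure credentials.
import os

from dagster import ResourceDefinition


def get_env() -> str:
    # Assumed LOCAL/BRANCH/PROD convention keyed off Dagster Cloud env vars.
    if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1":
        return "BRANCH"
    if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME"):
        return "PROD"
    return "LOCAL"


if get_env() == "LOCAL":
    # No Power BI assets locally; stub the resource key so Definitions still binds.
    powerbi_assets = []
    power_bi_resource = ResourceDefinition.none_resource()
else:
    from hooli_data_eng.assets.powerbi_assets import powerbi_assets
    from hooli_data_eng.powerbi_workspace import power_bi_workspace

    power_bi_resource = power_bi_workspace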
1 change: 1 addition & 0 deletions setup.py
@@ -31,6 +31,7 @@
        "requests_toolbelt",
        "html5lib",
        "scikit-learn",
        "dagster-powerbi==0.0.11"
    ],
    extras_require={"dev": ["dagster-webserver", "pytest"],
                    "sling": ["dagster-embedded-elt"]},