[dagster-powerbi] add ability to reload semantic models #23590

Merged 2 commits on Sep 5, 2024
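In practice, the new flag turns semantic models into executable assets that trigger and poll a PowerBI refresh when materialized, while dashboards, reports, and data sources stay external. A minimal usage sketch (not part of the diff; the token and workspace ID are placeholders):

from dagster_powerbi import PowerBIWorkspace

resource = PowerBIWorkspace(
    api_token="<api-token>",        # placeholder credential
    workspace_id="<workspace-id>",  # placeholder PowerBI group ID
)

# Semantic models become refreshable, executable assets.
defs = resource.build_defs(enable_refresh_semantic_models=True)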
145 changes: 127 additions & 18 deletions python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
@@ -1,3 +1,5 @@
import re
import time
from typing import Any, Dict, Sequence, Type

import requests
@@ -7,11 +9,14 @@
Definitions,
_check as check,
external_assets_from_specs,
multi_asset,
)
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
CacheableAssetsDefinition,
)
from dagster._core.definitions.events import Failure
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._utils.cached_method import cached_method
from pydantic import Field

@@ -22,7 +27,12 @@
PowerBIWorkspaceData,
)

-BASE_API_URL = "https://api.powerbi.com/v1.0/myorg/"
BASE_API_URL = "https://api.powerbi.com/v1.0/myorg"


def _clean_op_name(name: str) -> str:
"""Cleans an input to be a valid Dagster op name."""
return re.sub(r"[^a-z0-9A-Z]+", "_", name)
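# Example: _clean_op_name("Sales & Returns Sample v201912") -> "Sales_Returns_Sample_v201912";
# runs of invalid characters collapse into a single underscore.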


class PowerBIWorkspace(ConfigurableResource):
@@ -32,8 +42,16 @@ class PowerBIWorkspace(ConfigurableResource):

api_token: str = Field(..., description="An API token used to connect to PowerBI.")
workspace_id: str = Field(..., description="The ID of the PowerBI group to use.")
refresh_poll_interval: int = Field(
default=5, description="The interval in seconds to poll for refresh status."
)
refresh_timeout: int = Field(
default=300, description="The maximum time in seconds to wait for a refresh to complete."
)

-def fetch_json(self, endpoint: str) -> Dict[str, Any]:
def fetch(
self, endpoint: str, method: str = "GET", json: Any = None, group_scoped: bool = True
) -> requests.Response:
"""Fetch JSON data from the PowerBI API. Raises an exception if the request fails.

Args:
@@ -46,11 +64,53 @@ def fetch_json(self, endpoint: str) -> Dict[str, Any]:
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_token}",
}
-response = requests.get(
-f"{BASE_API_URL}/groups/{self.workspace_id}/{endpoint}", headers=headers
base_url = f"{BASE_API_URL}/groups/{self.workspace_id}" if group_scoped else BASE_API_URL
response = requests.request(
method=method,
url=f"{base_url}/{endpoint}",
headers=headers,
json=json,
allow_redirects=True,
)
response.raise_for_status()
-return response.json()
return response

def fetch_json(
self, endpoint: str, method: str = "GET", json: Any = None, group_scoped: bool = True
) -> Dict[str, Any]:
return self.fetch(endpoint, method, json, group_scoped=group_scoped).json()

def trigger_refresh(self, dataset_id: str) -> None:
"""Triggers a refresh of a PowerBI dataset."""
response = self.fetch(
method="POST",
endpoint=f"datasets/{dataset_id}/refreshes",
json={"notifyOption": "NoNotification"},
group_scoped=False,
)
if response.status_code != 202:
raise Failure(f"Refresh failed to start: {response.content}")

def poll_refresh(self, dataset_id: str) -> None:
"""Polls the refresh status of a PowerBI dataset until it completes or fails."""
status = None

start = time.monotonic()
while status not in ["Completed", "Failed"]:
if time.monotonic() - start > self.refresh_timeout:
raise Failure(f"Refresh timed out after {self.refresh_timeout} seconds.")

last_refresh = self.fetch_json(
f"datasets/{dataset_id}/refreshes",
group_scoped=False,
)["value"][0]
status = last_refresh["status"]

time.sleep(self.refresh_poll_interval)

if status == "Failed":
error = last_refresh.get("serviceExceptionJson")
raise Failure(f"Refresh failed: {error}")

@cached_method
def get_reports(self) -> Dict[str, Any]:
@@ -129,6 +189,7 @@ def fetch_powerbi_workspace_data(
def build_assets(
self,
dagster_powerbi_translator: Type[DagsterPowerBITranslator],
enable_refresh_semantic_models: bool,
) -> Sequence[CacheableAssetsDefinition]:
"""Returns a set of CacheableAssetsDefinition which will load Power BI content from
the workspace and translate it into AssetSpecs, using the provided translator.
@@ -141,10 +202,18 @@ def build_assets(
Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which
will load the Power BI content.
"""
-return [PowerBICacheableAssetsDefinition(self, dagster_powerbi_translator)]
return [
PowerBICacheableAssetsDefinition(
self,
dagster_powerbi_translator,
enable_refresh_semantic_models=enable_refresh_semantic_models,
)
]

def build_defs(
-self, dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator
self,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
enable_refresh_semantic_models: bool = False,
) -> Definitions:
"""Returns a Definitions object which will load Power BI content from
the workspace and translate it into assets, using the provided translator.
@@ -157,14 +226,23 @@
Definitions: A Definitions object which will build and return the Power BI content.
"""
return Definitions(
-assets=self.build_assets(dagster_powerbi_translator=dagster_powerbi_translator)
assets=self.build_assets(
dagster_powerbi_translator=dagster_powerbi_translator,
enable_refresh_semantic_models=enable_refresh_semantic_models,
)
)


class PowerBICacheableAssetsDefinition(CacheableAssetsDefinition):
-def __init__(self, workspace: PowerBIWorkspace, translator: Type[DagsterPowerBITranslator]):
def __init__(
self,
workspace: PowerBIWorkspace,
translator: Type[DagsterPowerBITranslator],
enable_refresh_semantic_models: bool,
):
self._workspace = workspace
self._translator_cls = translator
self._enable_refresh_semantic_models = enable_refresh_semantic_models
super().__init__(unique_id=self._workspace.workspace_id)

def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
@@ -180,7 +258,8 @@ def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
]

def build_definitions(
-self, data: Sequence[AssetsDefinitionCacheableData]
self,
data: Sequence[AssetsDefinitionCacheableData],
) -> Sequence[AssetsDefinition]:
workspace_data = PowerBIWorkspaceData.from_content_data(
self._workspace.workspace_id,
@@ -192,13 +271,43 @@

translator = self._translator_cls(context=workspace_data)

-all_content = [
-*workspace_data.dashboards_by_id.values(),
-*workspace_data.reports_by_id.values(),
-*workspace_data.semantic_models_by_id.values(),
-*workspace_data.data_sources_by_id.values(),
-]
if self._enable_refresh_semantic_models:
all_external_data = [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.data_sources_by_id.values(),
]
all_executable_data = [*workspace_data.semantic_models_by_id.values()]
else:
all_external_data = [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.data_sources_by_id.values(),
*workspace_data.semantic_models_by_id.values(),
]
all_executable_data = []

all_external_asset_specs = [
translator.get_asset_spec(content) for content in all_external_data
]
all_executable_asset_specs = [
translator.get_asset_spec(content) for content in all_executable_data
]

-return external_assets_from_specs(
-[translator.get_asset_spec(content) for content in all_content]
-)
executable_assets = []
for content, spec in zip(all_executable_data, all_executable_asset_specs):
dataset_id = content.properties["id"]

@multi_asset(
specs=[spec],
name="_".join(spec.key.path),
resource_defs={"power_bi": self._workspace.get_resource_definition()},
)
def asset_fn(context: AssetExecutionContext, power_bi: PowerBIWorkspace) -> None:
power_bi.trigger_refresh(dataset_id)
power_bi.poll_refresh(dataset_id)
context.log.info("Refresh completed.")

executable_assets.append(asset_fn)

return [*external_assets_from_specs(all_external_asset_specs), *executable_assets]
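The refresh plumbing also works outside of asset materialization; a short sketch, assuming a valid token, workspace, and dataset ID (all placeholders here):

from dagster_powerbi import PowerBIWorkspace

workspace = PowerBIWorkspace(
    api_token="<api-token>",        # placeholder
    workspace_id="<workspace-id>",  # placeholder
)

# Kick off a refresh (POST datasets/<id>/refreshes), then block until it
# reaches "Completed" or "Failed" (raising a dagster Failure on failure or timeout).
workspace.trigger_refresh(dataset_id="<dataset-id>")
workspace.poll_refresh(dataset_id="<dataset-id>")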
python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py
@@ -23,7 +23,7 @@ def _remove_file_ext(name: str) -> str:

def _clean_asset_name(name: str) -> str:
"""Cleans an input to be a valid Dagster asset name."""
return re.sub(r"[^a-z0-9A-Z.]+", "_", name)
return re.sub(r"[^A-Za-z0-9_]+", "_", name)


class PowerBIContentType(Enum):
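The widened regex now replaces dots as well, which is why the data-source asset keys in the tests below change from e.g. data_27_09_2019.xlsx to data_27_09_2019_xlsx. A quick check of the new behavior:

import re

def _clean_asset_name(name: str) -> str:
    # Mirrors the updated regex above.
    return re.sub(r"[^A-Za-z0-9_]+", "_", name)

assert _clean_asset_name("data_27_09_2019.xlsx") == "data_27_09_2019_xlsx"
assert _clean_asset_name("sales_marketing_datas.xlsx") == "sales_marketing_datas_xlsx"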
python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py
@@ -1,13 +1,18 @@
import uuid

import pytest
import responses
from dagster import materialize
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.reconstruct import ReconstructableJob, ReconstructableRepository
from dagster._core.events import DagsterEventType
from dagster._core.execution.api import create_execution_plan, execute_plan
from dagster._core.instance_for_test import instance_for_test
from dagster._utils import file_relative_path
from dagster_powerbi import PowerBIWorkspace
from dagster_powerbi.resource import BASE_API_URL

from dagster_powerbi_tests.conftest import SAMPLE_SEMANTIC_MODEL


def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_id: str) -> None:
@@ -56,11 +61,60 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id:

data_source_keys = {spec.key for spec in data_source_assets}
assert data_source_keys == {
AssetKey(["data_27_09_2019.xlsx"]),
AssetKey(["sales_marketing_datas.xlsx"]),
AssetKey(["data_27_09_2019_xlsx"]),
AssetKey(["sales_marketing_datas_xlsx"]),
}


@pytest.mark.parametrize("success", [True, False])
def test_refreshable_semantic_model(
workspace_data_api_mocks: responses.RequestsMock, workspace_id: str, success: bool
) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
api_token=fake_token, workspace_id=workspace_id, refresh_poll_interval=0
)
all_assets = (
resource.build_defs(enable_refresh_semantic_models=True).get_asset_graph().assets_defs
)

# 1 dashboard, 1 report, 1 semantic model, 2 data sources
assert len(all_assets) == 5

semantic_model_asset = next(
asset for asset in all_assets if asset.key.path[0] == "semantic_model"
)
assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]
assert semantic_model_asset.is_executable

# materialize the semantic model

workspace_data_api_mocks.add(
method=responses.POST,
url=f"{BASE_API_URL}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/refreshes",
json={"notifyOption": "NoNotification"},
status=202,
)

workspace_data_api_mocks.add(
method=responses.GET,
url=f"{BASE_API_URL}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/refreshes",
json={"value": [{"status": "Unknown"}]},
status=200,
)
workspace_data_api_mocks.add(
method=responses.GET,
url=f"{BASE_API_URL}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/refreshes",
json={
"value": [{"status": "Completed" if success else "Failed", "serviceExceptionJson": {}}]
},
status=200,
)

result = materialize([semantic_model_asset], raise_on_error=False)
assert result.success is success
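# Note on the mocks above: `responses` replays registrations for the same URL
# in registration order (repeating the last one), so the first poll sees
# "Unknown" and subsequent polls see the terminal status.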


def test_using_cached_asset_data(workspace_data_api_mocks: responses.RequestsMock) -> None:
with instance_for_test() as instance:
assert len(workspace_data_api_mocks.calls) == 0
python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py
@@ -40,8 +40,8 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None
assert asset_spec.tags == {"dagster/storage_kind": "powerbi"}
deps = list(asset_spec.deps)
assert len(deps) == 2
-assert deps[0].asset_key == AssetKey(["data_27_09_2019.xlsx"])
-assert deps[1].asset_key == AssetKey(["sales_marketing_datas.xlsx"])
assert deps[0].asset_key == AssetKey(["data_27_09_2019_xlsx"])
assert deps[1].asset_key == AssetKey(["sales_marketing_datas_xlsx"])


class MyCustomTranslator(DagsterPowerBITranslator):