[dagster-powerbi] add ability to reload semantic models
benpankow committed Aug 12, 2024
1 parent a8b1aa8 commit fae8ffe
Showing 3 changed files with 179 additions and 21 deletions.
138 changes: 121 additions & 17 deletions python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
@@ -1,3 +1,5 @@
import re
import time
from typing import Any, Dict, Sequence, Type

import requests
@@ -6,11 +8,14 @@
ConfigurableResource,
_check as check,
external_assets_from_specs,
multi_asset,
)
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
CacheableAssetsDefinition,
)
from dagster._core.definitions.events import Failure
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._utils.cached_method import cached_method
from pydantic import Field

@@ -21,7 +26,12 @@
PowerBIWorkspaceData,
)

BASE_API_URL = "https://api.powerbi.com/v1.0/myorg/"
BASE_API_URL = "https://api.powerbi.com/v1.0/myorg"


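# e.g. _clean_op_name("Sales & Returns Sample v201912") -> "Sales_Returns_Sample_v201912"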
def _clean_op_name(name: str) -> str:
"""Cleans an input to be a valid Dagster op name."""
return re.sub(r"[^a-z0-9A-Z]+", "_", name)


class PowerBIWorkspace(ConfigurableResource):
@@ -31,8 +41,16 @@ class PowerBIWorkspace(ConfigurableResource):

api_token: str = Field(..., description="An API token used to connect to PowerBI.")
workspace_id: str = Field(..., description="The ID of the PowerBI group to use.")
refresh_poll_interval: int = Field(
default=5, description="The interval in seconds to poll for refresh status."
)
refresh_timeout: int = Field(
default=300, description="The maximum time in seconds to wait for a refresh to complete."
)

def fetch_json(self, endpoint: str) -> Dict[str, Any]:
def fetch(
self, endpoint: str, method: str = "GET", json: Any = None, group_scoped: bool = True
) -> requests.Response:
"""Fetches data from the PowerBI API, returning the raw response. Raises an exception if the request fails.
Args:
@@ -45,11 +63,53 @@ def fetch_json(self, endpoint: str) -> Dict[str, Any]:
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_token}",
}
response = requests.get(
f"{BASE_API_URL}/groups/{self.workspace_id}/{endpoint}", headers=headers
base_url = f"{BASE_API_URL}/groups/{self.workspace_id}" if group_scoped else BASE_API_URL
response = requests.request(
method=method,
url=f"{base_url}/{endpoint}",
headers=headers,
json=json,
allow_redirects=True,
)
response.raise_for_status()
return response.json()
return response

def fetch_json(
self, endpoint: str, method: str = "GET", json: Any = None, group_scoped: bool = True
) -> Dict[str, Any]:
return self.fetch(endpoint, method, json, group_scoped=group_scoped).json()

def trigger_refresh(self, dataset_id: str) -> None:
"""Triggers a refresh of a PowerBI dataset."""
response = self.fetch(
method="POST",
endpoint=f"datasets/{dataset_id}/refreshes",
json={"notifyOption": "NoNotification"},
group_scoped=False,
)
if response.status_code != 202:
raise Failure(f"Refresh failed to start: {response.content}")

def poll_refresh(self, dataset_id: str) -> None:
"""Polls the refresh status of a PowerBI dataset until it completes or fails."""
status = None

start = time.monotonic()
while status not in ["Completed", "Failed"]:
if time.monotonic() - start > self.refresh_timeout:
raise Failure(f"Refresh timed out after {self.refresh_timeout} seconds.")

last_refresh = self.fetch_json(
f"datasets/{dataset_id}/refreshes",
group_scoped=False,
)["value"][0]
status = last_refresh["status"]

time.sleep(self.refresh_poll_interval)

if status == "Failed":
error = last_refresh.get("serviceExceptionJson")
raise Failure(f"Refresh failed: {error}")

@cached_method
def get_reports(self) -> Dict[str, Any]:
@@ -128,6 +188,7 @@ def fetch_powerbi_workspace_data(
def build_assets(
self,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
enable_refresh_semantic_models: bool = False,
) -> Sequence[CacheableAssetsDefinition]:
"""Returns a set of CacheableAssetsDefinition which will load Power BI content from
the workspace and translate it into AssetSpecs, using the provided translator.
@@ -140,13 +201,25 @@ def build_assets(
Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which
will load the Power BI content.
"""
return [PowerBICacheableAssetsDefinition(self, dagster_powerbi_translator)]
return [
PowerBICacheableAssetsDefinition(
self,
dagster_powerbi_translator,
enable_refresh_semantic_models=enable_refresh_semantic_models,
)
]


class PowerBICacheableAssetsDefinition(CacheableAssetsDefinition):
def __init__(self, workspace: PowerBIWorkspace, translator: Type[DagsterPowerBITranslator]):
def __init__(
self,
workspace: PowerBIWorkspace,
translator: Type[DagsterPowerBITranslator],
enable_refresh_semantic_models: bool,
):
self._workspace = workspace
self._translator_cls = translator
self._enable_refresh_semantic_models = enable_refresh_semantic_models
super().__init__(unique_id=self._workspace.workspace_id)

def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
@@ -161,8 +234,9 @@ def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
]
]

def build_definitions(
self, data: Sequence[AssetsDefinitionCacheableData]
def build_assets(
self,
data: Sequence[AssetsDefinitionCacheableData],
) -> Sequence[AssetsDefinition]:
workspace_data = PowerBIWorkspaceData.from_content_data(
self._workspace.workspace_id,
@@ -174,13 +248,43 @@ def build_definitions(

translator = self._translator_cls(context=workspace_data)

all_content = [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.semantic_models_by_id.values(),
*workspace_data.data_sources_by_id.values(),
if self._enable_refresh_semantic_models:
all_external_data = [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.data_sources_by_id.values(),
]
all_executable_data = [*workspace_data.semantic_models_by_id.values()]
else:
all_external_data = [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.data_sources_by_id.values(),
*workspace_data.semantic_models_by_id.values(),
]
all_executable_data = []

all_external_asset_specs = [
translator.get_asset_spec(content) for content in all_external_data
]
all_executable_asset_specs = [
translator.get_asset_spec(content) for content in all_executable_data
]

return external_assets_from_specs(
[translator.get_asset_spec(content) for content in all_content]
)
def _build_semantic_model_refresh_asset(spec, dataset_id: str) -> AssetsDefinition:
    # Pass spec and dataset_id as arguments so each generated asset binds its
    # own dataset; a bare closure over the loop variable would late-bind and
    # point every asset at the last dataset in the loop.
    @multi_asset(
        specs=[spec],
        name="_".join(spec.key.path),
        resource_defs={"power_bi": self._workspace.get_resource_definition()},
    )
    def asset_fn(context: AssetExecutionContext, power_bi: PowerBIWorkspace) -> None:
        power_bi.trigger_refresh(dataset_id)
        power_bi.poll_refresh(dataset_id)
        context.log.info("Refresh completed.")

    return asset_fn

executable_assets = [
    _build_semantic_model_refresh_asset(spec, content.properties["id"])
    for content, spec in zip(all_executable_data, all_executable_asset_specs)
]

return [*external_assets_from_specs(all_external_asset_specs), *executable_assets]
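For context, here is a minimal usage sketch of the new flag. The env var name, workspace ID, and dataset are placeholders, and it assumes the cacheable asset definitions can be passed straight to Definitions, as with other Dagster integrations of this era:

from dagster import Definitions, EnvVar
from dagster_powerbi import PowerBIWorkspace

power_bi = PowerBIWorkspace(
    api_token=EnvVar("POWER_BI_API_TOKEN"),  # placeholder env var name
    workspace_id="my-workspace-id",          # placeholder workspace ID
    refresh_poll_interval=5,                 # seconds between refresh-status polls
    refresh_timeout=300,                     # give up after five minutes
)

# With the flag enabled, each semantic model becomes an executable asset that,
# when materialized, POSTs to datasets/{id}/refreshes and then polls the
# refresh history until it reports Completed or Failed; dashboards, reports,
# and data sources remain external assets.
defs = Definitions(assets=power_bi.build_assets(enable_refresh_semantic_models=True))

Note that group_scoped=True routes requests under /groups/{workspace_id}, while the refresh endpoints pass group_scoped=False to hit /datasets/{id}/refreshes at the org root.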
@@ -24,7 +24,7 @@ def _remove_file_ext(name: str) -> str:

def _clean_asset_name(name: str) -> str:
"""Cleans an input to be a valid Dagster asset name."""
return re.sub(r"[^a-z0-9A-Z.]+", "_", name)
return re.sub(r"[^A-Za-z0-9_]+", "_", name)


T = TypeVar("T")
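The tightened regex above is what changes the expected data-source asset keys in the tests below: dots in file names are now replaced with underscores instead of being preserved. A quick sanity check of the new behavior:

import re

def _clean_asset_name(name: str) -> str:
    return re.sub(r"[^A-Za-z0-9_]+", "_", name)

assert _clean_asset_name("data_27_09_2019.xlsx") == "data_27_09_2019_xlsx"
assert _clean_asset_name("sales_marketing_datas.xlsx") == "sales_marketing_datas_xlsx"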
@@ -1,13 +1,18 @@
import uuid

import pytest
import responses
from dagster import materialize
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.reconstruct import ReconstructableJob, ReconstructableRepository
from dagster._core.events import DagsterEventType
from dagster._core.execution.api import create_execution_plan, execute_plan
from dagster._core.instance_for_test import instance_for_test
from dagster._utils import file_relative_path
from dagster_powerbi import PowerBIWorkspace
from dagster_powerbi.resource import BASE_API_URL

from .conftest import SAMPLE_SEMANTIC_MODEL


def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_id: str) -> None:
@@ -32,7 +37,7 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id:
)
cacheable_asset = resource.build_assets()[0]
data = cacheable_asset.compute_cacheable_data()
all_assets = cacheable_asset.build_definitions(data)
all_assets = cacheable_asset.build_assets(data)

# 1 dashboard, 1 report, 1 semantic model, 2 data sources
assert len(all_assets) == 5
@@ -58,11 +63,60 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id:

data_source_keys = {spec.key for spec in data_source_assets}
assert data_source_keys == {
AssetKey(["data_27_09_2019.xlsx"]),
AssetKey(["sales_marketing_datas.xlsx"]),
AssetKey(["data_27_09_2019_xlsx"]),
AssetKey(["sales_marketing_datas_xlsx"]),
}


@pytest.mark.parametrize("success", [True, False])
def test_refreshable_semantic_model(
workspace_data_api_mocks: responses.RequestsMock, workspace_id: str, success: bool
) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
api_token=fake_token, workspace_id=workspace_id, refresh_poll_interval=0
)
cacheable_asset = resource.build_assets(enable_refresh_semantic_models=True)[0]
data = cacheable_asset.compute_cacheable_data()
all_assets = cacheable_asset.build_assets(data)

# 1 dashboard, 1 report, 1 semantic model, 2 data sources
assert len(all_assets) == 5

semantic_model_asset = next(
asset for asset in all_assets if asset.key.path[0] == "semantic_model"
)
assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]
assert semantic_model_asset.is_executable

# materialize the semantic model

workspace_data_api_mocks.add(
method=responses.POST,
url=f"{BASE_API_URL}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/refreshes",
json={"notifyOption": "NoNotification"},
status=202,
)

workspace_data_api_mocks.add(
method=responses.GET,
url=f"{BASE_API_URL}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/refreshes",
json={"value": [{"status": "Unknown"}]},
status=200,
)
workspace_data_api_mocks.add(
method=responses.GET,
url=f"{BASE_API_URL}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/refreshes",
json={
"value": [{"status": "Completed" if success else "Failed", "serviceExceptionJson": {}}]
},
status=200,
)

result = materialize([semantic_model_asset], raise_on_error=False)
assert result.success is success


def test_using_cached_asset_data(workspace_data_api_mocks: responses.RequestsMock) -> None:
with instance_for_test() as instance:
assert len(workspace_data_api_mocks.calls) == 0
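The failure branch exercised above is also what callers see when polling directly. A minimal sketch, assuming the power_bi resource from the earlier example and a placeholder dataset ID:

from dagster import Failure

try:
    power_bi.poll_refresh("my-dataset-id")
except Failure as e:
    # poll_refresh raises dagster.Failure, carrying the refresh's
    # serviceExceptionJson payload, when the latest refresh ends in
    # the "Failed" state.
    print(e.description)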
