From 114288cee4d3e8561ef3b52d9696aa22e14895bc Mon Sep 17 00:00:00 2001
From: benpankow
Date: Mon, 12 Aug 2024 16:59:54 -0400
Subject: [PATCH 1/2] [dagster-powerbi] add ability to reload semantic models

---
 .../dagster_powerbi/resource.py               | 147 +++++++++++++++---
 .../dagster_powerbi/translator.py             |   2 +-
 .../dagster_powerbi_tests/test_asset_specs.py |  58 ++++++-
 3 files changed, 185 insertions(+), 22 deletions(-)

diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
index e6bc5e8ce3178..89c250922997e 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
@@ -1,3 +1,5 @@
+import re
+import time
 from typing import Any, Dict, Sequence, Type
 
 import requests
@@ -7,11 +9,14 @@
     Definitions,
     _check as check,
     external_assets_from_specs,
+    multi_asset,
 )
 from dagster._core.definitions.cacheable_assets import (
     AssetsDefinitionCacheableData,
     CacheableAssetsDefinition,
 )
+from dagster._core.definitions.events import Failure
+from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
 from dagster._utils.cached_method import cached_method
 from pydantic import Field
 
@@ -22,7 +27,12 @@
     PowerBIWorkspaceData,
 )
 
-BASE_API_URL = "https://api.powerbi.com/v1.0/myorg/"
+BASE_API_URL = "https://api.powerbi.com/v1.0/myorg"
+
+
+def _clean_op_name(name: str) -> str:
+    """Cleans an input to be a valid Dagster op name."""
+    return re.sub(r"[^a-z0-9A-Z]+", "_", name)
 
 
 class PowerBIWorkspace(ConfigurableResource):
@@ -32,8 +42,16 @@ class PowerBIWorkspace(ConfigurableResource):
 
     api_token: str = Field(..., description="An API token used to connect to PowerBI.")
     workspace_id: str = Field(..., description="The ID of the PowerBI group to use.")
+    refresh_poll_interval: int = Field(
+        default=5, description="The interval in seconds to poll for refresh status."
+    )
+    refresh_timeout: int = Field(
+        default=300, description="The maximum time in seconds to wait for a refresh to complete."
+    )
 
-    def fetch_json(self, endpoint: str) -> Dict[str, Any]:
-        """Fetch JSON data from the PowerBI API. Raises an exception if the request fails.
+    def fetch(
+        self, endpoint: str, method: str = "GET", json: Any = None, group_scoped: bool = True
+    ) -> requests.Response:
+        """Issues a request against the PowerBI API and returns the raw response. Raises an exception if the request fails.
 
         Args:
@@ -46,11 +64,53 @@ def fetch_json(self, endpoint: str) -> Dict[str, Any]:
             "Content-Type": "application/json",
             "Authorization": f"Bearer {self.api_token}",
         }
-        response = requests.get(
-            f"{BASE_API_URL}/groups/{self.workspace_id}/{endpoint}", headers=headers
+        base_url = f"{BASE_API_URL}/groups/{self.workspace_id}" if group_scoped else BASE_API_URL
+        response = requests.request(
+            method=method,
+            url=f"{base_url}/{endpoint}",
+            headers=headers,
+            json=json,
+            allow_redirects=True,
         )
         response.raise_for_status()
-        return response.json()
+        return response
+
+    def fetch_json(
+        self, endpoint: str, method: str = "GET", json: Any = None, group_scoped: bool = True
+    ) -> Dict[str, Any]:
+        return self.fetch(endpoint, method, json, group_scoped=group_scoped).json()
+
+    def trigger_refresh(self, dataset_id: str) -> None:
+        """Triggers a refresh of a PowerBI dataset."""
+        response = self.fetch(
+            method="POST",
+            endpoint=f"datasets/{dataset_id}/refreshes",
+            json={"notifyOption": "NoNotification"},
+            group_scoped=False,
+        )
+        if response.status_code != 202:
+            raise Failure(f"Refresh failed to start: {response.content}")
+
+    def poll_refresh(self, dataset_id: str) -> None:
+        """Polls the refresh status of a PowerBI dataset until it completes or fails."""
+        status = None
+
+        start = time.monotonic()
+        while status not in ["Completed", "Failed"]:
+            if time.monotonic() - start > self.refresh_timeout:
+                raise Failure(f"Refresh timed out after {self.refresh_timeout} seconds.")
+
+            last_refresh = self.fetch_json(
+                f"datasets/{dataset_id}/refreshes",
+                group_scoped=False,
+            )["value"][0]
+            status = last_refresh["status"]
+
+            time.sleep(self.refresh_poll_interval)
+
+        if status == "Failed":
+            error = last_refresh.get("serviceExceptionJson")
+            raise Failure(f"Refresh failed: {error}")
 
     @cached_method
     def get_reports(self) -> Dict[str, Any]:
@@ -129,6 +189,7 @@ def fetch_powerbi_workspace_data(
     def build_assets(
         self,
         dagster_powerbi_translator: Type[DagsterPowerBITranslator],
+        enable_refresh_semantic_models: bool,
     ) -> Sequence[CacheableAssetsDefinition]:
         """Returns a set of CacheableAssetsDefinition which will load Power BI content from the workspace
         and translates it into AssetSpecs, using the provided translator.
@@ -141,10 +202,18 @@ def build_assets(
             Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which
                 will load the Power BI content.
         """
-        return [PowerBICacheableAssetsDefinition(self, dagster_powerbi_translator)]
+        return [
+            PowerBICacheableAssetsDefinition(
+                self,
+                dagster_powerbi_translator,
+                enable_refresh_semantic_models=enable_refresh_semantic_models,
+            )
+        ]
 
     def build_defs(
-        self, dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator
+        self,
+        dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
+        enable_refresh_semantic_models: bool = False,
     ) -> Definitions:
         """Returns a Definitions object which will load Power BI content from the workspace
         and translate it into assets, using the provided translator.
@@ -157,14 +226,23 @@ def build_defs(
             Definitions: A Definitions object which will build and return the Power BI content.
""" return Definitions( - assets=self.build_assets(dagster_powerbi_translator=dagster_powerbi_translator) + assets=self.build_assets( + dagster_powerbi_translator=dagster_powerbi_translator, + enable_refresh_semantic_models=enable_refresh_semantic_models, + ) ) class PowerBICacheableAssetsDefinition(CacheableAssetsDefinition): - def __init__(self, workspace: PowerBIWorkspace, translator: Type[DagsterPowerBITranslator]): + def __init__( + self, + workspace: PowerBIWorkspace, + translator: Type[DagsterPowerBITranslator], + enable_refresh_semantic_models: bool, + ): self._workspace = workspace self._translator_cls = translator + self._enable_refresh_semantic_models = enable_refresh_semantic_models super().__init__(unique_id=self._workspace.workspace_id) def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]: @@ -179,8 +257,9 @@ def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]: ] ] - def build_definitions( - self, data: Sequence[AssetsDefinitionCacheableData] + def build_assets( + self, + data: Sequence[AssetsDefinitionCacheableData], ) -> Sequence[AssetsDefinition]: workspace_data = PowerBIWorkspaceData.from_content_data( self._workspace.workspace_id, @@ -192,13 +271,43 @@ def build_definitions( translator = self._translator_cls(context=workspace_data) - all_content = [ - *workspace_data.dashboards_by_id.values(), - *workspace_data.reports_by_id.values(), - *workspace_data.semantic_models_by_id.values(), - *workspace_data.data_sources_by_id.values(), + if self._enable_refresh_semantic_models: + all_external_data = [ + *workspace_data.dashboards_by_id.values(), + *workspace_data.reports_by_id.values(), + *workspace_data.data_sources_by_id.values(), + ] + all_executable_data = [*workspace_data.semantic_models_by_id.values()] + else: + all_external_data = [ + *workspace_data.dashboards_by_id.values(), + *workspace_data.reports_by_id.values(), + *workspace_data.data_sources_by_id.values(), + *workspace_data.semantic_models_by_id.values(), + ] + all_executable_data = [] + + all_external_asset_specs = [ + translator.get_asset_spec(content) for content in all_external_data + ] + all_executable_asset_specs = [ + translator.get_asset_spec(content) for content in all_executable_data ] - return external_assets_from_specs( - [translator.get_asset_spec(content) for content in all_content] - ) + executable_assets = [] + for content, spec in zip(all_executable_data, all_executable_asset_specs): + dataset_id = content.properties["id"] + + @multi_asset( + specs=[spec], + name="_".join(spec.key.path), + resource_defs={"power_bi": self._workspace.get_resource_definition()}, + ) + def asset_fn(context: AssetExecutionContext, power_bi: PowerBIWorkspace) -> None: + power_bi.trigger_refresh(dataset_id) + power_bi.poll_refresh(dataset_id) + context.log.info("Refresh completed.") + + executable_assets.append(asset_fn) + + return [*external_assets_from_specs(all_external_asset_specs), *executable_assets] diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index 1c81b0c6a3ec4..f0c881c0485e3 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -23,7 +23,7 @@ def _remove_file_ext(name: str) -> str: def _clean_asset_name(name: str) -> str: """Cleans an input to be a valid Dagster asset name.""" - return re.sub(r"[^a-z0-9A-Z.]+", "_", name) + return 
re.sub(r"[^A-Za-z0-9_]+", "_", name) class PowerBIContentType(Enum): diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py index f160d26da6ed3..1cee7998ddaa1 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py @@ -1,6 +1,8 @@ import uuid +import pytest import responses +from dagster import materialize from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.reconstruct import ReconstructableJob, ReconstructableRepository from dagster._core.events import DagsterEventType @@ -8,6 +10,9 @@ from dagster._core.instance_for_test import instance_for_test from dagster._utils import file_relative_path from dagster_powerbi import PowerBIWorkspace +from dagster_powerbi.resource import BASE_API_URL + +from dagster_powerbi_tests.conftest import SAMPLE_SEMANTIC_MODEL def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_id: str) -> None: @@ -56,11 +61,60 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: data_source_keys = {spec.key for spec in data_source_assets} assert data_source_keys == { - AssetKey(["data_27_09_2019.xlsx"]), - AssetKey(["sales_marketing_datas.xlsx"]), + AssetKey(["data_27_09_2019_xlsx"]), + AssetKey(["sales_marketing_datas_xlsx"]), } +@pytest.mark.parametrize("success", [True, False]) +def test_refreshable_semantic_model( + workspace_data_api_mocks: responses.RequestsMock, workspace_id: str, success: bool +) -> None: + fake_token = uuid.uuid4().hex + resource = PowerBIWorkspace( + api_token=fake_token, workspace_id=workspace_id, refresh_poll_interval=0 + ) + all_assets = ( + resource.build_defs(enable_refresh_semantic_models=True).get_asset_graph().assets_defs + ) + + # 1 dashboard, 1 report, 1 semantic model, 2 data sources + assert len(all_assets) == 5 + + semantic_model_asset = next( + asset for asset in all_assets if asset.key.path[0] == "semantic_model" + ) + assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"] + assert semantic_model_asset.is_executable + + # materialize the semantic model + + workspace_data_api_mocks.add( + method=responses.POST, + url=f"{BASE_API_URL}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/refreshes", + json={"notifyOption": "NoNotification"}, + status=202, + ) + + workspace_data_api_mocks.add( + method=responses.GET, + url=f"{BASE_API_URL}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/refreshes", + json={"value": [{"status": "Unknown"}]}, + status=200, + ) + workspace_data_api_mocks.add( + method=responses.GET, + url=f"{BASE_API_URL}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/refreshes", + json={ + "value": [{"status": "Completed" if success else "Failed", "serviceExceptionJson": {}}] + }, + status=200, + ) + + result = materialize([semantic_model_asset], raise_on_error=False) + assert result.success is success + + def test_using_cached_asset_data(workspace_data_api_mocks: responses.RequestsMock) -> None: with instance_for_test() as instance: assert len(workspace_data_api_mocks.calls) == 0 From 68ff9429f73fc3e869aba8a448e4f16c10a3d992 Mon Sep 17 00:00:00 2001 From: benpankow Date: Tue, 13 Aug 2024 09:02:07 -0400 Subject: [PATCH 2/2] fix --- .../libraries/dagster-powerbi/dagster_powerbi/resource.py | 2 +- .../dagster-powerbi/dagster_powerbi_tests/test_translator.py | 4 ++-- 2 files 
diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
index 89c250922997e..8d6ff99c92d39 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
@@ -257,7 +257,7 @@ def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
             ]
         ]
 
-    def build_assets(
+    def build_definitions(
         self,
         data: Sequence[AssetsDefinitionCacheableData],
     ) -> Sequence[AssetsDefinition]:
diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py
index a9d06118f75be..2d688c37d45c3 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py
@@ -40,8 +40,8 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None
     assert asset_spec.tags == {"dagster/storage_kind": "powerbi"}
     deps = list(asset_spec.deps)
     assert len(deps) == 2
-    assert deps[0].asset_key == AssetKey(["data_27_09_2019.xlsx"])
-    assert deps[1].asset_key == AssetKey(["sales_marketing_datas.xlsx"])
+    assert deps[0].asset_key == AssetKey(["data_27_09_2019_xlsx"])
+    assert deps[1].asset_key == AssetKey(["sales_marketing_datas_xlsx"])
 
 
 class MyCustomTranslator(DagsterPowerBITranslator):
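
---
Note for reviewers: a minimal usage sketch of the new flag, not part of the
patch. It uses only the public API shown above (PowerBIWorkspace, build_defs);
the token and workspace ID values are placeholders.

    from dagster_powerbi import PowerBIWorkspace

    power_bi = PowerBIWorkspace(
        api_token="<api-token>",        # placeholder
        workspace_id="<workspace-id>",  # placeholder
        refresh_poll_interval=5,        # seconds between refresh status polls
        refresh_timeout=300,            # fail the run if the refresh takes longer
    )

    # With enable_refresh_semantic_models=True, each semantic model becomes an
    # executable asset that triggers a PowerBI dataset refresh and polls until
    # it completes; dashboards, reports, and data sources stay external assets.
    defs = power_bi.build_defs(enable_refresh_semantic_models=True)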
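The refresh endpoints added here can also be driven directly, outside of asset
materialization — a sketch, with the same placeholder IDs as above:

    # POST .../datasets/<id>/refreshes; raises Failure unless the API returns 202
    power_bi.trigger_refresh("<dataset-id>")

    # GET .../datasets/<id>/refreshes until the latest refresh reports Completed
    # or Failed, sleeping refresh_poll_interval between polls and raising Failure
    # on a Failed status or after refresh_timeout seconds
    power_bi.poll_refresh("<dataset-id>")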