diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
index 6255a9db1f173..96bd91c0748e2 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
@@ -3,10 +3,11 @@
 import time
 from dataclasses import dataclass
 from functools import cached_property
-from typing import Any, Dict, Optional, Type, cast
+from typing import Any, Dict, Mapping, Optional, Type, cast
 
 import requests
 from dagster import ConfigurableResource, Definitions, external_assets_from_specs, multi_asset
+from dagster._annotations import public
 from dagster._config.pythonic_config.resource import ResourceDependency
 from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
 from dagster._core.definitions.events import Failure
@@ -98,10 +99,10 @@ class PowerBIWorkspace(ConfigurableResource):
     )
 
     @cached_property
-    def api_token(self) -> str:
+    def _api_token(self) -> str:
         return self.credentials.api_token
 
-    def fetch(
+    def _fetch(
         self,
         endpoint: str,
         method: str = "GET",
@@ -118,7 +119,7 @@ def fetch(
         """
         headers = {
             "Content-Type": "application/json",
-            "Authorization": f"Bearer {self.api_token}",
+            "Authorization": f"Bearer {self._api_token}",
         }
         base_url = f"{BASE_API_URL}/groups/{self.workspace_id}" if group_scoped else BASE_API_URL
         response = requests.request(
@@ -131,18 +132,19 @@ def fetch(
         response.raise_for_status()
         return response
 
-    def fetch_json(
+    def _fetch_json(
         self,
         endpoint: str,
         method: str = "GET",
         json: Any = None,
         group_scoped: bool = True,
     ) -> Dict[str, Any]:
-        return self.fetch(endpoint, method, json, group_scoped=group_scoped).json()
+        return self._fetch(endpoint, method, json, group_scoped=group_scoped).json()
 
+    @public
     def trigger_refresh(self, dataset_id: str) -> None:
         """Triggers a refresh of a PowerBI dataset."""
-        response = self.fetch(
+        response = self._fetch(
             method="POST",
             endpoint=f"datasets/{dataset_id}/refreshes",
             json={"notifyOption": "NoNotification"},
@@ -151,6 +153,7 @@ def trigger_refresh(self, dataset_id: str) -> None:
         if response.status_code != 202:
             raise Failure(f"Refresh failed to start: {response.content}")
 
+    @public
     def poll_refresh(self, dataset_id: str) -> None:
         """Polls the refresh status of a PowerBI dataset until it completes or fails."""
         status = None
@@ -160,7 +163,7 @@ def poll_refresh(self, dataset_id: str) -> None:
             if time.monotonic() - start > self.refresh_timeout:
                 raise Failure(f"Refresh timed out after {self.refresh_timeout} seconds.")
 
-            last_refresh = self.fetch_json(
+            last_refresh = self._fetch_json(
                 f"datasets/{dataset_id}/refreshes",
                 group_scoped=False,
             )["value"][0]
@@ -173,50 +176,42 @@ def poll_refresh(self, dataset_id: str) -> None:
             raise Failure(f"Refresh failed: {error}")
 
     @cached_method
-    def get_reports(self) -> Dict[str, Any]:
+    def _get_reports(self) -> Mapping[str, Any]:
         """Fetches a list of all PowerBI reports in the workspace."""
-        return self.fetch_json("reports")
+        return self._fetch_json("reports")
 
     @cached_method
-    def get_semantic_models(self) -> Dict[str, Any]:
+    def _get_semantic_models(self) -> Mapping[str, Any]:
         """Fetches a list of all PowerBI semantic models in the workspace."""
-        return self.fetch_json("datasets")
+        return self._fetch_json("datasets")
 
     @cached_method
-    def get_semantic_model_sources(
-        self,
-        dataset_id: str,
-    ) -> Dict[str, Any]:
+    def _get_semantic_model_sources(self, dataset_id: str) -> Mapping[str, Any]:
         """Fetches a list of all data sources for a given semantic model."""
-        return self.fetch_json(f"datasets/{dataset_id}/datasources")
+        return self._fetch_json(f"datasets/{dataset_id}/datasources")
 
     @cached_method
-    def get_dashboards(self) -> Dict[str, Any]:
+    def _get_dashboards(self) -> Mapping[str, Any]:
         """Fetches a list of all PowerBI dashboards in the workspace."""
-        return self.fetch_json("dashboards")
+        return self._fetch_json("dashboards")
 
     @cached_method
-    def get_dashboard_tiles(
-        self,
-        dashboard_id: str,
-    ) -> Dict[str, Any]:
+    def _get_dashboard_tiles(self, dashboard_id: str) -> Mapping[str, Any]:
         """Fetches a list of all tiles for a given PowerBI dashboard,
         including which reports back each tile.
         """
-        return self.fetch_json(f"dashboards/{dashboard_id}/tiles")
+        return self._fetch_json(f"dashboards/{dashboard_id}/tiles")
 
-    def fetch_powerbi_workspace_data(
-        self,
-    ) -> PowerBIWorkspaceData:
+    def _fetch_powerbi_workspace_data(self) -> PowerBIWorkspaceData:
         """Retrieves all Power BI content from the workspace and returns it as a PowerBIWorkspaceData object.
 
         Future work will cache this data to avoid repeated calls to the Power BI API.
 
         Returns:
             PowerBIWorkspaceData: A snapshot of the Power BI workspace's content.
         """
-        dashboard_data = self.get_dashboards()["value"]
+        dashboard_data = self._get_dashboards()["value"]
         augmented_dashboard_data = [
-            {**dashboard, "tiles": self.get_dashboard_tiles(dashboard["id"])["value"]}
+            {**dashboard, "tiles": self._get_dashboard_tiles(dashboard["id"])["value"]}
             for dashboard in dashboard_data
         ]
         dashboards = [
@@ -226,12 +221,12 @@ def fetch_powerbi_workspace_data(
 
         reports = [
             PowerBIContentData(content_type=PowerBIContentType.REPORT, properties=data)
-            for data in self.get_reports()["value"]
+            for data in self._get_reports()["value"]
         ]
-        semantic_models_data = self.get_semantic_models()["value"]
+        semantic_models_data = self._get_semantic_models()["value"]
         data_sources_by_id = {}
         for dataset in semantic_models_data:
-            dataset_sources = self.get_semantic_model_sources(dataset["id"])["value"]
+            dataset_sources = self._get_semantic_model_sources(dataset["id"])["value"]
             dataset["sources"] = [source["datasourceId"] for source in dataset_sources]
             for data_source in dataset_sources:
                 data_sources_by_id[data_source["datasourceId"]] = PowerBIContentData(
@@ -246,6 +241,7 @@ def fetch_powerbi_workspace_data(
             dashboards + reports + semantic_models + list(data_sources_by_id.values()),
         )
 
+    @public
     def build_defs(
         self,
         dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
@@ -284,7 +280,7 @@ def defs_key(self) -> str:
 
     def fetch_state(self) -> PowerBIWorkspaceData:
         with self.workspace.process_config_and_initialize_cm() as initialized_workspace:
-            return initialized_workspace.fetch_powerbi_workspace_data()
+            return initialized_workspace._fetch_powerbi_workspace_data()  # noqa: SLF001
 
     def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions:
         translator = self.translator_cls(context=state)
diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py
index 293e2c0b1a9c4..24208379bccd8 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py
@@ -27,7 +27,7 @@ def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_
         workspace_id=workspace_id,
     )
 
-    actual_workspace_data = resource.fetch_powerbi_workspace_data()
+    actual_workspace_data = resource._fetch_powerbi_workspace_data()  # noqa: SLF001
     assert len(actual_workspace_data.dashboards_by_id) == 1
     assert len(actual_workspace_data.reports_by_id) == 1
     assert len(actual_workspace_data.semantic_models_by_id) == 1
diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_resource.py
index aa01fec8850a7..181a29705772d 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_resource.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_resource.py
@@ -26,7 +26,7 @@ def test_basic_resource_request() -> None:
         status=200,
     )
 
-    resource.get_reports()
+    resource._get_reports()  # noqa: SLF001
 
     assert len(responses.calls) == 1
     assert responses.calls[0].request.headers["Authorization"] == f"Bearer {fake_token}"
@@ -63,7 +63,7 @@ def test_service_principal_auth() -> None:
         status=200,
    )
 
-    resource.get_reports()
+    resource._get_reports()  # noqa: SLF001
 
     assert len(responses.calls) == 2
     assert fake_client_id in responses.calls[0].request.body
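
A minimal usage sketch of the surface that remains public after this change. The workspace and dataset IDs below are placeholders, and PowerBIToken is assumed to be the token-based credential class exported by dagster_powerbi; the refresh calls would typically run inside an asset or job where the resource is fully initialized.

    from dagster import EnvVar
    from dagster_powerbi import PowerBIToken, PowerBIWorkspace

    # Construct the workspace resource (IDs are placeholders).
    resource = PowerBIWorkspace(
        credentials=PowerBIToken(api_token=EnvVar("POWER_BI_API_TOKEN")),
        workspace_id="00000000-0000-0000-0000-000000000000",
    )

    # Still public: build Definitions covering the workspace's dashboards,
    # reports, semantic models, and data sources.
    defs = resource.build_defs()

    # Still public: kick off a dataset refresh, then block until it
    # completes or fails (raises Failure on timeout or refresh error).
    resource.trigger_refresh(dataset_id="11111111-1111-1111-1111-111111111111")
    resource.poll_refresh(dataset_id="11111111-1111-1111-1111-111111111111")

Everything renamed with a leading underscore (_fetch, _fetch_json, _get_reports, and friends) stays reachable for the tests via # noqa: SLF001, as above, but is no longer part of the documented API.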