publicize and privatize PowerBIWorkspace methods (#24795)
## Summary & Motivation

On `PowerBIWorkspace`:
- Mark public methods with `@public`
- Make other methods private, so they won't show up in auto-complete (see the sketch below)

Also change some `Dict`s to `Mapping`s.
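
A rough sketch of the pattern (trimmed for illustration; method bodies are placeholders, not the real implementation):

```python
from typing import Any, Mapping

from dagster._annotations import public  # the same marker imported in the diff below


class PowerBIWorkspace:
    """Illustration only; the real class extends ConfigurableResource."""

    @public
    def trigger_refresh(self, dataset_id: str) -> None:
        """Marked @public: part of the supported API surface."""
        self._fetch_json(f"datasets/{dataset_id}/refreshes", method="POST")

    def _fetch_json(self, endpoint: str, method: str = "GET") -> Mapping[str, Any]:
        """Leading underscore: internal helper, kept out of auto-complete."""
        raise NotImplementedError("placeholder body")
```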
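The `Mapping` return type advertises a read-only view to callers, which is useful for `@cached_method` results that are shared across calls. A minimal illustration (hypothetical data, not real API responses):

```python
from typing import Any, Mapping


def _get_reports() -> Mapping[str, Any]:
    # A plain dict still satisfies Mapping; the annotation only
    # narrows what callers are allowed to do with the result.
    return {"value": [{"id": "report-1"}]}


reports = _get_reports()
print(reports["value"][0]["id"])  # reading type-checks fine
# reports["value"] = []           # mutation is flagged: Mapping has no __setitem__
```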

## How I Tested These Changes

## Changelog

NOCHANGELOG
sryza authored Sep 27, 2024
1 parent 7998da4 commit aa5236a

Showing 3 changed files with 32 additions and 36 deletions.
@@ -3,10 +3,11 @@
 import time
 from dataclasses import dataclass
 from functools import cached_property
-from typing import Any, Dict, Optional, Type, cast
+from typing import Any, Dict, Mapping, Optional, Type, cast

 import requests
 from dagster import ConfigurableResource, Definitions, external_assets_from_specs, multi_asset
+from dagster._annotations import public
 from dagster._config.pythonic_config.resource import ResourceDependency
 from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
 from dagster._core.definitions.events import Failure
@@ -98,10 +99,10 @@ class PowerBIWorkspace(ConfigurableResource):
     )

     @cached_property
-    def api_token(self) -> str:
+    def _api_token(self) -> str:
         return self.credentials.api_token

-    def fetch(
+    def _fetch(
         self,
         endpoint: str,
         method: str = "GET",
@@ -118,7 +119,7 @@ def fetch(
         """
         headers = {
             "Content-Type": "application/json",
-            "Authorization": f"Bearer {self.api_token}",
+            "Authorization": f"Bearer {self._api_token}",
         }
         base_url = f"{BASE_API_URL}/groups/{self.workspace_id}" if group_scoped else BASE_API_URL
         response = requests.request(
@@ -131,18 +132,19 @@ def fetch(
         response.raise_for_status()
         return response

-    def fetch_json(
+    def _fetch_json(
         self,
         endpoint: str,
         method: str = "GET",
         json: Any = None,
         group_scoped: bool = True,
     ) -> Dict[str, Any]:
-        return self.fetch(endpoint, method, json, group_scoped=group_scoped).json()
+        return self._fetch(endpoint, method, json, group_scoped=group_scoped).json()

+    @public
     def trigger_refresh(self, dataset_id: str) -> None:
         """Triggers a refresh of a PowerBI dataset."""
-        response = self.fetch(
+        response = self._fetch(
             method="POST",
             endpoint=f"datasets/{dataset_id}/refreshes",
             json={"notifyOption": "NoNotification"},
@@ -151,6 +153,7 @@ def trigger_refresh(self, dataset_id: str) -> None:
         if response.status_code != 202:
             raise Failure(f"Refresh failed to start: {response.content}")

+    @public
     def poll_refresh(self, dataset_id: str) -> None:
         """Polls the refresh status of a PowerBI dataset until it completes or fails."""
         status = None
@@ -160,7 +163,7 @@ def poll_refresh(self, dataset_id: str) -> None:
             if time.monotonic() - start > self.refresh_timeout:
                 raise Failure(f"Refresh timed out after {self.refresh_timeout} seconds.")

-            last_refresh = self.fetch_json(
+            last_refresh = self._fetch_json(
                 f"datasets/{dataset_id}/refreshes",
                 group_scoped=False,
             )["value"][0]
@@ -173,50 +176,42 @@ def poll_refresh(self, dataset_id: str) -> None:
                 raise Failure(f"Refresh failed: {error}")

     @cached_method
-    def get_reports(self) -> Dict[str, Any]:
+    def _get_reports(self) -> Mapping[str, Any]:
         """Fetches a list of all PowerBI reports in the workspace."""
-        return self.fetch_json("reports")
+        return self._fetch_json("reports")

     @cached_method
-    def get_semantic_models(self) -> Dict[str, Any]:
+    def _get_semantic_models(self) -> Mapping[str, Any]:
         """Fetches a list of all PowerBI semantic models in the workspace."""
-        return self.fetch_json("datasets")
+        return self._fetch_json("datasets")

     @cached_method
-    def get_semantic_model_sources(
-        self,
-        dataset_id: str,
-    ) -> Dict[str, Any]:
+    def _get_semantic_model_sources(self, dataset_id: str) -> Mapping[str, Any]:
         """Fetches a list of all data sources for a given semantic model."""
-        return self.fetch_json(f"datasets/{dataset_id}/datasources")
+        return self._fetch_json(f"datasets/{dataset_id}/datasources")

     @cached_method
-    def get_dashboards(self) -> Dict[str, Any]:
+    def _get_dashboards(self) -> Mapping[str, Any]:
         """Fetches a list of all PowerBI dashboards in the workspace."""
-        return self.fetch_json("dashboards")
+        return self._fetch_json("dashboards")

     @cached_method
-    def get_dashboard_tiles(
-        self,
-        dashboard_id: str,
-    ) -> Dict[str, Any]:
+    def _get_dashboard_tiles(self, dashboard_id: str) -> Mapping[str, Any]:
         """Fetches a list of all tiles for a given PowerBI dashboard,
         including which reports back each tile.
         """
-        return self.fetch_json(f"dashboards/{dashboard_id}/tiles")
+        return self._fetch_json(f"dashboards/{dashboard_id}/tiles")

-    def fetch_powerbi_workspace_data(
-        self,
-    ) -> PowerBIWorkspaceData:
+    def _fetch_powerbi_workspace_data(self) -> PowerBIWorkspaceData:
         """Retrieves all Power BI content from the workspace and returns it as a PowerBIWorkspaceData object.
         Future work will cache this data to avoid repeated calls to the Power BI API.

         Returns:
             PowerBIWorkspaceData: A snapshot of the Power BI workspace's content.
         """
-        dashboard_data = self.get_dashboards()["value"]
+        dashboard_data = self._get_dashboards()["value"]
         augmented_dashboard_data = [
-            {**dashboard, "tiles": self.get_dashboard_tiles(dashboard["id"])["value"]}
+            {**dashboard, "tiles": self._get_dashboard_tiles(dashboard["id"])["value"]}
             for dashboard in dashboard_data
         ]
         dashboards = [
@@ -226,12 +221,12 @@ def fetch_powerbi_workspace_data(

         reports = [
             PowerBIContentData(content_type=PowerBIContentType.REPORT, properties=data)
-            for data in self.get_reports()["value"]
+            for data in self._get_reports()["value"]
         ]
-        semantic_models_data = self.get_semantic_models()["value"]
+        semantic_models_data = self._get_semantic_models()["value"]
         data_sources_by_id = {}
         for dataset in semantic_models_data:
-            dataset_sources = self.get_semantic_model_sources(dataset["id"])["value"]
+            dataset_sources = self._get_semantic_model_sources(dataset["id"])["value"]
             dataset["sources"] = [source["datasourceId"] for source in dataset_sources]
             for data_source in dataset_sources:
                 data_sources_by_id[data_source["datasourceId"]] = PowerBIContentData(
@@ -246,6 +241,7 @@ def fetch_powerbi_workspace_data(
             dashboards + reports + semantic_models + list(data_sources_by_id.values()),
         )

+    @public
     def build_defs(
         self,
         dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
@@ -284,7 +280,7 @@ def defs_key(self) -> str:

     def fetch_state(self) -> PowerBIWorkspaceData:
         with self.workspace.process_config_and_initialize_cm() as initialized_workspace:
-            return initialized_workspace.fetch_powerbi_workspace_data()
+            return initialized_workspace._fetch_powerbi_workspace_data()  # noqa: SLF001

     def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions:
         translator = self.translator_cls(context=state)

@@ -27,7 +27,7 @@ def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_
         workspace_id=workspace_id,
     )

-    actual_workspace_data = resource.fetch_powerbi_workspace_data()
+    actual_workspace_data = resource._fetch_powerbi_workspace_data()  # noqa: SLF001
     assert len(actual_workspace_data.dashboards_by_id) == 1
     assert len(actual_workspace_data.reports_by_id) == 1
     assert len(actual_workspace_data.semantic_models_by_id) == 1

@@ -26,7 +26,7 @@ def test_basic_resource_request() -> None:
         status=200,
     )

-    resource.get_reports()
+    resource._get_reports()  # noqa: SLF001

     assert len(responses.calls) == 1
     assert responses.calls[0].request.headers["Authorization"] == f"Bearer {fake_token}"
@@ -63,7 +63,7 @@ def test_service_principal_auth() -> None:
         status=200,
     )

-    resource.get_reports()
+    resource._get_reports()  # noqa: SLF001

     assert len(responses.calls) == 2
     assert fake_client_id in responses.calls[0].request.body