# [dagster-powerbi] Enable loading Power BI data using admin scan APIs (#25647)

## Summary

Rather than fetching Power BI information from individual object-level
APIs, this PR introduces the option to "scan" the entire Power BI
workspace at once. This should allow us to retrieve data for larger
instances without blocking on per-object I/O, and to retrieve more
detailed lineage information for datasets.
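
For reference, a minimal usage sketch of the new option (the API token and workspace ID are placeholders, and the top-level imports are assumed to match the package's public exports; `load_powerbi_asset_specs` and `use_workspace_scan` are the APIs touched by this diff):

```python
from dagster_powerbi import PowerBIToken, PowerBIWorkspace, load_powerbi_asset_specs

# Placeholder credentials and workspace ID, for illustration only.
workspace = PowerBIWorkspace(
    credentials=PowerBIToken(api_token="<api-token>"),
    workspace_id="<workspace-id>",
)

# use_workspace_scan=True opts into the new admin scan path; the default
# (False) preserves the existing per-object API behavior.
specs = load_powerbi_asset_specs(workspace, use_workspace_scan=True)
```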

## How I Tested These Changes

New unit test; also tested against the Hooli instance.
benpankow authored Oct 31, 2024
1 parent bb00738 commit 0988d6d
Showing 5 changed files with 189 additions and 11 deletions.
@@ -5,6 +5,7 @@
from dataclasses import dataclass
from functools import cached_property
from typing import Any, Dict, Mapping, Optional, Sequence, Type
from urllib.parse import urlencode

import requests
from dagster import (
@@ -17,6 +18,7 @@
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.events import Failure
from dagster._time import get_current_timestamp
from dagster._utils.cached_method import cached_method
from dagster._utils.security import non_secure_md5_hash_str
from pydantic import Field, PrivateAttr
@@ -32,6 +34,8 @@
BASE_API_URL = "https://api.powerbi.com/v1.0/myorg"
POWER_BI_RECONSTRUCTION_METADATA_KEY_PREFIX = "__power_bi"

ADMIN_SCAN_TIMEOUT = 60


def _clean_op_name(name: str) -> str:
"""Cleans an input to be a valid Dagster op name."""
@@ -122,6 +126,7 @@ def _fetch(
endpoint: str,
method: str = "GET",
json: Any = None,
params: Optional[Dict[str, Any]] = None,
group_scoped: bool = True,
) -> requests.Response:
"""Fetch JSON data from the PowerBI API. Raises an exception if the request fails.
@@ -137,9 +142,14 @@ def _fetch(
"Authorization": f"Bearer {self._api_token}",
}
base_url = f"{BASE_API_URL}/groups/{self.workspace_id}" if group_scoped else BASE_API_URL
url = f"{base_url}/{endpoint}"
if params:
url_parameters = urlencode(params) if params else None
url = f"{url}?{url_parameters}"

response = requests.request(
method=method,
url=f"{base_url}/{endpoint}",
url=url,
headers=headers,
json=json,
allow_redirects=True,
@@ -152,9 +162,10 @@ def _fetch_json(
endpoint: str,
method: str = "GET",
json: Any = None,
params: Optional[Dict[str, Any]] = None,
group_scoped: bool = True,
) -> Dict[str, Any]:
return self._fetch(endpoint, method, json, group_scoped=group_scoped).json()
return self._fetch(endpoint, method, json, group_scoped=group_scoped, params=params).json()

@public
def trigger_and_poll_refresh(self, dataset_id: str) -> None:
@@ -223,13 +234,81 @@ def _get_dashboard_tiles(self, dashboard_id: str) -> Mapping[str, Any]:
"""
return self._fetch_json(f"dashboards/{dashboard_id}/tiles")

def _fetch_powerbi_workspace_data(self) -> PowerBIWorkspaceData:
@cached_method
def _scan(self) -> Mapping[str, Any]:
submission = self._fetch_json(
method="POST",
endpoint="admin/workspaces/getInfo",
group_scoped=False,
json={"workspaces": [self.workspace_id]},
params={
"lineage": "true",
"datasourceDetails": "true",
"datasetSchema": "true",
"datasetExpressions": "true",
},
)
scan_id = submission["id"]

now = get_current_timestamp()
start_time = now

status = None
while status != "Succeeded" and now - start_time < ADMIN_SCAN_TIMEOUT:
scan_details = self._fetch_json(
endpoint=f"admin/workspaces/scanStatus/{scan_id}", group_scoped=False
)
status = scan_details["status"]
time.sleep(0.1)
now = get_current_timestamp()

if status != "Succeeded":
raise Failure(f"Scan not successful after {ADMIN_SCAN_TIMEOUT} seconds: {scan_details}")

return self._fetch_json(
endpoint=f"admin/workspaces/scanResult/{scan_id}", group_scoped=False
)

def _fetch_powerbi_workspace_data(self, use_workspace_scan: bool) -> PowerBIWorkspaceData:
"""Retrieves all Power BI content from the workspace and returns it as a PowerBIWorkspaceData object.
Future work will cache this data to avoid repeated calls to the Power BI API.
Args:
use_workspace_scan (bool): Whether to scan the entire workspace using admin APIs
at once to get all content.
Returns:
PowerBIWorkspaceData: A snapshot of the Power BI workspace's content.
"""
if use_workspace_scan:
return self._fetch_powerbi_workspace_data_scan()
return self._fetch_powerbi_workspace_data_legacy()

def _fetch_powerbi_workspace_data_scan(self) -> PowerBIWorkspaceData:
scan_result = self._scan()
augmented_dashboard_data = scan_result["workspaces"][0]["dashboards"]

dashboards = [
PowerBIContentData(content_type=PowerBIContentType.DASHBOARD, properties=data)
for data in augmented_dashboard_data
]

reports = [
PowerBIContentData(content_type=PowerBIContentType.REPORT, properties=data)
for data in scan_result["workspaces"][0]["reports"]
]

semantic_models_data = scan_result["workspaces"][0]["datasets"]

semantic_models = [
PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset)
for dataset in semantic_models_data
]
return PowerBIWorkspaceData.from_content_data(
self.workspace_id, dashboards + reports + semantic_models
)

def _fetch_powerbi_workspace_data_legacy(self) -> PowerBIWorkspaceData:
dashboard_data = self._get_dashboards()["value"]
augmented_dashboard_data = [
{**dashboard, "tiles": self._get_dashboard_tiles(dashboard["id"])["value"]}
@@ -314,11 +393,14 @@ def build_defs(
def load_powerbi_asset_specs(
workspace: PowerBIWorkspace,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
use_workspace_scan: bool = False,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Power BI content in the workspace.
Args:
workspace (PowerBIWorkspace): The Power BI workspace to load assets from.
use_workspace_scan (bool): Whether to scan the entire workspace using admin APIs
at once to get all content. Defaults to False.
Returns:
List[AssetSpec]: The set of assets representing the Power BI content in the workspace.
@@ -328,6 +410,7 @@ def load_powerbi_asset_specs(
PowerBIWorkspaceDefsLoader(
workspace=initialized_workspace,
translator_cls=dagster_powerbi_translator,
use_workspace_scan=use_workspace_scan,
)
.build_defs()
.assets,
@@ -339,14 +422,17 @@
class PowerBIWorkspaceDefsLoader(StateBackedDefinitionsLoader[PowerBIWorkspaceData]):
workspace: PowerBIWorkspace
translator_cls: Type[DagsterPowerBITranslator]
use_workspace_scan: bool

@property
def defs_key(self) -> str:
return f"{POWER_BI_RECONSTRUCTION_METADATA_KEY_PREFIX}/{self.workspace.workspace_id}"

def fetch_state(self) -> PowerBIWorkspaceData:
with self.workspace.process_config_and_initialize_cm() as initialized_workspace:
return initialized_workspace._fetch_powerbi_workspace_data() # noqa: SLF001
return initialized_workspace._fetch_powerbi_workspace_data( # noqa: SLF001
use_workspace_scan=self.use_workspace_scan
)

def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions:
translator = self.translator_cls(context=state)
@@ -1,7 +1,7 @@
import re
import urllib.parse
from enum import Enum
from typing import Any, Dict, Literal, Optional, Sequence
from typing import Any, Dict, List, Literal, Optional, Sequence

from dagster import (
UrlMetadataValue,
@@ -31,6 +31,31 @@ def _clean_asset_name(name: str) -> str:
return re.sub(r"[^A-Za-z0-9_]+", "_", name)


# regex to find objects of form
# [Name="ANALYTICS",Kind="Schema"]
PARSE_M_QUERY_OBJECT = re.compile(r'\[Name="(?P<name>[^"]+)",Kind="(?P<kind>[^"]+)"\]')
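# Illustrative example: given an M expression such as
#   Snowflake.Databases("..."){[Name="MY_DB",Kind="Database"]}[Data]
#     {[Name="ANALYTICS",Kind="Schema"]}[Data]{[Name="ORDERS",Kind="Table"]}[Data]
# (MY_DB and ORDERS are hypothetical names), PARSE_M_QUERY_OBJECT.findall returns
#   [("MY_DB", "Database"), ("ANALYTICS", "Schema"), ("ORDERS", "Table")],
# which _attempt_parse_m_query_source below maps to
#   AssetKey(["my_db", "analytics", "orders"]).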


def _attempt_parse_m_query_source(sources: List[Dict[str, Any]]) -> Optional[AssetKey]:
for source in sources:
if "expression" in source:
if "Snowflake.Databases" in source["expression"]:
objects = PARSE_M_QUERY_OBJECT.findall(source["expression"])
objects_by_kind = {obj[1]: obj[0].lower() for obj in objects}

if "Schema" in objects_by_kind and "Table" in objects_by_kind:
if "Database" in objects_by_kind:
return AssetKey(
[
objects_by_kind["Database"],
objects_by_kind["Schema"],
objects_by_kind["Table"],
]
)
else:
return AssetKey([objects_by_kind["Schema"], objects_by_kind["Table"]])


@whitelist_for_serdes
class PowerBIContentType(Enum):
"""Enum representing each object in PowerBI's ontology, generically referred to as "content" by the API."""
@@ -159,7 +184,7 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
deps=report_keys,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="dashboard")},
kinds={"powerbi"},
kinds={"powerbi", "dashboard"},
)

def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
@@ -176,20 +201,26 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
deps=[dataset_key] if dataset_key else None,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="report")},
kinds={"powerbi"},
kinds={"powerbi", "report"},
)

def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])])

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
source_ids = data.properties["sources"]
source_ids = data.properties.get("sources", [])
source_keys = [
self.get_data_source_asset_key(self.workspace_data.data_sources_by_id[source_id])
for source_id in source_ids
]
url = data.properties.get("webUrl")

for table in data.properties.get("tables", []):
source = table.get("source")
source_key = _attempt_parse_m_query_source(source)
if source_key:
source_keys.append(source_key)

return AssetSpec(
key=self.get_semantic_model_asset_key(data),
deps=source_keys,
@@ -199,7 +230,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
)
},
tags={**PowerBITagSet(asset_type="semantic_model")},
kinds={"powerbi"},
kinds={"powerbi", "semantic model"},
)

def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
@@ -149,6 +149,45 @@ def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData:
)


@pytest.fixture(
name="workspace_scan_data_api_mocks",
)
def workspace_scan_data_api_mocks_fixture(workspace_id: str) -> Iterator[responses.RequestsMock]:
with responses.RequestsMock() as response:
scan_id = "1234"
response.add(
method=responses.POST,
url=f"{BASE_API_URL}/admin/workspaces/getInfo?lineage=true&datasourceDetails=true&datasetSchema=true&datasetExpressions=true",
json={"id": scan_id},
status=200,
)

response.add(
method=responses.GET,
url=f"{BASE_API_URL}/admin/workspaces/scanStatus/{scan_id}",
json={"status": "Succeeded"},
status=200,
)

response.add(
method=responses.GET,
url=f"{BASE_API_URL}/admin/workspaces/scanResult/{scan_id}",
json={
"workspaces": [
{
"dashboards": [SAMPLE_DASH],
"reports": [SAMPLE_REPORT],
"datasets": [SAMPLE_SEMANTIC_MODEL],
"datasourceInstances": SAMPLE_DATA_SOURCES,
}
]
},
status=200,
)

yield response


@pytest.fixture(
name="workspace_data_api_mocks",
)
@@ -35,13 +35,29 @@ def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_
workspace_id=workspace_id,
)

actual_workspace_data = resource._fetch_powerbi_workspace_data() # noqa: SLF001
actual_workspace_data = resource._fetch_powerbi_workspace_data(use_workspace_scan=False) # noqa: SLF001
assert len(actual_workspace_data.dashboards_by_id) == 1
assert len(actual_workspace_data.reports_by_id) == 1
assert len(actual_workspace_data.semantic_models_by_id) == 1
assert len(actual_workspace_data.data_sources_by_id) == 2


def test_fetch_powerbi_workspace_data_scan(
workspace_scan_data_api_mocks: None, workspace_id: str
) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
credentials=PowerBIToken(api_token=fake_token),
workspace_id=workspace_id,
)

actual_workspace_data = resource._fetch_powerbi_workspace_data(use_workspace_scan=True) # noqa: SLF001
assert len(actual_workspace_data.dashboards_by_id) == 1
assert len(actual_workspace_data.reports_by_id) == 1
assert len(actual_workspace_data.semantic_models_by_id) == 1
# assert len(actual_workspace_data.data_sources_by_id) == 2


def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: str) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
@@ -24,6 +24,7 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None
assert asset_spec.tags == {
"dagster-powerbi/asset_type": "dashboard",
**build_kind_tag("powerbi"),
**build_kind_tag("dashboard"),
}


@@ -42,7 +43,11 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
"https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/reports/8b7f815d-4e64-40dd-993c-cfa4fb12edee"
)
}
assert asset_spec.tags == {"dagster-powerbi/asset_type": "report", **build_kind_tag("powerbi")}
assert asset_spec.tags == {
"dagster-powerbi/asset_type": "report",
**build_kind_tag("powerbi"),
**build_kind_tag("report"),
}


def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None:
@@ -65,6 +70,7 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None
assert asset_spec.tags == {
"dagster-powerbi/asset_type": "semantic_model",
**build_kind_tag("powerbi"),
**build_kind_tag("semantic model"),
}


