diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
index 8533d3f969d1d..40b789bad38eb 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py
@@ -5,6 +5,7 @@
 from dataclasses import dataclass
 from functools import cached_property
 from typing import Any, Dict, Mapping, Optional, Sequence, Type
+from urllib.parse import urlencode
 
 import requests
 from dagster import (
@@ -17,6 +18,7 @@
 from dagster._core.definitions.asset_spec import AssetSpec
 from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
 from dagster._core.definitions.events import Failure
+from dagster._time import get_current_timestamp
 from dagster._utils.cached_method import cached_method
 from dagster._utils.security import non_secure_md5_hash_str
 from pydantic import Field, PrivateAttr
@@ -32,6 +34,8 @@
 BASE_API_URL = "https://api.powerbi.com/v1.0/myorg"
 POWER_BI_RECONSTRUCTION_METADATA_KEY_PREFIX = "__power_bi"
 
+ADMIN_SCAN_TIMEOUT = 60
+
 
 def _clean_op_name(name: str) -> str:
     """Cleans an input to be a valid Dagster op name."""
@@ -122,6 +126,7 @@ def _fetch(
         endpoint: str,
         method: str = "GET",
         json: Any = None,
+        params: Optional[Dict[str, Any]] = None,
         group_scoped: bool = True,
     ) -> requests.Response:
         """Fetch JSON data from the PowerBI API. Raises an exception if the request fails.
@@ -137,9 +142,14 @@
             "Authorization": f"Bearer {self._api_token}",
         }
         base_url = f"{BASE_API_URL}/groups/{self.workspace_id}" if group_scoped else BASE_API_URL
+        url = f"{base_url}/{endpoint}"
+        if params:
+            url_parameters = urlencode(params)
+            url = f"{url}?{url_parameters}"
+
         response = requests.request(
             method=method,
-            url=f"{base_url}/{endpoint}",
+            url=url,
             headers=headers,
             json=json,
             allow_redirects=True,
@@ -152,9 +162,10 @@ def _fetch_json(
         endpoint: str,
         method: str = "GET",
         json: Any = None,
+        params: Optional[Dict[str, Any]] = None,
         group_scoped: bool = True,
     ) -> Dict[str, Any]:
-        return self._fetch(endpoint, method, json, group_scoped=group_scoped).json()
+        return self._fetch(endpoint, method, json, group_scoped=group_scoped, params=params).json()
 
     @public
     def trigger_and_poll_refresh(self, dataset_id: str) -> None:
@@ -223,13 +234,81 @@ def _get_dashboard_tiles(self, dashboard_id: str) -> Mapping[str, Any]:
         """
         return self._fetch_json(f"dashboards/{dashboard_id}/tiles")
 
-    def _fetch_powerbi_workspace_data(self) -> PowerBIWorkspaceData:
+    @cached_method
+    def _scan(self) -> Mapping[str, Any]:
+        submission = self._fetch_json(
+            method="POST",
+            endpoint="admin/workspaces/getInfo",
+            group_scoped=False,
+            json={"workspaces": [self.workspace_id]},
+            params={
+                "lineage": "true",
+                "datasourceDetails": "true",
+                "datasetSchema": "true",
+                "datasetExpressions": "true",
+            },
+        )
+        scan_id = submission["id"]
+
+        now = get_current_timestamp()
+        start_time = now
+
+        status = None
+        while status != "Succeeded" and now - start_time < ADMIN_SCAN_TIMEOUT:
+            scan_details = self._fetch_json(
+                endpoint=f"admin/workspaces/scanStatus/{scan_id}", group_scoped=False
+            )
+            status = scan_details["status"]
+            time.sleep(0.1)
+            now = get_current_timestamp()
+
+        if status != "Succeeded":
+            raise Failure(f"Scan not successful after {ADMIN_SCAN_TIMEOUT} seconds: {scan_details}")
+
+        return self._fetch_json(
+            endpoint=f"admin/workspaces/scanResult/{scan_id}", group_scoped=False
+        )
+
+    def _fetch_powerbi_workspace_data(self, use_workspace_scan: bool) -> PowerBIWorkspaceData:
         """Retrieves all Power BI content from the workspace and returns it as a PowerBIWorkspaceData object.
 
         Future work will cache this data to avoid repeated calls to the Power BI API.
 
+        Args:
+            use_workspace_scan (bool): Whether to fetch all workspace content at once
+                using the Power BI admin workspace-scan APIs.
+
         Returns:
             PowerBIWorkspaceData: A snapshot of the Power BI workspace's content.
         """
+        if use_workspace_scan:
+            return self._fetch_powerbi_workspace_data_scan()
+        return self._fetch_powerbi_workspace_data_legacy()
+
+    def _fetch_powerbi_workspace_data_scan(self) -> PowerBIWorkspaceData:
+        scan_result = self._scan()
+        augmented_dashboard_data = scan_result["workspaces"][0]["dashboards"]
+
+        dashboards = [
+            PowerBIContentData(content_type=PowerBIContentType.DASHBOARD, properties=data)
+            for data in augmented_dashboard_data
+        ]
+
+        reports = [
+            PowerBIContentData(content_type=PowerBIContentType.REPORT, properties=data)
+            for data in scan_result["workspaces"][0]["reports"]
+        ]
+
+        semantic_models_data = scan_result["workspaces"][0]["datasets"]
+
+        semantic_models = [
+            PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset)
+            for dataset in semantic_models_data
+        ]
+        return PowerBIWorkspaceData.from_content_data(
+            self.workspace_id, dashboards + reports + semantic_models
+        )
+
+    def _fetch_powerbi_workspace_data_legacy(self) -> PowerBIWorkspaceData:
         dashboard_data = self._get_dashboards()["value"]
         augmented_dashboard_data = [
             {**dashboard, "tiles": self._get_dashboard_tiles(dashboard["id"])["value"]}
@@ -314,11 +393,14 @@ def build_defs(
 def load_powerbi_asset_specs(
     workspace: PowerBIWorkspace,
     dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
+    use_workspace_scan: bool = False,
 ) -> Sequence[AssetSpec]:
     """Returns a list of AssetSpecs representing the Power BI content in the workspace.
 
     Args:
         workspace (PowerBIWorkspace): The Power BI workspace to load assets from.
+        use_workspace_scan (bool): Whether to fetch all workspace content at once using
+            the Power BI admin workspace-scan APIs. Defaults to False.
 
     Returns:
         List[AssetSpec]: The set of assets representing the Power BI content in the workspace.
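Note: the new flag is opt-in from the public entry point above. A minimal usage sketch, assuming the package's public exports (`PowerBIWorkspace`, `PowerBIToken`, `load_powerbi_asset_specs`); the workspace ID is the sample ID from the tests below and the env var name is a placeholder:

from dagster import Definitions, EnvVar
from dagster_powerbi import PowerBIToken, PowerBIWorkspace, load_powerbi_asset_specs

workspace = PowerBIWorkspace(
    credentials=PowerBIToken(api_token=EnvVar("POWER_BI_API_TOKEN")),  # placeholder env var
    workspace_id="a2122b8f-d7e1-42e8-be2b-a5e636ca3221",  # sample ID from the tests
)

# With use_workspace_scan=True, one admin getInfo/scanStatus/scanResult round trip
# replaces the per-dashboard/report/dataset calls made by the legacy path.
defs = Definitions(assets=load_powerbi_asset_specs(workspace, use_workspace_scan=True))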
@@ -328,6 +410,7 @@ def load_powerbi_asset_specs(
         PowerBIWorkspaceDefsLoader(
             workspace=initialized_workspace,
             translator_cls=dagster_powerbi_translator,
+            use_workspace_scan=use_workspace_scan,
         )
         .build_defs()
         .assets,
@@ -339,6 +422,7 @@
 class PowerBIWorkspaceDefsLoader(StateBackedDefinitionsLoader[PowerBIWorkspaceData]):
     workspace: PowerBIWorkspace
     translator_cls: Type[DagsterPowerBITranslator]
+    use_workspace_scan: bool
 
     @property
     def defs_key(self) -> str:
@@ -346,7 +430,9 @@ def defs_key(self) -> str:
 
     def fetch_state(self) -> PowerBIWorkspaceData:
         with self.workspace.process_config_and_initialize_cm() as initialized_workspace:
-            return initialized_workspace._fetch_powerbi_workspace_data()  # noqa: SLF001
+            return initialized_workspace._fetch_powerbi_workspace_data(  # noqa: SLF001
+                use_workspace_scan=self.use_workspace_scan
+            )
 
     def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions:
         translator = self.translator_cls(context=state)
diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py
index d0adf204fc095..f1200aa4ab03e 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py
@@ -1,7 +1,7 @@
 import re
 import urllib.parse
 from enum import Enum
-from typing import Any, Dict, Literal, Optional, Sequence
+from typing import Any, Dict, List, Literal, Optional, Sequence
 
 from dagster import (
     UrlMetadataValue,
@@ -31,6 +31,31 @@ def _clean_asset_name(name: str) -> str:
     return re.sub(r"[^A-Za-z0-9_]+", "_", name)
 
 
+# regex to find objects of the form
+# [Name="ANALYTICS",Kind="Schema"]
+PARSE_M_QUERY_OBJECT = re.compile(r'\[Name="(?P<name>[^"]+)",Kind="(?P<kind>[^"]+)"\]')
+
+
+def _attempt_parse_m_query_source(sources: List[Dict[str, Any]]) -> Optional[AssetKey]:
+    for source in sources:
+        if "expression" in source:
+            if "Snowflake.Databases" in source["expression"]:
+                objects = PARSE_M_QUERY_OBJECT.findall(source["expression"])
+                objects_by_kind = {obj[1]: obj[0].lower() for obj in objects}
+
+                if "Schema" in objects_by_kind and "Table" in objects_by_kind:
+                    if "Database" in objects_by_kind:
+                        return AssetKey(
+                            [
+                                objects_by_kind["Database"],
+                                objects_by_kind["Schema"],
+                                objects_by_kind["Table"],
+                            ]
+                        )
+                    else:
+                        return AssetKey([objects_by_kind["Schema"], objects_by_kind["Table"]])
+
+
 @whitelist_for_serdes
 class PowerBIContentType(Enum):
     """Enum representing each object in PowerBI's ontology, generically referred to as "content" by the API."""
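For illustration, a sketch of what `_attempt_parse_m_query_source` extracts; the Snowflake M expression below is invented for the example and is not one of the test fixtures:

from dagster import AssetKey

from dagster_powerbi.translator import _attempt_parse_m_query_source

# Hypothetical M query; the bracket objects follow the [Name="...",Kind="..."]
# shape that PARSE_M_QUERY_OBJECT matches.
expression = (
    'let\n'
    '    Source = Snowflake.Databases("example.snowflakecomputing.com", "WH"),\n'
    '    Db = Source{[Name="SALES",Kind="Database"]}[Data],\n'
    '    Schema = Db{[Name="ANALYTICS",Kind="Schema"]}[Data],\n'
    '    Data = Schema{[Name="ORDERS",Kind="Table"]}[Data]\n'
    'in\n'
    '    Data'
)

# Database, Schema, and Table objects are all present, so the three-part key is
# built, with each component lower-cased.
assert _attempt_parse_m_query_source([{"expression": expression}]) == AssetKey(
    ["sales", "analytics", "orders"]
)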
@@ -159,7 +184,7 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
             deps=report_keys,
             metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
             tags={**PowerBITagSet(asset_type="dashboard")},
-            kinds={"powerbi"},
+            kinds={"powerbi", "dashboard"},
         )
 
     def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
@@ -176,20 +201,26 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
             deps=[dataset_key] if dataset_key else None,
             metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
             tags={**PowerBITagSet(asset_type="report")},
-            kinds={"powerbi"},
+            kinds={"powerbi", "report"},
         )
 
     def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
         return AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])])
 
     def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
-        source_ids = data.properties["sources"]
+        source_ids = data.properties.get("sources", [])
         source_keys = [
             self.get_data_source_asset_key(self.workspace_data.data_sources_by_id[source_id])
             for source_id in source_ids
         ]
         url = data.properties.get("webUrl")
 
+        for table in data.properties.get("tables", []):
+            source = table.get("source", [])
+            source_key = _attempt_parse_m_query_source(source)
+            if source_key:
+                source_keys.append(source_key)
+
         return AssetSpec(
             key=self.get_semantic_model_asset_key(data),
             deps=source_keys,
@@ -199,7 +230,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
                 )
             },
             tags={**PowerBITagSet(asset_type="semantic_model")},
-            kinds={"powerbi"},
+            kinds={"powerbi", "semantic model"},
         )
 
     def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py
index 509020ab2a5f0..56147c9dacd01 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py
@@ -149,6 +149,45 @@ def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData:
     )
 
 
+@pytest.fixture(
+    name="workspace_scan_data_api_mocks",
+)
+def workspace_scan_data_api_mocks_fixture(workspace_id: str) -> Iterator[responses.RequestsMock]:
+    with responses.RequestsMock() as response:
+        scan_id = "1234"
+        response.add(
+            method=responses.POST,
+            url=f"{BASE_API_URL}/admin/workspaces/getInfo?lineage=true&datasourceDetails=true&datasetSchema=true&datasetExpressions=true",
+            json={"id": scan_id},
+            status=200,
+        )
+
+        response.add(
+            method=responses.GET,
+            url=f"{BASE_API_URL}/admin/workspaces/scanStatus/{scan_id}",
+            json={"status": "Succeeded"},
+            status=200,
+        )
+
+        response.add(
+            method=responses.GET,
+            url=f"{BASE_API_URL}/admin/workspaces/scanResult/{scan_id}",
+            json={
+                "workspaces": [
+                    {
+                        "dashboards": [SAMPLE_DASH],
+                        "reports": [SAMPLE_REPORT],
+                        "datasets": [SAMPLE_SEMANTIC_MODEL],
+                        "datasourceInstances": SAMPLE_DATA_SOURCES,
+                    }
+                ]
+            },
+            status=200,
+        )
+
+        yield response
+
+
 @pytest.fixture(
     name="workspace_data_api_mocks",
 )
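The fixture above mocks the three admin endpoints `_scan` calls. For reference, a sketch of the equivalent raw round trip with plain `requests`; the token and workspace ID are placeholders:

import time

import requests

BASE_API_URL = "https://api.powerbi.com/v1.0/myorg"
headers = {"Authorization": "Bearer <api-token>"}  # placeholder token

# Kick off the scan; the query params mirror those passed by _scan.
scan = requests.post(
    f"{BASE_API_URL}/admin/workspaces/getInfo",
    params={
        "lineage": "true",
        "datasourceDetails": "true",
        "datasetSchema": "true",
        "datasetExpressions": "true",
    },
    json={"workspaces": ["<workspace-id>"]},  # placeholder workspace ID
    headers=headers,
).json()

# Poll scanStatus until the scan succeeds, then fetch the aggregated result.
while (
    requests.get(
        f"{BASE_API_URL}/admin/workspaces/scanStatus/{scan['id']}", headers=headers
    ).json()["status"]
    != "Succeeded"
):
    time.sleep(0.1)

result = requests.get(
    f"{BASE_API_URL}/admin/workspaces/scanResult/{scan['id']}", headers=headers
).json()
# result["workspaces"][0] carries the "dashboards", "reports", "datasets", and
# "datasourceInstances" keys that the fixture above stubs out.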
diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py
index 8c367b485e2de..4c6fb7fbde618 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py
@@ -35,13 +35,29 @@ def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_id: str) -> None:
         workspace_id=workspace_id,
     )
 
-    actual_workspace_data = resource._fetch_powerbi_workspace_data()  # noqa: SLF001
+    actual_workspace_data = resource._fetch_powerbi_workspace_data(use_workspace_scan=False)  # noqa: SLF001
     assert len(actual_workspace_data.dashboards_by_id) == 1
     assert len(actual_workspace_data.reports_by_id) == 1
     assert len(actual_workspace_data.semantic_models_by_id) == 1
     assert len(actual_workspace_data.data_sources_by_id) == 2
 
 
+def test_fetch_powerbi_workspace_data_scan(
+    workspace_scan_data_api_mocks: None, workspace_id: str
+) -> None:
+    fake_token = uuid.uuid4().hex
+    resource = PowerBIWorkspace(
+        credentials=PowerBIToken(api_token=fake_token),
+        workspace_id=workspace_id,
+    )
+
+    actual_workspace_data = resource._fetch_powerbi_workspace_data(use_workspace_scan=True)  # noqa: SLF001
+    assert len(actual_workspace_data.dashboards_by_id) == 1
+    assert len(actual_workspace_data.reports_by_id) == 1
+    assert len(actual_workspace_data.semantic_models_by_id) == 1
+    # assert len(actual_workspace_data.data_sources_by_id) == 2
+
+
 def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: str) -> None:
     fake_token = uuid.uuid4().hex
     resource = PowerBIWorkspace(
diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py
index f4f1ff4536c80..9181c356035a6 100644
--- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py
+++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py
@@ -24,6 +24,7 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None:
     assert asset_spec.tags == {
         "dagster-powerbi/asset_type": "dashboard",
         **build_kind_tag("powerbi"),
+        **build_kind_tag("dashboard"),
     }
 
 
@@ -42,7 +43,11 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
             "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/reports/8b7f815d-4e64-40dd-993c-cfa4fb12edee"
         )
     }
-    assert asset_spec.tags == {"dagster-powerbi/asset_type": "report", **build_kind_tag("powerbi")}
+    assert asset_spec.tags == {
+        "dagster-powerbi/asset_type": "report",
+        **build_kind_tag("powerbi"),
+        **build_kind_tag("report"),
+    }
 
 
 def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None:
@@ -65,6 +70,7 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None:
     assert asset_spec.tags == {
         "dagster-powerbi/asset_type": "semantic_model",
         **build_kind_tag("powerbi"),
+        **build_kind_tag("semantic model"),
     }
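A note on the expanded tag assertions: assuming Dagster's standard kind-tag shape, `build_kind_tag(kind)` produces a single `dagster/kind/<kind>` entry with an empty value, so the dictionaries being compared look like the sketch below (the helper is a hypothetical stand-in, not the real import):

def build_kind_tag_sketch(kind: str) -> dict:
    # assumed tag shape: {"dagster/kind/<kind>": ""}
    return {f"dagster/kind/{kind}": ""}

assert build_kind_tag_sketch("powerbi") == {"dagster/kind/powerbi": ""}
# e.g. the semantic model spec above is expected to carry both the
# "dagster/kind/powerbi" and "dagster/kind/semantic model" tags.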