From cd2772ed2fe7573238591416d1762b1a00a7f71f Mon Sep 17 00:00:00 2001 From: benpankow Date: Wed, 7 Aug 2024 12:14:02 -0500 Subject: [PATCH] move methods to workspace --- .../dagster_powerbi/__init__.py | 1 - .../dagster_powerbi/asset_specs.py | 88 ------------------- .../dagster_powerbi/resource.py | 86 +++++++++++++++++- .../dagster_powerbi_tests/test_asset_specs.py | 7 +- 4 files changed, 87 insertions(+), 95 deletions(-) delete mode 100644 python_modules/libraries/dagster-powerbi/dagster_powerbi/asset_specs.py diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py index 4c9aea90fbcc0..6aa20338dd589 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py @@ -1,6 +1,5 @@ from dagster._core.libraries import DagsterLibraryRegistry -from .asset_specs import build_powerbi_asset_specs as build_powerbi_asset_specs from .resource import PowerBiWorkspace as PowerBiWorkspace from .translator import DagsterPowerBITranslator as DagsterPowerBITranslator diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/asset_specs.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/asset_specs.py deleted file mode 100644 index 061f24d8da745..0000000000000 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/asset_specs.py +++ /dev/null @@ -1,88 +0,0 @@ -from typing import Sequence, Type - -from dagster._core.definitions.asset_spec import AssetSpec - -from .resource import PowerBiWorkspace -from .translator import ( - DagsterPowerBITranslator, - PowerBIContentData, - PowerBIContentType, - PowerBIWorkspaceData, -) - - -def fetch_powerbi_workspace_data( - powerbi_workspace: PowerBiWorkspace, -) -> PowerBIWorkspaceData: - """Retrieves all Power BI content from the workspace and returns it as a PowerBIWorkspaceData object. 
- Future work will cache this data to avoid repeated calls to the Power BI API. - - Args: - powerbi_workspace (PowerBiWorkspace): The Power BI resource to use to fetch the data. - - Returns: - PowerBIWorkspaceData: A snapshot of the Power BI workspace's content. - """ - dashboard_data = powerbi_workspace.get_dashboards()["value"] - augmented_dashboard_data = [ - {**dashboard, "tiles": powerbi_workspace.get_dashboard_tiles(dashboard["id"])} - for dashboard in dashboard_data - ] - dashboards = [ - PowerBIContentData(content_type=PowerBIContentType.DASHBOARD, properties=data) - for data in augmented_dashboard_data - ] - - reports = [ - PowerBIContentData(content_type=PowerBIContentType.REPORT, properties=data) - for data in powerbi_workspace.get_reports()["value"] - ] - semantic_models_data = powerbi_workspace.get_semantic_models()["value"] - data_sources_by_id = {} - for dataset in semantic_models_data: - dataset_sources = powerbi_workspace.get_semantic_model_sources(dataset["id"])["value"] - dataset["sources"] = [source["datasourceId"] for source in dataset_sources] - for data_source in dataset_sources: - data_sources_by_id[data_source["datasourceId"]] = PowerBIContentData( - content_type=PowerBIContentType.DATA_SOURCE, properties=data_source - ) - semantic_models = [ - PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset) - for dataset in semantic_models_data - ] - return PowerBIWorkspaceData( - dashboards_by_id={dashboard.properties["id"]: dashboard for dashboard in dashboards}, - reports_by_id={report.properties["id"]: report for report in reports}, - semantic_models_by_id={dataset.properties["id"]: dataset for dataset in semantic_models}, - data_sources_by_id=data_sources_by_id, - ) - - -def build_powerbi_asset_specs( - *, - powerbi_workspace: PowerBiWorkspace, - dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator, -) -> Sequence[AssetSpec]: - """Fetches Power BI content from the workspace and 
translates it into AssetSpecs, - using the provided translator. - Future work will cache this data to avoid repeated calls to the Power BI API. - - Args: - powerbi_workspace (PowerBiWorkspace): The Power BI resource to use to fetch the data. - dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use - to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator. - - Returns: - Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content. - """ - workspace_data = fetch_powerbi_workspace_data(powerbi_workspace) - translator = dagster_powerbi_translator(context=workspace_data) - - all_content = [ - *workspace_data.dashboards_by_id.values(), - *workspace_data.reports_by_id.values(), - *workspace_data.semantic_models_by_id.values(), - *workspace_data.data_sources_by_id.values(), - ] - - return [translator.get_asset_spec(content) for content in all_content] diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py index 41702e0ac618b..f56b0e062eefe 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py @@ -1,10 +1,18 @@ -from typing import Any, Dict +from typing import Any, Dict, Sequence, Type import requests from dagster import ConfigurableResource +from dagster._core.definitions.asset_spec import AssetSpec from dagster._utils.cached_method import cached_method from pydantic import Field +from .translator import ( + DagsterPowerBITranslator, + PowerBIContentData, + PowerBIContentType, + PowerBIWorkspaceData, +) + BASE_API_URL = "https://api.powerbi.com/v1.0/myorg/" @@ -67,3 +75,79 @@ def get_dashboard_tiles( including which reports back each tile. 
""" return self.fetch_json(f"dashboards/{dashboard_id}/tiles") + + def fetch_powerbi_workspace_data( + self, + ) -> PowerBIWorkspaceData: + """Retrieves all Power BI content from the workspace and returns it as a PowerBIWorkspaceData object. + Future work will cache this data to avoid repeated calls to the Power BI API. + + Dashboards (with their tiles), reports, semantic models, and the data sources of + each semantic model are fetched through this resource's own API methods. + + Returns: + PowerBIWorkspaceData: A snapshot of the Power BI workspace's content. + """ + dashboard_data = self.get_dashboards()["value"] + augmented_dashboard_data = [ + {**dashboard, "tiles": self.get_dashboard_tiles(dashboard["id"])} + for dashboard in dashboard_data + ] + dashboards = [ + PowerBIContentData(content_type=PowerBIContentType.DASHBOARD, properties=data) + for data in augmented_dashboard_data + ] + + reports = [ + PowerBIContentData(content_type=PowerBIContentType.REPORT, properties=data) + for data in self.get_reports()["value"] + ] + semantic_models_data = self.get_semantic_models()["value"] + data_sources_by_id = {} + for dataset in semantic_models_data: + dataset_sources = self.get_semantic_model_sources(dataset["id"])["value"] + dataset["sources"] = [source["datasourceId"] for source in dataset_sources] + for data_source in dataset_sources: + data_sources_by_id[data_source["datasourceId"]] = PowerBIContentData( + content_type=PowerBIContentType.DATA_SOURCE, properties=data_source + ) + semantic_models = [ + PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset) + for dataset in semantic_models_data + ] + return PowerBIWorkspaceData( + dashboards_by_id={dashboard.properties["id"]: dashboard for dashboard in dashboards}, + reports_by_id={report.properties["id"]: report for report in reports}, + semantic_models_by_id={ + dataset.properties["id"]: dataset for dataset in semantic_models + }, + data_sources_by_id=data_sources_by_id, + ) + + def build_asset_specs( + self, + dagster_powerbi_translator: 
Type[DagsterPowerBITranslator] = DagsterPowerBITranslator, + ) -> Sequence[AssetSpec]: + """Fetches Power BI content from the workspace and translates it into AssetSpecs, + using the provided translator. + Future work will cache this data to avoid repeated calls to the Power BI API. + + Args: + dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator class, + instantiated with the fetched workspace data, that is used to convert + Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator. + + Returns: + Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content. + """ + workspace_data = self.fetch_powerbi_workspace_data() + translator = dagster_powerbi_translator(context=workspace_data) + + all_content = [ + *workspace_data.dashboards_by_id.values(), + *workspace_data.reports_by_id.values(), + *workspace_data.semantic_models_by_id.values(), + *workspace_data.data_sources_by_id.values(), + ] + + return [translator.get_asset_spec(content) for content in all_content] diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py index 4843a9c5b8183..f1d69085027d8 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py @@ -10,9 +10,8 @@ def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_ api_token=fake_token, workspace_id=workspace_id, ) - from dagster_powerbi.asset_specs import fetch_powerbi_workspace_data - actual_workspace_data = fetch_powerbi_workspace_data(powerbi_workspace=resource) + actual_workspace_data = resource.fetch_powerbi_workspace_data() assert len(actual_workspace_data.dashboards_by_id) == 1 assert len(actual_workspace_data.reports_by_id) == 1 assert len(actual_workspace_data.semantic_models_by_id) == 1 @@ 
-20,14 +19,12 @@ def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: str) -> None: - from dagster_powerbi.asset_specs import build_powerbi_asset_specs - fake_token = uuid.uuid4().hex resource = PowerBiWorkspace( api_token=fake_token, workspace_id=workspace_id, ) - all_asset_specs = build_powerbi_asset_specs(powerbi_workspace=resource) + all_asset_specs = resource.build_asset_specs() # 1 dashboard, 1 report, 1 semantic model, 2 data sources assert len(all_asset_specs) == 5