diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py index 6192fd8702888..b54e4e1b15316 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py @@ -1,5 +1,6 @@ from dagster._core.libraries import DagsterLibraryRegistry +from dagster_powerbi.resource import PowerBIWorkspace as PowerBIWorkspace from dagster_powerbi.translator import DagsterPowerBITranslator as DagsterPowerBITranslator # Move back to version.py and edit setup.py once we are ready to publish. diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py new file mode 100644 index 0000000000000..f63803c728c31 --- /dev/null +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py @@ -0,0 +1,69 @@ +from typing import Any, Dict + +import requests +from dagster import ConfigurableResource +from dagster._utils.cached_method import cached_method +from pydantic import Field + +BASE_API_URL = "https://api.powerbi.com/v1.0/myorg/" + + +class PowerBIWorkspace(ConfigurableResource): + """Represents a workspace in PowerBI and provides utilities + to interact with the PowerBI API. + """ + + api_token: str = Field(..., description="An API token used to connect to PowerBI.") + workspace_id: str = Field(..., description="The ID of the PowerBI group to use.") + + def fetch_json(self, endpoint: str) -> Dict[str, Any]: + """Fetch JSON data from the PowerBI API. Raises an exception if the request fails. + + Args: + endpoint (str): The API endpoint to fetch data from. + + Returns: + Dict[str, Any]: The JSON data returned from the API. + """ + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_token}", + } + response = requests.get( + f"{BASE_API_URL}/groups/{self.workspace_id}/{endpoint}", headers=headers + ) + response.raise_for_status() + return response.json() + + @cached_method + def get_reports(self) -> Dict[str, Any]: + """Fetches a list of all PowerBI reports in the workspace.""" + return self.fetch_json("reports") + + @cached_method + def get_semantic_models(self) -> Dict[str, Any]: + """Fetches a list of all PowerBI semantic models in the workspace.""" + return self.fetch_json("datasets") + + @cached_method + def get_semantic_model_sources( + self, + dataset_id: str, + ) -> Dict[str, Any]: + """Fetches a list of all data sources for a given semantic model.""" + return self.fetch_json(f"datasets/{dataset_id}/datasources") + + @cached_method + def get_dashboards(self) -> Dict[str, Any]: + """Fetches a list of all PowerBI dashboards in the workspace.""" + return self.fetch_json("dashboards") + + @cached_method + def get_dashboard_tiles( + self, + dashboard_id: str, + ) -> Dict[str, Any]: + """Fetches a list of all tiles for a given PowerBI dashboard, + including which reports back each tile. + """ + return self.fetch_json(f"dashboards/{dashboard_id}/tiles") diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_resource.py new file mode 100644 index 0000000000000..34ed11e4ebc97 --- /dev/null +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_resource.py @@ -0,0 +1,27 @@ +import uuid + +import responses +from dagster_powerbi import PowerBIWorkspace +from dagster_powerbi.resource import BASE_API_URL + + +@responses.activate +def test_basic_resource_request() -> None: + fake_token = uuid.uuid4().hex + fake_workspace_id = uuid.uuid4().hex + resource = PowerBIWorkspace( + api_token=fake_token, + workspace_id=fake_workspace_id, + ) + + responses.add( + method=responses.GET, + url=f"{BASE_API_URL}/groups/{fake_workspace_id}/reports", + json={}, + status=200, + ) + + resource.get_reports() + + assert len(responses.calls) == 1 + assert responses.calls[0].request.headers["Authorization"] == f"Bearer {fake_token}"