diff --git a/docs/content/_apidocs.mdx b/docs/content/_apidocs.mdx index b89795bba5629..48246fa857011 100644 --- a/docs/content/_apidocs.mdx +++ b/docs/content/_apidocs.mdx @@ -610,6 +610,16 @@ Dagster also provides a growing set of optional add-on libraries to integrate wi Includes implementations of run and event log storage built on Postgres. + + + Power BI ( + dagster-powerbi) + + + Provides an integration to represent a Power BI workspace as a graph of + assets. + + Prometheus ( diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json index 0653bca3b7a19..cd5e71c8f9e3a 100644 --- a/docs/content/_navigation.json +++ b/docs/content/_navigation.json @@ -1532,6 +1532,10 @@ "title": "PostgreSQL (dagster-postgres)", "path": "/_apidocs/libraries/dagster-postgres" }, + { + "title": "Power BI (dagster-powerbi)", + "path": "/_apidocs/libraries/dagster-powerbi" + }, { "title": "Prometheus (dagster-prometheus)", "path": "/_apidocs/libraries/dagster-prometheus" diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index dbc21166df0e8..d9d87a92f5660 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index 9d9765a94f4e0..d3e799db8bc0b 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index 5d8d811aff96d..baa3bf6f0f02e 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index 0b5514b7cca86..feacc91726748 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/docs/sphinx/conf.py b/docs/sphinx/conf.py index 8a74633f4b1e6..b7d0bf010e826 100644 --- a/docs/sphinx/conf.py +++ b/docs/sphinx/conf.py @@ -38,6 +38,7 @@ "../../python_modules/libraries/dagster-github", "../../python_modules/libraries/dagster-k8s", "../../python_modules/libraries/dagster-looker", + "../../python_modules/libraries/dagster-powerbi", "../../python_modules/libraries/dagster-managed-elements", "../../python_modules/libraries/dagster-mlflow", "../../python_modules/libraries/dagster-msteams", diff --git a/docs/sphinx/index.rst b/docs/sphinx/index.rst index 75d32595ab06e..8518e7ce91548 100644 --- a/docs/sphinx/index.rst +++ b/docs/sphinx/index.rst @@ -67,6 +67,7 @@ sections/api/apidocs/libraries/dagster-pipes sections/api/apidocs/libraries/dagster-polars sections/api/apidocs/libraries/dagster-postgres + sections/api/apidocs/libraries/dagster-powerbi sections/api/apidocs/libraries/dagster-prometheus sections/api/apidocs/libraries/dagster-pyspark sections/api/apidocs/libraries/dagster-shell diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-powerbi.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-powerbi.rst new file mode 100644 index 0000000000000..af091d503a6c9 --- /dev/null +++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-powerbi.rst @@ -0,0 +1,17 @@ +########################## +Power BI (dagster-powerbi) +########################## + +Dagster allows you to represent your Power BI workspace as assets, alongside other your other +technologies like dbt and Sling. This allows you to see how your Power BI assets are connected to +your other data assets, and how changes to other data assets might impact your Power BI project. + +.. currentmodule:: dagster_powerbi + + +Assets +====== + +.. autoclass:: DagsterPowerBITranslator + +.. autoclass:: PowerBIWorkspace diff --git a/docs/tox.ini b/docs/tox.ini index 68fcf161d2291..e74e8ee93c3a7 100644 --- a/docs/tox.ini +++ b/docs/tox.ini @@ -45,6 +45,7 @@ deps = -e ../python_modules/libraries/dagster-deltalake-polars -e ../python_modules/libraries/dagster-openai -e ../python_modules/libraries/dagster-looker + -e ../python_modules/libraries/dagster-powerbi commands = make --directory=sphinx clean diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py index b54e4e1b15316..5f505f8fdc5d0 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/__init__.py @@ -2,8 +2,6 @@ from dagster_powerbi.resource import PowerBIWorkspace as PowerBIWorkspace from dagster_powerbi.translator import DagsterPowerBITranslator as DagsterPowerBITranslator - -# Move back to version.py and edit setup.py once we are ready to publish. -__version__ = "1!0+dev" +from dagster_powerbi.version import __version__ DagsterLibraryRegistry.register("dagster-powerbi", __version__) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py index 309f4704ceca3..1a6b32a7a3a24 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py @@ -1,8 +1,17 @@ from typing import Any, Dict, Sequence, Type import requests -from dagster import ConfigurableResource -from dagster._core.definitions.asset_spec import AssetSpec +from dagster import ( + AssetsDefinition, + ConfigurableResource, + _check as check, + external_assets_from_specs, +) +from dagster._annotations import public +from dagster._core.definitions.cacheable_assets import ( + AssetsDefinitionCacheableData, + CacheableAssetsDefinition, +) from dagster._utils.cached_method import cached_method from pydantic import Field @@ -16,6 +25,7 @@ BASE_API_URL = "https://api.powerbi.com/v1.0/myorg/" +@public class PowerBIWorkspace(ConfigurableResource): """Represents a workspace in PowerBI and provides utilities to interact with the PowerBI API. @@ -112,32 +122,60 @@ def fetch_powerbi_workspace_data( PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset) for dataset in semantic_models_data ] - return PowerBIWorkspaceData( - dashboards_by_id={dashboard.properties["id"]: dashboard for dashboard in dashboards}, - reports_by_id={report.properties["id"]: report for report in reports}, - semantic_models_by_id={ - dataset.properties["id"]: dataset for dataset in semantic_models - }, - data_sources_by_id=data_sources_by_id, + return PowerBIWorkspaceData.from_content_data( + self.workspace_id, + dashboards + reports + semantic_models + list(data_sources_by_id.values()), ) - def build_asset_specs( + @public + def build_assets( self, dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator, - ) -> Sequence[AssetSpec]: - """Fetches Power BI content from the workspace and translates it into AssetSpecs, - using the provided translator. - Future work will cache this data to avoid repeated calls to the Power BI API. + ) -> Sequence[CacheableAssetsDefinition]: + """Returns a set of CacheableAssetsDefinition which will load Power BI content from + the workspace and translates it into AssetSpecs, using the provided translator. Args: dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator. Returns: - Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content. + Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which + will load the Power BI content. """ - workspace_data = self.fetch_powerbi_workspace_data() - translator = dagster_powerbi_translator(context=workspace_data) + return [PowerBICacheableAssetsDefinition(self, dagster_powerbi_translator)] + + +class PowerBICacheableAssetsDefinition(CacheableAssetsDefinition): + def __init__(self, workspace: PowerBIWorkspace, translator: Type[DagsterPowerBITranslator]): + self._workspace = workspace + self._translator_cls = translator + super().__init__(unique_id=self._workspace.workspace_id) + + def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]: + workspace_data: PowerBIWorkspaceData = self._workspace.fetch_powerbi_workspace_data() + return [ + AssetsDefinitionCacheableData(extra_metadata=data.to_cached_data()) + for data in [ + *workspace_data.dashboards_by_id.values(), + *workspace_data.reports_by_id.values(), + *workspace_data.semantic_models_by_id.values(), + *workspace_data.data_sources_by_id.values(), + ] + ] + + def build_definitions( + self, data: Sequence[AssetsDefinitionCacheableData] + ) -> Sequence[AssetsDefinition]: + workspace_data = PowerBIWorkspaceData.from_content_data( + self._workspace.workspace_id, + [ + PowerBIContentData.from_cached_data(check.not_none(entry.extra_metadata)) + for entry in data + ], + ) + + translator = self._translator_cls(context=workspace_data) all_content = [ *workspace_data.dashboards_by_id.values(), @@ -146,4 +184,6 @@ def build_asset_specs( *workspace_data.data_sources_by_id.values(), ] - return [translator.get_asset_spec(content) for content in all_content] + return external_assets_from_specs( + [translator.get_asset_spec(content) for content in all_content] + ) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index a2b7a009abd46..078ea008eb5ab 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -1,13 +1,16 @@ import re import urllib.parse from enum import Enum -from typing import Any, Dict +from typing import Any, Dict, Mapping, Sequence from dagster import _check as check +from dagster._annotations import public from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._record import record +POWERBI_PREFIX = "powerbi/" + def _get_last_filepath_component(path: str) -> str: """Returns the last component of a file path.""" @@ -42,6 +45,16 @@ class PowerBIContentData: content_type: PowerBIContentType properties: Dict[str, Any] + def to_cached_data(self) -> Dict[str, Any]: + return {"content_type": self.content_type.value, "properties": self.properties} + + @classmethod + def from_cached_data(cls, data: Mapping[Any, Any]) -> "PowerBIContentData": + return cls( + content_type=PowerBIContentType(data["content_type"]), + properties=data["properties"], + ) + @record class PowerBIWorkspaceData: @@ -50,12 +63,42 @@ class PowerBIWorkspaceData: Provided as context for the translator so that it can resolve dependencies between content. """ + workspace_id: str dashboards_by_id: Dict[str, PowerBIContentData] reports_by_id: Dict[str, PowerBIContentData] semantic_models_by_id: Dict[str, PowerBIContentData] data_sources_by_id: Dict[str, PowerBIContentData] + @classmethod + def from_content_data( + cls, workspace_id: str, content_data: Sequence[PowerBIContentData] + ) -> "PowerBIWorkspaceData": + return cls( + workspace_id=workspace_id, + dashboards_by_id={ + dashboard.properties["id"]: dashboard + for dashboard in content_data + if dashboard.content_type == PowerBIContentType.DASHBOARD + }, + reports_by_id={ + report.properties["id"]: report + for report in content_data + if report.content_type == PowerBIContentType.REPORT + }, + semantic_models_by_id={ + dataset.properties["id"]: dataset + for dataset in content_data + if dataset.content_type == PowerBIContentType.SEMANTIC_MODEL + }, + data_sources_by_id={ + data_source.properties["datasourceId"]: data_source + for data_source in content_data + if data_source.content_type == PowerBIContentType.DATA_SOURCE + }, + ) + +@public class DagsterPowerBITranslator: """Translator class which converts raw response data from the PowerBI API into AssetSpecs. Subclass this class to implement custom logic for each type of PowerBI content. diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/version.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/version.py new file mode 100644 index 0000000000000..fe3fd8a8248b6 --- /dev/null +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/version.py @@ -0,0 +1 @@ +__version__ = "1!0+dev" diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py index 508a31d1e2f54..56fc4d6315e17 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py @@ -76,10 +76,15 @@ ] +@pytest.fixture(name="workspace_id") +def workspace_id_fixture() -> str: + return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221" + + @pytest.fixture( name="workspace_data", ) -def workspace_data_fixture() -> PowerBIWorkspaceData: +def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData: sample_dash = SAMPLE_DASH.copy() # Response from tiles API, which we add to the dashboard data sample_dash["tiles"] = SAMPLE_DASH_TILES @@ -90,6 +95,7 @@ def workspace_data_fixture() -> PowerBIWorkspaceData: sample_semantic_model["sources"] = [ds["datasourceId"] for ds in sample_data_sources] return PowerBIWorkspaceData( + workspace_id=workspace_id, dashboards_by_id={ sample_dash["id"]: PowerBIContentData( content_type=PowerBIContentType.DASHBOARD, properties=sample_dash @@ -114,15 +120,10 @@ def workspace_data_fixture() -> PowerBIWorkspaceData: ) -@pytest.fixture(name="workspace_id") -def workspace_id_fixture() -> str: - return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221" - - @pytest.fixture( name="workspace_data_api_mocks", ) -def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]: +def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[responses.RequestsMock]: with responses.RequestsMock() as response: response.add( method=responses.GET, @@ -159,4 +160,4 @@ def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]: status=200, ) - yield + yield response diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/pending_repo.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/pending_repo.py new file mode 100644 index 0000000000000..f8b5c9c9de5a2 --- /dev/null +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/pending_repo.py @@ -0,0 +1,29 @@ +import uuid +from typing import cast + +from dagster import asset, define_asset_job +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.definitions.repository_definition.repository_definition import ( + PendingRepositoryDefinition, +) +from dagster_powerbi import PowerBIWorkspace + +fake_token = uuid.uuid4().hex +resource = PowerBIWorkspace( + api_token=fake_token, + workspace_id="a2122b8f-d7e1-42e8-be2b-a5e636ca3221", +) +pbi_assets = resource.build_assets() + + +@asset +def my_materializable_asset(): + pass + + +pending_repo_from_cached_asset_metadata = cast( + PendingRepositoryDefinition, + Definitions( + assets=[*pbi_assets, my_materializable_asset], jobs=[define_asset_job("all_asset_job")] + ).get_inner_repository(), +) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py index 2bc9e45c3aecb..8338936dc32ee 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py @@ -1,6 +1,12 @@ import uuid +import responses from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.reconstruct import ReconstructableJob, ReconstructableRepository +from dagster._core.events import DagsterEventType +from dagster._core.execution.api import create_execution_plan, execute_plan +from dagster._core.instance_for_test import instance_for_test +from dagster._utils import file_relative_path from dagster_powerbi import PowerBIWorkspace @@ -24,32 +30,75 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: api_token=fake_token, workspace_id=workspace_id, ) - all_asset_specs = resource.build_asset_specs() + cacheable_asset = resource.build_assets()[0] + data = cacheable_asset.compute_cacheable_data() + all_assets = cacheable_asset.build_definitions(data) # 1 dashboard, 1 report, 1 semantic model, 2 data sources - assert len(all_asset_specs) == 5 + assert len(all_assets) == 5 # Sanity check outputs, translator tests cover details here - dashboard_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "dashboard") - assert dashboard_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"] + dashboard_asset = next(asset for asset in all_assets if asset.key.path[0] == "dashboard") + assert dashboard_asset.key.path == ["dashboard", "Sales_Returns_Sample_v201912"] - report_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "report") - assert report_spec.key.path == ["report", "Sales_Returns_Sample_v201912"] + report_asset = next(asset for asset in all_assets if asset.key.path[0] == "report") + assert report_asset.key.path == ["report", "Sales_Returns_Sample_v201912"] - semantic_model_spec = next( - spec for spec in all_asset_specs if spec.key.path[0] == "semantic_model" + semantic_model_asset = next( + asset for asset in all_assets if asset.key.path[0] == "semantic_model" ) - assert semantic_model_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"] + assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"] - data_source_specs = [ - spec - for spec in all_asset_specs - if spec.key.path[0] not in ("dashboard", "report", "semantic_model") + data_source_assets = [ + asset + for asset in all_assets + if asset.key.path[0] not in ("dashboard", "report", "semantic_model") ] - assert len(data_source_specs) == 2 + assert len(data_source_assets) == 2 - data_source_keys = {spec.key for spec in data_source_specs} + data_source_keys = {spec.key for spec in data_source_assets} assert data_source_keys == { AssetKey(["data_27_09_2019.xlsx"]), AssetKey(["sales_marketing_datas.xlsx"]), } + + +def test_using_cached_asset_data(workspace_data_api_mocks: responses.RequestsMock) -> None: + with instance_for_test() as instance: + assert len(workspace_data_api_mocks.calls) == 0 + + from dags.pending_repo import pending_repo_from_cached_asset_metadata + + # first, we resolve the repository to generate our cached metadata + repository_def = pending_repo_from_cached_asset_metadata.compute_repository_definition() + assert len(workspace_data_api_mocks.calls) == 5 + + # 5 PowerBI external assets, one materializable asset + assert len(repository_def.assets_defs_by_key) == 5 + 1 + + job_def = repository_def.get_job("all_asset_job") + repository_load_data = repository_def.repository_load_data + + recon_repo = ReconstructableRepository.for_file( + file_relative_path(__file__, "pending_repo.py"), + fn_name="pending_repo_from_cached_asset_metadata", + ) + recon_job = ReconstructableJob(repository=recon_repo, job_name="all_asset_job") + + execution_plan = create_execution_plan(recon_job, repository_load_data=repository_load_data) + + run = instance.create_run_for_job(job_def=job_def, execution_plan=execution_plan) + + events = execute_plan( + execution_plan=execution_plan, + job=recon_job, + dagster_run=run, + instance=instance, + ) + + assert ( + len([event for event in events if event.event_type == DagsterEventType.STEP_SUCCESS]) + == 1 + ), "Expected two successful steps" + + assert len(workspace_data_api_mocks.calls) == 5 diff --git a/python_modules/libraries/dagster-powerbi/setup.py b/python_modules/libraries/dagster-powerbi/setup.py index dea63b2a2c1b2..1b46961551fda 100644 --- a/python_modules/libraries/dagster-powerbi/setup.py +++ b/python_modules/libraries/dagster-powerbi/setup.py @@ -1,14 +1,15 @@ +from pathlib import Path +from typing import Dict + from setuptools import find_packages, setup def get_version() -> str: - return "1!0+dev" - # Uncomment when ready to publish - # version: Dict[str, str] = {} - # with open(Path(__file__).parent / "dagster_powerbi/version.py", encoding="utf8") as fp: - # exec(fp.read(), version) + version: Dict[str, str] = {} + with open(Path(__file__).parent / "dagster_powerbi/version.py", encoding="utf8") as fp: + exec(fp.read(), version) - # return version["__version__"] + return version["__version__"] ver = get_version()