[dagster-powerbi] Setup powerbi package for release process #24121

Closed · wants to merge 5 commits
----------------------------------------------------------------
@@ -2,8 +2,6 @@

from dagster_powerbi.resource import PowerBIWorkspace as PowerBIWorkspace
from dagster_powerbi.translator import DagsterPowerBITranslator as DagsterPowerBITranslator

# Move back to version.py and edit setup.py once we are ready to publish.
__version__ = "1!0+dev"
from dagster_powerbi.version import __version__

DagsterLibraryRegistry.register("dagster-powerbi", __version__)
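
Since the version now lives in its own module, the companion change one would expect at release time is for setup.py to read it from there. The actual setup.py edit is not part of this diff; the sketch below only shows the conventional pattern, and the helper name get_version plus the package-exclusion glob are assumptions.

# Hypothetical setup.py excerpt (not in this PR): read __version__ out of
# dagster_powerbi/version.py so the release process has a single source of truth.
from pathlib import Path
from typing import Dict

from setuptools import find_packages, setup


def get_version() -> str:
    version: Dict[str, str] = {}
    with open(Path(__file__).parent / "dagster_powerbi" / "version.py", encoding="utf8") as fp:
        exec(fp.read(), version)  # populates version["__version__"]
    return version["__version__"]


setup(
    name="dagster-powerbi",
    version=get_version(),
    packages=find_packages(exclude=["dagster_powerbi_tests*"]),
    install_requires=["dagster"],
)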
----------------------------------------------------------------
@@ -1,8 +1,18 @@
from typing import Any, Dict, Sequence, Type

import requests
from dagster import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster import (
AssetsDefinition,
ConfigurableResource,
Definitions,
_check as check,
external_assets_from_specs,
)
from dagster._annotations import public
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
CacheableAssetsDefinition,
)
from dagster._utils.cached_method import cached_method
from pydantic import Field

@@ -16,6 +26,7 @@
BASE_API_URL = "https://api.powerbi.com/v1.0/myorg/"


@public
class PowerBIWorkspace(ConfigurableResource):
"""Represents a workspace in PowerBI and provides utilities
to interact with the PowerBI API.
@@ -112,32 +123,77 @@ def fetch_powerbi_workspace_data(
PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset)
for dataset in semantic_models_data
]
return PowerBIWorkspaceData(
dashboards_by_id={dashboard.properties["id"]: dashboard for dashboard in dashboards},
reports_by_id={report.properties["id"]: report for report in reports},
semantic_models_by_id={
dataset.properties["id"]: dataset for dataset in semantic_models
},
data_sources_by_id=data_sources_by_id,
return PowerBIWorkspaceData.from_content_data(
self.workspace_id,
dashboards + reports + semantic_models + list(data_sources_by_id.values()),
)

def build_asset_specs(
def build_assets(
self,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
) -> Sequence[AssetSpec]:
"""Fetches Power BI content from the workspace and translates it into AssetSpecs,
using the provided translator.
Future work will cache this data to avoid repeated calls to the Power BI API.
dagster_powerbi_translator: Type[DagsterPowerBITranslator],
) -> Sequence[CacheableAssetsDefinition]:
"""Returns a set of CacheableAssetsDefinition which will load Power BI content from
the workspace and translates it into AssetSpecs, using the provided translator.

Args:
dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use
to convert Power BI content into AssetSpecs.

Returns:
Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content.
Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which
will load the Power BI content.
"""
workspace_data = self.fetch_powerbi_workspace_data()
translator = dagster_powerbi_translator(context=workspace_data)
return [PowerBICacheableAssetsDefinition(self, dagster_powerbi_translator)]

@public
def build_defs(
self, dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator
) -> Definitions:
"""Returns a Definitions object which will load Power BI content from
the workspace and translate it into assets, using the provided translator.

Args:
dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use
to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator.

Returns:
Definitions: A Definitions object which will build and return the Power BI content.
"""
return Definitions(
assets=self.build_assets(dagster_powerbi_translator=dagster_powerbi_translator)
)


class PowerBICacheableAssetsDefinition(CacheableAssetsDefinition):
def __init__(self, workspace: PowerBIWorkspace, translator: Type[DagsterPowerBITranslator]):
self._workspace = workspace
self._translator_cls = translator
super().__init__(unique_id=self._workspace.workspace_id)

def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
workspace_data: PowerBIWorkspaceData = self._workspace.fetch_powerbi_workspace_data()
return [
AssetsDefinitionCacheableData(extra_metadata=data.to_cached_data())
for data in [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.semantic_models_by_id.values(),
*workspace_data.data_sources_by_id.values(),
]
]

def build_definitions(
self, data: Sequence[AssetsDefinitionCacheableData]
) -> Sequence[AssetsDefinition]:
workspace_data = PowerBIWorkspaceData.from_content_data(
self._workspace.workspace_id,
[
PowerBIContentData.from_cached_data(check.not_none(entry.extra_metadata))
for entry in data
],
)

translator = self._translator_cls(context=workspace_data)

all_content = [
*workspace_data.dashboards_by_id.values(),
@@ -146,4 +202,6 @@ def build_asset_specs(
*workspace_data.data_sources_by_id.values(),
]

return [translator.get_asset_spec(content) for content in all_content]
return external_assets_from_specs(
[translator.get_asset_spec(content) for content in all_content]
)
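
The new build_defs entry point above is the piece user code is expected to call. A minimal usage sketch follows, assuming credentials are supplied through environment variables; the variable names are illustrative and not part of this PR.

import os

from dagster_powerbi import DagsterPowerBITranslator, PowerBIWorkspace

workspace = PowerBIWorkspace(
    api_token=os.environ["POWERBI_API_TOKEN"],  # illustrative env var name
    workspace_id=os.environ["POWERBI_WORKSPACE_ID"],
)

# build_defs() wraps the PowerBICacheableAssetsDefinition introduced in this PR, so the
# Power BI API is only queried when the resulting Definitions object is loaded.
defs = workspace.build_defs(dagster_powerbi_translator=DagsterPowerBITranslator)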
----------------------------------------------------------------
@@ -1,13 +1,16 @@
import re
import urllib.parse
from enum import Enum
from typing import Any, Dict
from typing import Any, Dict, Mapping, Sequence

from dagster import _check as check
from dagster._annotations import public
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import record

POWERBI_PREFIX = "powerbi/"


def _get_last_filepath_component(path: str) -> str:
"""Returns the last component of a file path."""
@@ -42,6 +45,16 @@ class PowerBIContentData:
content_type: PowerBIContentType
properties: Dict[str, Any]

def to_cached_data(self) -> Dict[str, Any]:
return {"content_type": self.content_type.value, "properties": self.properties}

@classmethod
def from_cached_data(cls, data: Mapping[Any, Any]) -> "PowerBIContentData":
return cls(
content_type=PowerBIContentType(data["content_type"]),
properties=data["properties"],
)


@record
class PowerBIWorkspaceData:
@@ -50,12 +63,42 @@ class PowerBIWorkspaceData:
Provided as context for the translator so that it can resolve dependencies between content.
"""

workspace_id: str
dashboards_by_id: Dict[str, PowerBIContentData]
reports_by_id: Dict[str, PowerBIContentData]
semantic_models_by_id: Dict[str, PowerBIContentData]
data_sources_by_id: Dict[str, PowerBIContentData]

@classmethod
def from_content_data(
cls, workspace_id: str, content_data: Sequence[PowerBIContentData]
) -> "PowerBIWorkspaceData":
return cls(
workspace_id=workspace_id,
dashboards_by_id={
dashboard.properties["id"]: dashboard
for dashboard in content_data
if dashboard.content_type == PowerBIContentType.DASHBOARD
},
reports_by_id={
report.properties["id"]: report
for report in content_data
if report.content_type == PowerBIContentType.REPORT
},
semantic_models_by_id={
dataset.properties["id"]: dataset
for dataset in content_data
if dataset.content_type == PowerBIContentType.SEMANTIC_MODEL
},
data_sources_by_id={
data_source.properties["datasourceId"]: data_source
for data_source in content_data
if data_source.content_type == PowerBIContentType.DATA_SOURCE
},
)


@public
class DagsterPowerBITranslator:
"""Translator class which converts raw response data from the PowerBI API into AssetSpecs.
Subclass this class to implement custom logic for each type of PowerBI content.
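
The docstring above points at subclassing as the customization hook. A hedged sketch of that pattern is below; this diff only shows that the resource calls translator.get_asset_spec(content), so overriding get_asset_spec wholesale (rather than a per-content-type method) is an assumption, and the "powerbi" key prefix is purely illustrative.

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec

from dagster_powerbi import DagsterPowerBITranslator
from dagster_powerbi.translator import PowerBIContentData


class MyCustomPowerBITranslator(DagsterPowerBITranslator):
    def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
        default_spec = super().get_asset_spec(data)
        # Rebuild the default spec with an extra key prefix so Power BI assets are
        # grouped together in the asset graph.
        return AssetSpec(
            key=AssetKey(["powerbi", *default_spec.key.path]),
            deps=default_spec.deps,
            description=default_spec.description,
        )

A subclass like this would then be passed to PowerBIWorkspace.build_defs(dagster_powerbi_translator=MyCustomPowerBITranslator).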
----------------------------------------------------------------
@@ -0,0 +1 @@
__version__ = "1!0+dev"
----------------------------------------------------------------
@@ -76,10 +76,15 @@
]


@pytest.fixture(name="workspace_id")
def workspace_id_fixture() -> str:
return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221"


@pytest.fixture(
name="workspace_data",
)
def workspace_data_fixture() -> PowerBIWorkspaceData:
def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData:
sample_dash = SAMPLE_DASH.copy()
# Response from tiles API, which we add to the dashboard data
sample_dash["tiles"] = SAMPLE_DASH_TILES
@@ -90,6 +95,7 @@ def workspace_data_fixture() -> PowerBIWorkspaceData:
sample_semantic_model["sources"] = [ds["datasourceId"] for ds in sample_data_sources]

return PowerBIWorkspaceData(
workspace_id=workspace_id,
dashboards_by_id={
sample_dash["id"]: PowerBIContentData(
content_type=PowerBIContentType.DASHBOARD, properties=sample_dash
@@ -114,15 +120,10 @@
)


@pytest.fixture(name="workspace_id")
def workspace_id_fixture() -> str:
return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221"


@pytest.fixture(
name="workspace_data_api_mocks",
)
def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]:
def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[responses.RequestsMock]:
with responses.RequestsMock() as response:
response.add(
method=responses.GET,
@@ -159,4 +160,4 @@ def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]:
status=200,
)

yield
yield response
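
Yielding the RequestsMock (instead of None) lets tests inspect the recorded HTTP traffic. A hypothetical test using the fixtures above might look like this; how many endpoints the mock registers is not certain from the diff, so only a loose call-count check is shown.

import responses

from dagster_powerbi import PowerBIWorkspace


def test_fetch_powerbi_workspace_data(
    workspace_id: str, workspace_data_api_mocks: responses.RequestsMock
) -> None:
    resource = PowerBIWorkspace(api_token="fake_token", workspace_id=workspace_id)
    workspace_data = resource.fetch_powerbi_workspace_data()

    # The fetched content is keyed by the workspace the resource was configured with.
    assert workspace_data.workspace_id == workspace_id
    # The mock records each request it served, so tests can assert the API was hit.
    assert len(workspace_data_api_mocks.calls) > 0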
----------------------------------------------------------------
@@ -0,0 +1,30 @@
import uuid
from typing import cast

from dagster import asset, define_asset_job
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.repository_definition.repository_definition import (
PendingRepositoryDefinition,
)
from dagster_powerbi import PowerBIWorkspace

fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
api_token=fake_token,
workspace_id="a2122b8f-d7e1-42e8-be2b-a5e636ca3221",
)
pbi_defs = resource.build_defs()


@asset
def my_materializable_asset():
pass


pending_repo_from_cached_asset_metadata = cast(
PendingRepositoryDefinition,
Definitions.merge(
Definitions(assets=[my_materializable_asset], jobs=[define_asset_job("all_asset_job")]),
pbi_defs,
).get_inner_repository(),
)