[6/n][dagster-powerbi] use asset caching logic for powerbi assets #23486

Merged 4 commits on Aug 30, 2024
@@ -1,8 +1,17 @@
from typing import Any, Dict, Sequence, Type

import requests
from dagster import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster import (
AssetsDefinition,
ConfigurableResource,
Definitions,
_check as check,
external_assets_from_specs,
)
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
CacheableAssetsDefinition,
)
from dagster._utils.cached_method import cached_method
from pydantic import Field

@@ -112,32 +121,76 @@ def fetch_powerbi_workspace_data(
PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset)
for dataset in semantic_models_data
]
return PowerBIWorkspaceData(
dashboards_by_id={dashboard.properties["id"]: dashboard for dashboard in dashboards},
reports_by_id={report.properties["id"]: report for report in reports},
semantic_models_by_id={
dataset.properties["id"]: dataset for dataset in semantic_models
},
data_sources_by_id=data_sources_by_id,
return PowerBIWorkspaceData.from_content_data(
self.workspace_id,
dashboards + reports + semantic_models + list(data_sources_by_id.values()),
)

def build_asset_specs(
def build_assets(
self,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
) -> Sequence[AssetSpec]:
"""Fetches Power BI content from the workspace and translates it into AssetSpecs,
using the provided translator.
Future work will cache this data to avoid repeated calls to the Power BI API.
dagster_powerbi_translator: Type[DagsterPowerBITranslator],
) -> Sequence[CacheableAssetsDefinition]:
"""Returns a set of CacheableAssetsDefinition which will load Power BI content from
Contributor review comment:
Suggested change
"""Returns a set of CacheableAssetsDefinition which will load Power BI content from
"""Returns a set of CacheableAssetsDefinitions which will load Power BI content from

the workspace and translates it into AssetSpecs, using the provided translator.

Args:
dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use
to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator.

Returns:
Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content.
Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which
will load the Power BI content.
"""
workspace_data = self.fetch_powerbi_workspace_data()
translator = dagster_powerbi_translator(context=workspace_data)
return [PowerBICacheableAssetsDefinition(self, dagster_powerbi_translator)]

def build_defs(
self, dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator
) -> Definitions:
"""Returns a Definitions object which will load Power BI content from
the workspace and translate it into assets, using the provided translator.

Args:
dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use
to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator.

Returns:
Definitions: A Definitions object which will build and return the Power BI content.
"""
return Definitions(
assets=self.build_assets(dagster_powerbi_translator=dagster_powerbi_translator)
)


class PowerBICacheableAssetsDefinition(CacheableAssetsDefinition):
def __init__(self, workspace: PowerBIWorkspace, translator: Type[DagsterPowerBITranslator]):
self._workspace = workspace
self._translator_cls = translator
super().__init__(unique_id=self._workspace.workspace_id)

def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
workspace_data: PowerBIWorkspaceData = self._workspace.fetch_powerbi_workspace_data()
return [
AssetsDefinitionCacheableData(extra_metadata=data.to_cached_data())
for data in [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.semantic_models_by_id.values(),
*workspace_data.data_sources_by_id.values(),
]
]

def build_definitions(
self, data: Sequence[AssetsDefinitionCacheableData]
) -> Sequence[AssetsDefinition]:
workspace_data = PowerBIWorkspaceData.from_content_data(
self._workspace.workspace_id,
[
PowerBIContentData.from_cached_data(check.not_none(entry.extra_metadata))
for entry in data
],
)

translator = self._translator_cls(context=workspace_data)

all_content = [
*workspace_data.dashboards_by_id.values(),
@@ -146,4 +199,6 @@ def build_asset_specs(
*workspace_data.data_sources_by_id.values(),
]

return [translator.get_asset_spec(content) for content in all_content]
return external_assets_from_specs(
[translator.get_asset_spec(content) for content in all_content]
)
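
For orientation, a minimal usage sketch of the API this diff introduces. The token and workspace id are placeholder values; build_defs() is the entry point defined above, and the returned Definitions object defers all Power BI API calls until the repository is resolved.

from dagster_powerbi import PowerBIWorkspace

# Placeholder credentials for illustration only.
resource = PowerBIWorkspace(
    api_token="<api-token>",
    workspace_id="<workspace-id>",
)

# build_defs() wraps the workspace content in a
# PowerBICacheableAssetsDefinition, so the Power BI API is queried
# when the repository resolves, not at import time.
defs = resource.build_defs()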
@@ -1,13 +1,15 @@
import re
import urllib.parse
from enum import Enum
from typing import Any, Dict
from typing import Any, Dict, Mapping, Sequence

from dagster import _check as check
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import record

POWERBI_PREFIX = "powerbi/"


def _get_last_filepath_component(path: str) -> str:
"""Returns the last component of a file path."""
@@ -42,6 +44,16 @@ class PowerBIContentData:
content_type: PowerBIContentType
properties: Dict[str, Any]

def to_cached_data(self) -> Dict[str, Any]:
return {"content_type": self.content_type.value, "properties": self.properties}

@classmethod
def from_cached_data(cls, data: Mapping[Any, Any]) -> "PowerBIContentData":
return cls(
content_type=PowerBIContentType(data["content_type"]),
properties=data["properties"],
)


@record
class PowerBIWorkspaceData:
@@ -50,11 +62,40 @@ class PowerBIWorkspaceData:
Provided as context for the translator so that it can resolve dependencies between content.
"""

workspace_id: str
dashboards_by_id: Dict[str, PowerBIContentData]
reports_by_id: Dict[str, PowerBIContentData]
semantic_models_by_id: Dict[str, PowerBIContentData]
data_sources_by_id: Dict[str, PowerBIContentData]

@classmethod
def from_content_data(
cls, workspace_id: str, content_data: Sequence[PowerBIContentData]
) -> "PowerBIWorkspaceData":
return cls(
workspace_id=workspace_id,
dashboards_by_id={
dashboard.properties["id"]: dashboard
for dashboard in content_data
if dashboard.content_type == PowerBIContentType.DASHBOARD
},
reports_by_id={
report.properties["id"]: report
for report in content_data
if report.content_type == PowerBIContentType.REPORT
},
semantic_models_by_id={
dataset.properties["id"]: dataset
for dataset in content_data
if dataset.content_type == PowerBIContentType.SEMANTIC_MODEL
},
data_sources_by_id={
data_source.properties["datasourceId"]: data_source
for data_source in content_data
if data_source.content_type == PowerBIContentType.DATA_SOURCE
},
)


class DagsterPowerBITranslator:
"""Translator class which converts raw response data from the PowerBI API into AssetSpecs.
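
As a sketch of the round-trip the new to_cached_data/from_cached_data/from_content_data helpers enable. The import path is assumed from this diff's module layout, and the sample payload is invented for illustration:

# Import path assumed from the module layout in this diff.
from dagster_powerbi.translator import (
    PowerBIContentData,
    PowerBIContentType,
    PowerBIWorkspaceData,
)

# Hypothetical dashboard payload.
dashboard = PowerBIContentData(
    content_type=PowerBIContentType.DASHBOARD,
    properties={"id": "dash-1", "displayName": "Sales"},
)

# Serialize into plain dicts for AssetsDefinitionCacheableData,
# then rehydrate and re-bucket content by type and id.
restored = PowerBIContentData.from_cached_data(dashboard.to_cached_data())
workspace_data = PowerBIWorkspaceData.from_content_data("workspace-1", [restored])
assert "dash-1" in workspace_data.dashboards_by_id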
@@ -76,10 +76,15 @@
]


@pytest.fixture(name="workspace_id")
def workspace_id_fixture() -> str:
return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221"


@pytest.fixture(
name="workspace_data",
)
def workspace_data_fixture() -> PowerBIWorkspaceData:
def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData:
sample_dash = SAMPLE_DASH.copy()
# Response from tiles API, which we add to the dashboard data
sample_dash["tiles"] = SAMPLE_DASH_TILES
@@ -90,6 +95,7 @@ def workspace_data_fixture() -> PowerBIWorkspaceData:
sample_semantic_model["sources"] = [ds["datasourceId"] for ds in sample_data_sources]

return PowerBIWorkspaceData(
workspace_id=workspace_id,
dashboards_by_id={
sample_dash["id"]: PowerBIContentData(
content_type=PowerBIContentType.DASHBOARD, properties=sample_dash
@@ -114,15 +120,10 @@ def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData:
)


@pytest.fixture(name="workspace_id")
def workspace_id_fixture() -> str:
return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221"


@pytest.fixture(
name="workspace_data_api_mocks",
)
def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]:
def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[responses.RequestsMock]:
with responses.RequestsMock() as response:
response.add(
method=responses.GET,
@@ -159,4 +160,4 @@ def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]:
status=200,
)

yield
yield response
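
Yielding the RequestsMock itself (rather than None) lets tests count recorded API traffic. A self-contained sketch of that responses pattern, with a placeholder URL:

import requests
import responses

with responses.RequestsMock() as rsps:
    rsps.add(method=responses.GET, url="https://example.com/api", json={}, status=200)
    requests.get("https://example.com/api")
    # Every matched request is recorded, so callers can assert on
    # call counts; this is how the caching tests below verify that
    # cached metadata avoids re-fetching.
    assert len(rsps.calls) == 1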
@@ -0,0 +1,30 @@
import uuid
from typing import cast

from dagster import asset, define_asset_job
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.repository_definition.repository_definition import (
PendingRepositoryDefinition,
)
from dagster_powerbi import PowerBIWorkspace

fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
api_token=fake_token,
workspace_id="a2122b8f-d7e1-42e8-be2b-a5e636ca3221",
)
pbi_defs = resource.build_defs()


@asset
def my_materializable_asset():
pass


pending_repo_from_cached_asset_metadata = cast(
PendingRepositoryDefinition,
Definitions.merge(
Definitions(assets=[my_materializable_asset], jobs=[define_asset_job("all_asset_job")]),
pbi_defs,
).get_inner_repository(),
)
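
A brief sketch of how this pending repository resolves. compute_repository_definition() is the same call the test below makes, and resolving it is the point at which PowerBICacheableAssetsDefinition.compute_cacheable_data() actually queries the Power BI API (so the responses mocks must be active):

from dagster_powerbi_tests.pending_repo import pending_repo_from_cached_asset_metadata

# Resolving the pending repository triggers compute_cacheable_data()
# on the cacheable assets, i.e. the Power BI API calls happen here.
repo = pending_repo_from_cached_asset_metadata.compute_repository_definition()

# The merged repository contains the job plus all resolved assets.
job_def = repo.get_job("all_asset_job")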
@@ -1,6 +1,12 @@
import uuid

import responses
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.reconstruct import ReconstructableJob, ReconstructableRepository
from dagster._core.events import DagsterEventType
from dagster._core.execution.api import create_execution_plan, execute_plan
from dagster._core.instance_for_test import instance_for_test
from dagster._utils import file_relative_path
from dagster_powerbi import PowerBIWorkspace


@@ -24,32 +30,73 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id:
api_token=fake_token,
workspace_id=workspace_id,
)
all_asset_specs = resource.build_asset_specs()
all_assets = resource.build_defs().get_asset_graph().assets_defs

# 1 dashboard, 1 report, 1 semantic model, 2 data sources
assert len(all_asset_specs) == 5
assert len(all_assets) == 5

# Sanity check outputs, translator tests cover details here
dashboard_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "dashboard")
assert dashboard_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]
dashboard_asset = next(asset for asset in all_assets if asset.key.path[0] == "dashboard")
assert dashboard_asset.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]

report_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "report")
assert report_spec.key.path == ["report", "Sales_Returns_Sample_v201912"]
report_asset = next(asset for asset in all_assets if asset.key.path[0] == "report")
assert report_asset.key.path == ["report", "Sales_Returns_Sample_v201912"]

semantic_model_spec = next(
spec for spec in all_asset_specs if spec.key.path[0] == "semantic_model"
semantic_model_asset = next(
asset for asset in all_assets if asset.key.path[0] == "semantic_model"
)
assert semantic_model_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]
assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]

data_source_specs = [
spec
for spec in all_asset_specs
if spec.key.path[0] not in ("dashboard", "report", "semantic_model")
data_source_assets = [
asset
for asset in all_assets
if asset.key.path[0] not in ("dashboard", "report", "semantic_model")
]
assert len(data_source_specs) == 2
assert len(data_source_assets) == 2

data_source_keys = {spec.key for spec in data_source_specs}
data_source_keys = {asset.key for asset in data_source_assets}
assert data_source_keys == {
AssetKey(["data_27_09_2019.xlsx"]),
AssetKey(["sales_marketing_datas.xlsx"]),
}


def test_using_cached_asset_data(workspace_data_api_mocks: responses.RequestsMock) -> None:
with instance_for_test() as instance:
assert len(workspace_data_api_mocks.calls) == 0

from dagster_powerbi_tests.pending_repo import pending_repo_from_cached_asset_metadata

# first, we resolve the repository to generate our cached metadata
repository_def = pending_repo_from_cached_asset_metadata.compute_repository_definition()
assert len(workspace_data_api_mocks.calls) == 5

# 5 PowerBI external assets, one materializable asset
assert len(repository_def.assets_defs_by_key) == 5 + 1

job_def = repository_def.get_job("all_asset_job")
repository_load_data = repository_def.repository_load_data

recon_repo = ReconstructableRepository.for_file(
file_relative_path(__file__, "pending_repo.py"),
fn_name="pending_repo_from_cached_asset_metadata",
)
recon_job = ReconstructableJob(repository=recon_repo, job_name="all_asset_job")

execution_plan = create_execution_plan(recon_job, repository_load_data=repository_load_data)

run = instance.create_run_for_job(job_def=job_def, execution_plan=execution_plan)

events = execute_plan(
execution_plan=execution_plan,
job=recon_job,
dagster_run=run,
instance=instance,
)

assert (
len([event for event in events if event.event_type == DagsterEventType.STEP_SUCCESS])
== 1
), "Expected two successful steps"

assert len(workspace_data_api_mocks.calls) == 5
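
To underline the invariant this test establishes: all five recorded requests come from the initial repository resolution, and replaying the job via repository_load_data adds none. A condensed restatement reusing the names from the test above:

calls_before = len(workspace_data_api_mocks.calls)

# repository_load_data carries the cached Power BI metadata, so
# building and executing the plan never re-fetches workspace content.
plan = create_execution_plan(recon_job, repository_load_data=repository_load_data)
assert len(workspace_data_api_mocks.calls) == calls_before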