Skip to content

Commit

Permalink
move methods to workspace
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Aug 7, 2024
1 parent 8671933 commit cd2772e
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 95 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dagster._core.libraries import DagsterLibraryRegistry

from .asset_specs import build_powerbi_asset_specs as build_powerbi_asset_specs
from .resource import PowerBiWorkspace as PowerBiWorkspace
from .translator import DagsterPowerBITranslator as DagsterPowerBITranslator

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from typing import Any, Dict
from typing import Any, Dict, Sequence, Type

import requests
from dagster import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._utils.cached_method import cached_method
from pydantic import Field

from .translator import (
DagsterPowerBITranslator,
PowerBIContentData,
PowerBIContentType,
PowerBIWorkspaceData,
)

BASE_API_URL = "https://api.powerbi.com/v1.0/myorg/"


Expand Down Expand Up @@ -67,3 +75,79 @@ def get_dashboard_tiles(
including which reports back each tile.
"""
return self.fetch_json(f"dashboards/{dashboard_id}/tiles")

def fetch_powerbi_workspace_data(
self,
) -> PowerBIWorkspaceData:
"""Retrieves all Power BI content from the workspace and returns it as a PowerBIWorkspaceData object.
Future work will cache this data to avoid repeated calls to the Power BI API.
Args:
powerbi_workspace (PowerBiWorkspace): The Power BI resource to use to fetch the data.
Returns:
PowerBIWorkspaceData: A snapshot of the Power BI workspace's content.
"""
dashboard_data = self.get_dashboards()["value"]
augmented_dashboard_data = [
{**dashboard, "tiles": self.get_dashboard_tiles(dashboard["id"])}
for dashboard in dashboard_data
]
dashboards = [
PowerBIContentData(content_type=PowerBIContentType.DASHBOARD, properties=data)
for data in augmented_dashboard_data
]

reports = [
PowerBIContentData(content_type=PowerBIContentType.REPORT, properties=data)
for data in self.get_reports()["value"]
]
semantic_models_data = self.get_semantic_models()["value"]
data_sources_by_id = {}
for dataset in semantic_models_data:
dataset_sources = self.get_semantic_model_sources(dataset["id"])["value"]
dataset["sources"] = [source["datasourceId"] for source in dataset_sources]
for data_source in dataset_sources:
data_sources_by_id[data_source["datasourceId"]] = PowerBIContentData(
content_type=PowerBIContentType.DATA_SOURCE, properties=data_source
)
semantic_models = [
PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset)
for dataset in semantic_models_data
]
return PowerBIWorkspaceData(
dashboards_by_id={dashboard.properties["id"]: dashboard for dashboard in dashboards},
reports_by_id={report.properties["id"]: report for report in reports},
semantic_models_by_id={
dataset.properties["id"]: dataset for dataset in semantic_models
},
data_sources_by_id=data_sources_by_id,
)

def build_asset_specs(
self,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
) -> Sequence[AssetSpec]:
"""Fetches Power BI content from the workspace and translates it into AssetSpecs,
using the provided translator.
Future work will cache this data to avoid repeated calls to the Power BI API.
Args:
powerbi_workspace (PowerBiWorkspace): The Power BI resource to use to fetch the data.
dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use
to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator.
Returns:
Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content.
"""
workspace_data = self.fetch_powerbi_workspace_data()
translator = dagster_powerbi_translator(context=workspace_data)

all_content = [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.semantic_models_by_id.values(),
*workspace_data.data_sources_by_id.values(),
]

return [translator.get_asset_spec(content) for content in all_content]
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,21 @@ def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_
api_token=fake_token,
workspace_id=workspace_id,
)
from dagster_powerbi.asset_specs import fetch_powerbi_workspace_data

actual_workspace_data = fetch_powerbi_workspace_data(powerbi_workspace=resource)
actual_workspace_data = resource.fetch_powerbi_workspace_data()
assert len(actual_workspace_data.dashboards_by_id) == 1
assert len(actual_workspace_data.reports_by_id) == 1
assert len(actual_workspace_data.semantic_models_by_id) == 1
assert len(actual_workspace_data.data_sources_by_id) == 2


def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: str) -> None:
from dagster_powerbi.asset_specs import build_powerbi_asset_specs

fake_token = uuid.uuid4().hex
resource = PowerBiWorkspace(
api_token=fake_token,
workspace_id=workspace_id,
)
all_asset_specs = build_powerbi_asset_specs(powerbi_workspace=resource)
all_asset_specs = resource.build_asset_specs()

# 1 dashboard, 1 report, 1 semantic model, 2 data sources
assert len(all_asset_specs) == 5
Expand Down

0 comments on commit cd2772e

Please sign in to comment.