diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py index d0b56d7756052..307d903d7e725 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py @@ -1,14 +1,20 @@ from typing import Any, Dict, Sequence, Type import requests -from dagster import ConfigurableResource -from dagster._core.definitions.asset_spec import AssetSpec -from dagster._core.definitions.cacheable_assets import extract_from_current_repository_load_data +from dagster import ( + AssetsDefinition, + ConfigurableResource, + _check as check, + external_assets_from_specs, +) +from dagster._core.definitions.cacheable_assets import ( + AssetsDefinitionCacheableData, + CacheableAssetsDefinition, +) from dagster._utils.cached_method import cached_method from pydantic import Field from dagster_powerbi.translator import ( - POWERBI_PREFIX, DagsterPowerBITranslator, PowerBIContentData, PowerBIContentType, @@ -119,33 +125,54 @@ def fetch_powerbi_workspace_data( dashboards + reports + semantic_models + list(data_sources_by_id.values()), ) - def build_asset_specs( + def build_assets( self, dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator, - ) -> Sequence[AssetSpec]: - """Fetches Power BI content from the workspace and translates it into AssetSpecs, - using the provided translator. - Future work will cache this data to avoid repeated calls to the Power BI API. + ) -> Sequence[CacheableAssetsDefinition]: + """Returns a set of CacheableAssetsDefinition which will load Power BI content from + the workspace and translates it into AssetSpecs, using the provided translator. Args: dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator. Returns: - Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content. + Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which + will load the Power BI content. """ - cached_workspace_data = extract_from_current_repository_load_data( - f"{POWERBI_PREFIX}{self.workspace_id}" + return [PowerBICacheableAssetsDefinition(self, dagster_powerbi_translator)] + + +class PowerBICacheableAssetsDefinition(CacheableAssetsDefinition): + def __init__(self, workspace: PowerBIWorkspace, translator: Type[DagsterPowerBITranslator]): + self._workspace = workspace + self._translator_cls = translator + super().__init__(unique_id=self._workspace.workspace_id) + + def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]: + workspace_data: PowerBIWorkspaceData = self._workspace.fetch_powerbi_workspace_data() + return [ + AssetsDefinitionCacheableData(extra_metadata=data.to_cached_data()) + for data in [ + *workspace_data.dashboards_by_id.values(), + *workspace_data.reports_by_id.values(), + *workspace_data.semantic_models_by_id.values(), + *workspace_data.data_sources_by_id.values(), + ] + ] + + def build_definitions( + self, data: Sequence[AssetsDefinitionCacheableData] + ) -> Sequence[AssetsDefinition]: + workspace_data = PowerBIWorkspaceData.from_content_data( + self._workspace.workspace_id, + [ + PowerBIContentData.from_cached_data(check.not_none(entry.extra_metadata)) + for entry in data + ], ) - if cached_workspace_data: - workspace_data = PowerBIWorkspaceData.from_content_data( - self.workspace_id, - [PowerBIContentData.from_cached_data(data) for data in cached_workspace_data], - ) - else: - workspace_data = self.fetch_powerbi_workspace_data() - translator = dagster_powerbi_translator(context=workspace_data) + translator = self._translator_cls(context=workspace_data) all_content = [ *workspace_data.dashboards_by_id.values(), @@ -154,4 +181,6 @@ def build_asset_specs( *workspace_data.data_sources_by_id.values(), ] - return [translator.get_asset_spec(content) for content in all_content] + return external_assets_from_specs( + [translator.get_asset_spec(content) for content in all_content] + ) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index 55ea7c7e55904..1c81b0c6a3ec4 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -6,10 +6,6 @@ from dagster import _check as check from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec -from dagster._core.definitions.cacheable_assets import ( - CACHED_ASSET_ID_KEY, - CACHED_ASSET_METADATA_KEY, -) from dagster._record import record POWERBI_PREFIX = "powerbi/" @@ -143,10 +139,6 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: key=self.get_dashboard_asset_key(data), tags={"dagster/storage_kind": "powerbi"}, deps=report_keys, - metadata={ - CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}", - CACHED_ASSET_METADATA_KEY: data.to_cached_data(), - }, ) def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey: @@ -161,10 +153,6 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: key=self.get_report_asset_key(data), deps=[dataset_key] if dataset_key else None, tags={"dagster/storage_kind": "powerbi"}, - metadata={ - CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}", - CACHED_ASSET_METADATA_KEY: data.to_cached_data(), - }, ) def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey: @@ -181,10 +169,6 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: key=self.get_semantic_model_asset_key(data), tags={"dagster/storage_kind": "powerbi"}, deps=source_keys, - metadata={ - CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}", - CACHED_ASSET_METADATA_KEY: data.to_cached_data(), - }, ) def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey: @@ -203,8 +187,4 @@ def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec: return AssetSpec( key=self.get_data_source_asset_key(data), tags={"dagster/storage_kind": "powerbi"}, - metadata={ - CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}", - CACHED_ASSET_METADATA_KEY: data.to_cached_data(), - }, ) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/pending_repo.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/pending_repo.py index 9b668e9c096d6..f8b5c9c9de5a2 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/pending_repo.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/pending_repo.py @@ -1,7 +1,7 @@ import uuid from typing import cast -from dagster import asset, define_asset_job, external_assets_from_specs +from dagster import asset, define_asset_job from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.repository_definition.repository_definition import ( PendingRepositoryDefinition, @@ -13,9 +13,7 @@ api_token=fake_token, workspace_id="a2122b8f-d7e1-42e8-be2b-a5e636ca3221", ) -all_asset_specs = resource.build_asset_specs() - -assets = external_assets_from_specs(all_asset_specs) +pbi_assets = resource.build_assets() @asset @@ -26,6 +24,6 @@ def my_materializable_asset(): pending_repo_from_cached_asset_metadata = cast( PendingRepositoryDefinition, Definitions( - assets=[*assets, my_materializable_asset], jobs=[define_asset_job("all_asset_job")] + assets=[*pbi_assets, my_materializable_asset], jobs=[define_asset_job("all_asset_job")] ).get_inner_repository(), ) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py index 5fb4fdc17b6cc..8338936dc32ee 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py @@ -30,31 +30,33 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: api_token=fake_token, workspace_id=workspace_id, ) - all_asset_specs = resource.build_asset_specs() + cacheable_asset = resource.build_assets()[0] + data = cacheable_asset.compute_cacheable_data() + all_assets = cacheable_asset.build_definitions(data) # 1 dashboard, 1 report, 1 semantic model, 2 data sources - assert len(all_asset_specs) == 5 + assert len(all_assets) == 5 # Sanity check outputs, translator tests cover details here - dashboard_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "dashboard") - assert dashboard_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"] + dashboard_asset = next(asset for asset in all_assets if asset.key.path[0] == "dashboard") + assert dashboard_asset.key.path == ["dashboard", "Sales_Returns_Sample_v201912"] - report_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "report") - assert report_spec.key.path == ["report", "Sales_Returns_Sample_v201912"] + report_asset = next(asset for asset in all_assets if asset.key.path[0] == "report") + assert report_asset.key.path == ["report", "Sales_Returns_Sample_v201912"] - semantic_model_spec = next( - spec for spec in all_asset_specs if spec.key.path[0] == "semantic_model" + semantic_model_asset = next( + asset for asset in all_assets if asset.key.path[0] == "semantic_model" ) - assert semantic_model_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"] + assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"] - data_source_specs = [ - spec - for spec in all_asset_specs - if spec.key.path[0] not in ("dashboard", "report", "semantic_model") + data_source_assets = [ + asset + for asset in all_assets + if asset.key.path[0] not in ("dashboard", "report", "semantic_model") ] - assert len(data_source_specs) == 2 + assert len(data_source_assets) == 2 - data_source_keys = {spec.key for spec in data_source_specs} + data_source_keys = {spec.key for spec in data_source_assets} assert data_source_keys == { AssetKey(["data_27_09_2019.xlsx"]), AssetKey(["sales_marketing_datas.xlsx"]), @@ -67,10 +69,9 @@ def test_using_cached_asset_data(workspace_data_api_mocks: responses.RequestsMoc from dags.pending_repo import pending_repo_from_cached_asset_metadata - assert len(workspace_data_api_mocks.calls) == 5 - # first, we resolve the repository to generate our cached metadata repository_def = pending_repo_from_cached_asset_metadata.compute_repository_definition() + assert len(workspace_data_api_mocks.calls) == 5 # 5 PowerBI external assets, one materializable asset assert len(repository_def.assets_defs_by_key) == 5 + 1