diff --git a/gooddata-dbt/gooddata_dbt/args.py b/gooddata-dbt/gooddata_dbt/args.py
index c76cd1503..dab15e29b 100644
--- a/gooddata-dbt/gooddata_dbt/args.py
+++ b/gooddata-dbt/gooddata_dbt/args.py
@@ -140,6 +140,7 @@ def set_dbt_cloud_stats_args(parser: argparse.ArgumentParser) -> None:
 def parse_arguments(description: str) -> argparse.Namespace:
     parser = get_parser(description)
     parser.add_argument("--debug", action="store_true", default=False, help="Increase logging level to DEBUG")
+    parser.add_argument("--dry-run", action="store_true", default=False, help="Do not call GoodData APIs")
     set_gooddata_endpoint_args(parser)
 
     subparsers = parser.add_subparsers(help="actions")
@@ -175,6 +176,7 @@ def parse_arguments(description: str) -> argparse.Namespace:
     upload_notification = subparsers.add_parser("upload_notification")
     set_dbt_args(upload_notification)
+    set_gooddata_upper_case_args(upload_notification)
     upload_notification.set_defaults(method="upload_notification")
 
     deploy_analytics = subparsers.add_parser("deploy_analytics")
diff --git a/gooddata-dbt/gooddata_dbt/dbt/tables.py b/gooddata-dbt/gooddata_dbt/dbt/tables.py
index e63529922..cfe0e8bc2 100644
--- a/gooddata-dbt/gooddata_dbt/dbt/tables.py
+++ b/gooddata-dbt/gooddata_dbt/dbt/tables.py
@@ -218,14 +218,19 @@ def read_dbt_models(dbt_catalog: Dict, upper_case: bool, all_model_ids: List[str
                     column.meta.gooddata.upper_case_names()
         return tables
 
-    def set_data_types(self, scan_pdm: CatalogDeclarativeTables) -> None:
-        for table in self.tables:
-            scan_table = self.get_scan_table(scan_pdm, table.name)
-            for column in table.columns.values():
-                # dbt does not provide data types in manifest.json
-                # get it from GoodData scan API
-                scan_column = self.get_scan_column(scan_table, column.name)
-                column.data_type = scan_column.data_type
+    def set_data_types(self, scan_pdm: CatalogDeclarativeTables, dry_run: bool = False) -> None:
+        if dry_run:
+            for table in self.tables:
+                for column in table.columns.values():
+                    column.data_type = "STRING"
+        else:
+            for table in self.tables:
+                scan_table = self.get_scan_table(scan_pdm, table.name)
+                for column in table.columns.values():
+                    # dbt does not provide data types in manifest.json
+                    # get it from GoodData scan API
+                    scan_column = self.get_scan_column(scan_table, column.name)
+                    column.data_type = scan_column.data_type
 
     @property
     def schema_name(self) -> str:
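The `set_data_types` split above is what makes an offline run possible: dbt manifests carry no data types, and with `--dry-run` there is no scan result to read them from, so every column falls back to a generic `STRING`. A minimal sketch of that branching, using simplified stand-in classes rather than the real `DbtModelTables`:

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Column:
    name: str
    data_type: Optional[str] = None


@dataclass
class Table:
    name: str
    columns: Dict[str, Column] = field(default_factory=dict)


def set_data_types(tables, scan_pdm: Dict[str, Dict[str, str]], dry_run: bool = False) -> None:
    # Mirrors the new logic: types come either from a real scan result
    # or, under dry run, from a STRING placeholder.
    for table in tables:
        for column in table.columns.values():
            column.data_type = "STRING" if dry_run else scan_pdm[table.name][column.name]


tables = [Table("orders", {"id": Column("id")})]
set_data_types(tables, scan_pdm={}, dry_run=True)
assert tables[0].columns["id"].data_type == "STRING"
```

Since every `put_*` call is also a no-op under dry run (see the facade later in this diff), the placeholder types never reach a live workspace.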
diff --git a/gooddata-dbt/gooddata_dbt/dbt_plugin.py b/gooddata-dbt/gooddata_dbt/dbt_plugin.py
index a437f01ec..aeb27c0dd 100644
--- a/gooddata-dbt/gooddata_dbt/dbt_plugin.py
+++ b/gooddata-dbt/gooddata_dbt/dbt_plugin.py
@@ -14,6 +14,7 @@
 from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials, DbtExecution
 from gooddata_dbt.dbt.profiles import DbtProfiles
 from gooddata_dbt.dbt.tables import DbtModelTables
+from gooddata_dbt.gooddata.api_wrapper import GoodDataApiWrapper
 from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigOrganization, GoodDataConfigProduct
 from gooddata_dbt.logger import get_logger
 from gooddata_dbt.sdk_wrapper import GoodDataSdkWrapper
@@ -32,7 +33,7 @@ def layout_model_path(data_product: GoodDataConfigProduct) -> Path:
 
 def generate_and_put_ldm(
     logger: logging.Logger,
-    sdk: GoodDataSdk,
+    sdk_wrapper: GoodDataSdkWrapper,
     data_source_id: str,
     workspace_id: str,
     dbt_tables: DbtModelTables,
@@ -40,22 +41,27 @@
 ) -> None:
     scan_request = CatalogScanModelRequest(scan_tables=True, scan_views=True)
     logger.info(f"Scan data source {data_source_id=}")
-    scan_pdm = sdk.catalog_data_source.scan_data_source(data_source_id, scan_request, report_warnings=True).pdm
+    scan_pdm = sdk_wrapper.sdk_facade.scan_data_source(data_source_id, scan_request)
     scan_pdm.store_to_disk(Path("test"))
     # Store data types to dbt_tables class. It is used in make_declarative_datasets to inject data types to LDM.
-    dbt_tables.set_data_types(scan_pdm)
+    dbt_tables.set_data_types(scan_pdm, sdk_wrapper.sdk_facade.dry_run)
     # Construct GoodData LDM from dbt models
     declarative_datasets = dbt_tables.make_declarative_datasets(data_source_id, model_ids)
     ldm = CatalogDeclarativeModel.from_dict({"ldm": declarative_datasets}, camel_case=False)
     # Deploy logical into target workspace
-    sdk.catalog_workspace_content.put_declarative_ldm(workspace_id, ldm)
+    sdk_wrapper.sdk_facade.put_declarative_ldm(workspace_id, ldm)
 
 
-def create_workspace(logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str, workspace_title: str) -> None:
+def create_workspace(
+    logger: logging.Logger,
+    sdk_wrapper: GoodDataSdkWrapper,
+    workspace_id: str,
+    workspace_title: str,
+) -> None:
     logger.info(f"Create workspace {workspace_id=} {workspace_title=}")
     # Create workspaces, if they do not exist yet, otherwise update them
     workspace = CatalogWorkspace(workspace_id=workspace_id, name=workspace_title)
-    sdk.catalog_workspace.create_or_update(workspace=workspace)
+    sdk_wrapper.sdk_facade.create_workspace(workspace=workspace)
 
 
 def deploy_ldm(
@@ -70,7 +76,7 @@
     dbt_profiles = DbtProfiles(args)
     data_source_id = dbt_profiles.data_source_id
     dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)
-    generate_and_put_ldm(logger, sdk_wrapper.sdk, data_source_id, workspace_id, dbt_tables, model_ids)
+    generate_and_put_ldm(logger, sdk_wrapper, data_source_id, workspace_id, dbt_tables, model_ids)
     workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/modeler/#/{workspace_id}"
     logger.info(f"LDM successfully loaded, verify here: {workspace_url}")
 
@@ -89,12 +95,12 @@
     logger.info(f"Register data source {data_source_id=} schema={dbt_tables.schema_name}")
     data_source = dbt_target.to_gooddata(data_source_id, dbt_tables.schema_name)
-    sdk_wrapper.sdk.catalog_data_source.create_or_update_data_source(data_source)
+    sdk_wrapper.sdk_facade.create_or_update_data_source(data_source)
 
 
-def upload_notification(logger: logging.Logger, sdk: GoodDataSdk, data_source_id: str) -> None:
+def upload_notification(logger: logging.Logger, sdk_wrapper: GoodDataSdkWrapper, data_source_id: str) -> None:
     logger.info(f"Upload notification {data_source_id=}")
-    sdk.catalog_data_source.register_upload_notification(data_source_id)
+    sdk_wrapper.sdk_facade.register_upload_notification(data_source_id)
 
 
 def deploy_analytics(
@@ -110,7 +116,7 @@
     # Deploy analytics model into target workspace
     logger.info("Load analytics model into GoodData")
-    sdk_wrapper.sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)
+    sdk_wrapper.sdk_facade.put_declarative_analytics_model(workspace_id, adm)
     workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/dashboards/#/workspace/{workspace_id}"
     logger.info(f"Analytics successfully loaded, verify here: {workspace_url}")
 
 
 def store_analytics(
@@ -130,20 +136,20 @@
     )
 
 
-async def execute_insight(sdk: GoodDataSdk, workspace_id: str, insight: Insight) -> None:
-    sdk.tables.for_insight(workspace_id, insight)
+async def execute_insight(sdk_wrapper: GoodDataSdkWrapper, workspace_id: str, insight: Insight) -> None:
+    sdk_wrapper.sdk_facade.execute_insight(workspace_id, insight)
 
 
 async def test_insight(
     logger: logging.Logger,
-    sdk: GoodDataSdk,
+    sdk_wrapper: GoodDataSdkWrapper,
     workspace_id: str,
     insight: Insight,
 ) -> dict:
     logger.info(f"Executing insight {insight.id=} {insight.title=} ...")
     start = time()
     try:
-        await execute_insight(sdk, workspace_id, insight)
+        await execute_insight(sdk_wrapper, workspace_id, insight)
         duration = get_duration(start)
         logger.info(f"Test successful {insight.id=} {insight.title=} duration={duration}(ms)")
         return {"id": insight.id, "title": insight.title, "duration": duration, "status": "success"}
@@ -155,7 +161,7 @@
 
 async def safe_test_insight(
     logger: logging.Logger,
-    sdk: GoodDataSdk,
+    sdk_wrapper: GoodDataSdkWrapper,
     workspace_id: str,
     insight: Insight,
     semaphore: Semaphore,
@@ -163,7 +169,7 @@
     async with semaphore:  # semaphore limits num of simultaneous executions
         return await test_insight(
             logger,
-            sdk,
+            sdk_wrapper,
             workspace_id,
             insight,
         )
@@ -171,21 +177,21 @@
 
 async def test_insights(
     logger: logging.Logger,
-    sdk: GoodDataSdk,
+    sdk_wrapper: GoodDataSdkWrapper,
     workspace_id: str,
     skip_tests: Optional[List[str]],
     test_insights_parallelism: int = 1,
 ) -> None:
     start = time()
     logger.info(f"Test insights {workspace_id=}")
-    insights = sdk.insights.get_insights(workspace_id)
+    insights = sdk_wrapper.sdk_facade.get_insights(workspace_id)
     semaphore = asyncio.Semaphore(test_insights_parallelism)
     tasks = []
     for insight in insights:
         if skip_tests is not None and insight.id in skip_tests:
             logger.info(f"Skip test insight={insight.title} (requested in gooddata.yaml)")
         else:
-            tasks.append(safe_test_insight(logger, sdk, workspace_id, insight, semaphore))
+            tasks.append(safe_test_insight(logger, sdk_wrapper, workspace_id, insight, semaphore))
     results = await asyncio.gather(*tasks)
     duration = get_duration(start)
     errors = [result for result in results if result["status"] == "failed"]
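The `test_insights` hunks above keep the usual bounded-concurrency idiom: one shared `asyncio.Semaphore` caps how many insight executions are admitted at once, and `asyncio.gather` collects the results in input order. A self-contained illustration of the same pattern, with a sleep standing in for an insight execution:

```python
import asyncio


async def run_one(semaphore: asyncio.Semaphore, i: int) -> int:
    async with semaphore:  # at most `parallelism` bodies run concurrently
        await asyncio.sleep(0.01)  # stands in for one insight execution
        return i


async def run_all(parallelism: int = 2) -> list:
    semaphore = asyncio.Semaphore(parallelism)
    # gather preserves task order, so results line up with the inputs
    return await asyncio.gather(*(run_one(semaphore, i) for i in range(5)))


print(asyncio.run(run_all()))  # [0, 1, 2, 3, 4]
```

Worth noting: `execute_insight` is declared `async` but delegates to the synchronous `for_insight` call, so each execution blocks the event loop while it runs; the semaphore bounds how many tasks are admitted rather than providing true parallelism.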
@@ -195,7 +201,11 @@
     logger.info(f"Test insights finished {workspace_id=} {duration=}(ms)")
 
 
-def create_localized_workspaces(data_product: GoodDataConfigProduct, sdk: GoodDataSdk, workspace_id: str) -> None:
+def create_localized_workspaces(
+    data_product: GoodDataConfigProduct,
+    sdk_facade: GoodDataApiWrapper,
+    workspace_id: str,
+) -> None:
     if data_product.localization is None:
         return
     for to in data_product.localization.to:
@@ -205,15 +215,14 @@
             source=data_product.localization.from_language, target=to.language
         ).translate_batch
         logging.info(f"create_localized_workspaces layout_root_path={GOODDATA_LAYOUTS_DIR / data_product.id}")
-        sdk.catalog_workspace.generate_localized_workspaces(
+        sdk_facade.generate_localized_workspaces(
             workspace_id,
-            to_lang=to.language,
-            to_locale=to.locale,
-            from_lang=data_product.localization.from_language,
+            to=to,
+            data_product=data_product,
             translator_func=translator_func,
+            layout_path=GOODDATA_LAYOUTS_DIR / data_product.id,
             provision_workspace=True,
             store_layouts=False,
-            layout_root_path=GOODDATA_LAYOUTS_DIR / data_product.id,
         )
 
 
@@ -329,7 +338,7 @@
     if args.method == "upload_notification":
         dbt_profiles = DbtProfiles(args)
         # Caches are invalidated only per data source, not per data product
-        upload_notification(logger, sdk_wrapper.sdk, dbt_profiles.data_source_id)
+        upload_notification(logger, sdk_wrapper, dbt_profiles.data_source_id)
     elif args.method == "register_data_sources":
         register_data_source(logger, args, gd_config.all_model_ids, sdk_wrapper)
     else:
@@ -345,13 +354,13 @@
                 workspace_id = f"{data_product.id}_{environment.id}"
                 workspace_title = f"{data_product.name} ({environment.name})"
                 if args.method == "provision_workspaces":
-                    create_workspace(logger, sdk_wrapper.sdk, workspace_id, workspace_title)
+                    create_workspace(logger, sdk_wrapper, workspace_id, workspace_title)
                 elif args.method == "deploy_ldm":
                     deploy_ldm(
                         logger, args, gd_config.all_model_ids, sdk_wrapper, data_product.model_ids, workspace_id
                     )
                     if data_product.localization:
-                        create_localized_workspaces(data_product, sdk_wrapper.sdk, workspace_id)
+                        create_localized_workspaces(data_product, sdk_wrapper.sdk_facade, workspace_id)
                 elif args.method == "store_analytics":
                     store_analytics(logger, sdk_wrapper.sdk, workspace_id, data_product)
                 elif args.method == "deploy_analytics":
@@ -359,7 +368,7 @@
                 elif args.method == "test_insights":
                     parallelism = gd_config.global_properties.test_insights_parallelism or 1
                     asyncio.run(
                        test_insights(logger, sdk_wrapper, workspace_id, data_product.skip_tests, parallelism)
                     )
                 else:
                     raise Exception(f"Unsupported method requested in args: {args.method}")
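For orientation: `process_organization` dispatches on `args.method`, which each subparser stamps via `set_defaults` in the `args.py` hunk at the top of this diff. A minimal standalone version of that wiring:

```python
import argparse

parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(help="actions")

# Each action registers a sub-parser and records its name in args.method,
# which the dispatcher then switches on.
upload_notification = subparsers.add_parser("upload_notification")
upload_notification.set_defaults(method="upload_notification")

deploy_ldm = subparsers.add_parser("deploy_ldm")
deploy_ldm.set_defaults(method="deploy_ldm")

args = parser.parse_args(["upload_notification"])
print(args.method)  # upload_notification
```

An unrecognized method falls through to the `raise Exception(...)` branch above, mirroring how argparse itself rejects unknown subcommands.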
args.method == "register_data_sources": register_data_source(logger, args, gd_config.all_model_ids, sdk_wrapper) else: @@ -345,13 +354,13 @@ def process_organization( workspace_id = f"{data_product.id}_{environment.id}" workspace_title = f"{data_product.name} ({environment.name})" if args.method == "provision_workspaces": - create_workspace(logger, sdk_wrapper.sdk, workspace_id, workspace_title) + create_workspace(logger, sdk_wrapper, workspace_id, workspace_title) elif args.method == "deploy_ldm": deploy_ldm( logger, args, gd_config.all_model_ids, sdk_wrapper, data_product.model_ids, workspace_id ) if data_product.localization: - create_localized_workspaces(data_product, sdk_wrapper.sdk, workspace_id) + create_localized_workspaces(data_product, sdk_wrapper.sdk_facade, workspace_id) elif args.method == "store_analytics": store_analytics(logger, sdk_wrapper.sdk, workspace_id, data_product) elif args.method == "deploy_analytics": @@ -359,7 +368,7 @@ def process_organization( elif args.method == "test_insights": parallelism = gd_config.global_properties.test_insights_parallelism or 1 asyncio.run( - test_insights(logger, sdk_wrapper.sdk, workspace_id, data_product.skip_tests, parallelism) + test_insights(logger, sdk_wrapper, workspace_id, data_product.skip_tests, parallelism) ) else: raise Exception(f"Unsupported method requested in args: {args.method}") diff --git a/gooddata-dbt/gooddata_dbt/gooddata/api_wrapper.py b/gooddata-dbt/gooddata_dbt/gooddata/api_wrapper.py new file mode 100644 index 000000000..526c35aee --- /dev/null +++ b/gooddata-dbt/gooddata_dbt/gooddata/api_wrapper.py @@ -0,0 +1,116 @@ +# (C) 2023 GoodData Corporation +import logging +from pathlib import Path +from typing import Any, List, Union + +from gooddata_dbt.gooddata.config import GoodDataConfigLocalizationTo, GoodDataConfigProduct + +from gooddata_sdk import ( + CatalogDataSourcePostgres, + CatalogDataSourceSnowflake, + CatalogDataSourceVertica, + CatalogDeclarativeAnalytics, + CatalogDeclarativeColumn, + CatalogDeclarativeModel, + CatalogDeclarativeTable, + CatalogDeclarativeTables, + CatalogScanModelRequest, + CatalogWorkspace, + GoodDataSdk, + Insight, +) + +DataSource = Union[CatalogDataSourcePostgres, CatalogDataSourceSnowflake, CatalogDataSourceVertica] + + +class GoodDataApiWrapper: + def __init__(self, sdk: GoodDataSdk, logger: logging.Logger, dry_run: bool = False) -> None: + self.sdk = sdk + self.logger = logger + self.dry_run = dry_run + + def get_insights(self, workspace_id: str) -> List[Insight]: + if self.dry_run: + self.logger.info("Dry run - skipping insights listing") + return [] + else: + return self.sdk.insights.get_insights(workspace_id) + + def execute_insight(self, workspace_id: str, insight: Insight) -> None: + if self.dry_run: + self.logger.info("Dry run - skipping insights execution") + else: + self.sdk.tables.for_insight(workspace_id, insight) + + def scan_data_source(self, data_source_id: str, scan_request: CatalogScanModelRequest) -> CatalogDeclarativeTables: + if self.dry_run: + self.logger.info("Dry run - skipping data source scanning") + return CatalogDeclarativeTables( + tables=[ + CatalogDeclarativeTable( + id="dry_run", + type="DATA_SOURCE_TABLE", + path=["table"], + columns=[CatalogDeclarativeColumn(name="dry_run", data_type="STRING")], + ) + ] + ) + else: + return self.sdk.catalog_data_source.scan_data_source(data_source_id, scan_request, report_warnings=True).pdm + + def put_declarative_ldm(self, workspace_id: str, declarative_ldm: CatalogDeclarativeModel) -> None: + if self.dry_run: 
+ self.logger.info("Dry run - skipping declarative LDM put") + else: + self.sdk.catalog_workspace_content.put_declarative_ldm(workspace_id, declarative_ldm) + + def create_workspace(self, workspace: CatalogWorkspace) -> None: + if self.dry_run: + self.logger.info("Dry run - skipping workspace creation") + else: + self.sdk.catalog_workspace.create_or_update(workspace=workspace) + + def create_or_update_data_source(self, data_source: DataSource) -> None: + if self.dry_run: + self.logger.info("Dry run - skipping data source creation") + else: + self.sdk.catalog_data_source.create_or_update_data_source(data_source) + + def register_upload_notification(self, data_source_id: str) -> None: + if self.dry_run: + self.logger.info("Dry run - skipping upload notification registration") + else: + self.sdk.catalog_data_source.register_upload_notification(data_source_id) + + def put_declarative_analytics_model(self, workspace_id: str, adm: CatalogDeclarativeAnalytics) -> None: + if self.dry_run: + self.logger.info("Dry run - skipping declarative analytics model put") + else: + self.sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm) + + def generate_localized_workspaces( + self, + workspace_id: str, + to: GoodDataConfigLocalizationTo, + data_product: GoodDataConfigProduct, + translator_func: Any, + layout_path: Path, + provision_workspace: bool = True, + store_layouts: bool = False, + ) -> None: + from_language = "en" + if data_product.localization is not None: + from_language = data_product.localization.from_language + if self.dry_run: + self.logger.info("Dry run - skipping localized workspaces generation") + else: + self.sdk.catalog_workspace.generate_localized_workspaces( + workspace_id, + to_lang=to.language, + to_locale=to.locale, + from_lang=from_language, + translator_func=translator_func, + layout_root_path=layout_path, + provision_workspace=provision_workspace, + store_layouts=store_layouts, + ) diff --git a/gooddata-dbt/gooddata_dbt/sdk_wrapper.py b/gooddata-dbt/gooddata_dbt/sdk_wrapper.py index cd06113eb..458bdb368 100644 --- a/gooddata-dbt/gooddata_dbt/sdk_wrapper.py +++ b/gooddata-dbt/gooddata_dbt/sdk_wrapper.py @@ -3,6 +3,8 @@ from logging import Logger from typing import List, Optional +from gooddata_dbt.gooddata.api_wrapper import GoodDataApiWrapper + from gooddata_sdk import GoodDataSdk @@ -16,7 +18,9 @@ def __init__( self.timeout = timeout self.profile = profile self.sdk = self.create_sdk() - self.wait_for_gooddata_is_up(self.timeout) + self.sdk_facade = self.create_sdk_facade() + if not self.args.dry_run: + self.wait_for_gooddata_is_up(self.timeout) def get_host_from_sdk(self) -> Optional[str]: # TODO - make _hostname public in gooddata_sdk @@ -39,6 +43,9 @@ def create_sdk(self) -> GoodDataSdk: sdk = GoodDataSdk.create(host_=host, token_=token, **kwargs) return sdk + def create_sdk_facade(self) -> GoodDataApiWrapper: + return GoodDataApiWrapper(self.sdk, self.logger, self.args.dry_run) + def wait_for_gooddata_is_up(self, timeout: int) -> None: # Wait for the GoodData.CN docker image to start up or prevent hiccups of cloud deployments # We have to take hostname from sdk.client, because it can also be collected from profiles.yml file