From eb7c7fd8d2e55c3c1a5dfc4ef3f35744e42906ce Mon Sep 17 00:00:00 2001
From: Jacek
Date: Thu, 12 Oct 2023 16:21:12 +0200
Subject: [PATCH] TRIVIAL: gooddata-dbt - allow running against multiple organizations

Read them from ~/.gooddata/profiles.yaml
Plus refactor dbt_plugin.py, unify iteration over products/envs
---
 gooddata-dbt/gooddata_dbt/args.py            |   9 ++
 gooddata-dbt/gooddata_dbt/dbt/profiles.py    |   4 +
 gooddata-dbt/gooddata_dbt/dbt_plugin.py      | 155 ++++++++++---------
 gooddata-dbt/gooddata_dbt/gooddata/config.py |   7 +
 gooddata-dbt/gooddata_dbt/sdk_wrapper.py     |  51 +++---
 gooddata-dbt/gooddata_example.yml            |  20 ++-
 6 files changed, 147 insertions(+), 99 deletions(-)

diff --git a/gooddata-dbt/gooddata_dbt/args.py b/gooddata-dbt/gooddata_dbt/args.py
index d52264063..18d648594 100644
--- a/gooddata-dbt/gooddata_dbt/args.py
+++ b/gooddata-dbt/gooddata_dbt/args.py
@@ -35,6 +35,15 @@ def set_gooddata_endpoint_args(parser: argparse.ArgumentParser) -> None:
         "When you connect to different hostname than where GoodData is running(proxies)",
         default=os.getenv("GOODDATA_OVERRIDE_HOST"),
     )
+    # Alternative - use the ~/.gooddata/profiles.yaml file
+    parser.add_argument(
+        "-gp",
+        "--gooddata-profiles",
+        nargs="*",
+        help="Profiles in the ~/.gooddata/profiles.yaml file. Overrides gooddata-host, gooddata-token and gooddata-override-host. "
+        + "You can use multiple profiles separated by space to deliver models/analytics to multiple organizations.",
+        default=os.getenv("GOODDATA_PROFILES", None),
+    )
 
 
 def set_environment_id_arg(parser: argparse.ArgumentParser) -> None:
diff --git a/gooddata-dbt/gooddata_dbt/dbt/profiles.py b/gooddata-dbt/gooddata_dbt/dbt/profiles.py
index 266df04c2..786c3ef61 100644
--- a/gooddata-dbt/gooddata_dbt/dbt/profiles.py
+++ b/gooddata-dbt/gooddata_dbt/dbt/profiles.py
@@ -182,3 +182,7 @@ def target(self) -> DbtOutput:
             if output.name == self.args.target:
                 return output
         raise ValueError(f"Target {self.args.target} not found in {self.profile.outputs}.")
+
+    @property
+    def data_source_id(self) -> str:
+        return f"{self.args.profile}-{self.target.name}"
diff --git a/gooddata-dbt/gooddata_dbt/dbt_plugin.py b/gooddata-dbt/gooddata_dbt/dbt_plugin.py
index 8a742f9d7..1eb5d8bd7 100644
--- a/gooddata-dbt/gooddata_dbt/dbt_plugin.py
+++ b/gooddata-dbt/gooddata_dbt/dbt_plugin.py
@@ -5,7 +5,7 @@
 from argparse import Namespace
 from pathlib import Path
 from time import time
-from typing import List, Optional
+from typing import Dict, List, Optional
 
 import tabulate
 import yaml
@@ -13,7 +13,7 @@
 from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials, DbtExecution
 from gooddata_dbt.dbt.profiles import DbtOutput, DbtProfiles
 from gooddata_dbt.dbt.tables import DbtModelTables
-from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigProduct
+from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigOrganization, GoodDataConfigProduct
 from gooddata_dbt.logger import get_logger
 from gooddata_dbt.sdk_wrapper import GoodDataSdkWrapper
 from gooddata_dbt.utils import report_message_to_merge_request
@@ -28,9 +28,6 @@
 
 GOODDATA_LAYOUTS_DIR = Path("gooddata_layouts")
 
-# TODO
-# Tests, ...
-
 
 def layout_model_path(data_product: GoodDataConfigProduct) -> Path:
     return GOODDATA_LAYOUTS_DIR / data_product.id
@@ -80,18 +77,37 @@ def create_workspace(logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str
     sdk.catalog_workspace.create_or_update(workspace=workspace)
 
 
+DATA_SOURCE_CONTAINER: Dict[str, DbtModelTables] = {}
+
+
 def deploy_ldm(
     logger: logging.Logger,
-    sdk: GoodDataSdk,
     args: Namespace,
-    data_source_id: str,
-    dbt_tables: DbtModelTables,
+    all_model_ids: List[str],
+    sdk_wrapper: GoodDataSdkWrapper,
     model_ids: Optional[List[str]],
     workspace_id: str,
 ) -> None:
+    global DATA_SOURCE_CONTAINER
     logger.info("Generate and put LDM")
-    generate_and_put_ldm(sdk, data_source_id, workspace_id, dbt_tables, model_ids)
-    workspace_url = f"{args.gooddata_host}/modeler/#/{workspace_id}"
+    dbt_profiles = DbtProfiles(args)
+    dbt_target = dbt_profiles.target
+    data_source_id = dbt_profiles.data_source_id
+    # Parse dbt models only once and scan data source only once, not for each product/environment
+    dbt_tables = DATA_SOURCE_CONTAINER.get(data_source_id)
+    if dbt_tables is None:
+        logger.info(f"Process data source {data_source_id=}")
+        dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)
+        if args.gooddata_upper_case:
+            dbt_target.schema = dbt_target.schema.upper()
+            dbt_target.database = dbt_target.database.upper()
+        register_data_source(logger, sdk_wrapper.sdk, data_source_id, dbt_target, dbt_tables)
+        DATA_SOURCE_CONTAINER[data_source_id] = dbt_tables
+    else:
+        logger.info(f"Data source already processed {data_source_id=} table_count={len(dbt_tables.tables)}")
+
+    generate_and_put_ldm(sdk_wrapper.sdk, data_source_id, workspace_id, dbt_tables, model_ids)
+    workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/modeler/#/{workspace_id}"
     logger.info(f"LDM successfully loaded, verify here: {workspace_url}")
 
 
@@ -101,18 +117,21 @@ def upload_notification(logger: logging.Logger, sdk: GoodDataSdk, data_source_id
 
 
 def deploy_analytics(
-    logger: logging.Logger, sdk: GoodDataSdk, args: Namespace, workspace_id: str, data_product: GoodDataConfigProduct
+    logger: logging.Logger,
+    sdk_wrapper: GoodDataSdkWrapper,
+    workspace_id: str,
+    data_product: GoodDataConfigProduct,
 ) -> None:
     logger.info(f"Deploy analytics {workspace_id=}")
 
     logger.info("Read analytics model from disk")
-    adm = sdk.catalog_workspace_content.load_analytics_model_from_disk(layout_model_path(data_product))
+    adm = sdk_wrapper.sdk.catalog_workspace_content.load_analytics_model_from_disk(layout_model_path(data_product))
 
     # Deploy analytics model into target workspace
     logger.info("Load analytics model into GoodData")
-    sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)
+    sdk_wrapper.sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)
 
-    workspace_url = f"{args.gooddata_host}/dashboards/#/workspace/{workspace_id}"
+    workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/dashboards/#/workspace/{workspace_id}"
     logger.info(f"Analytics successfully loaded, verify here: {workspace_url}")
 
 
@@ -166,34 +185,6 @@ def create_localized_workspaces(data_product: GoodDataConfigProduct, sdk: GoodDa
     )
 
 
-def deploy_models(
-    gooddata_upper_case: bool,
-    gooddata_environment_id: str,
-    logger: logging.Logger,
-    dbt_target: DbtOutput,
-    gd_config: GoodDataConfig,
-    sdk: GoodDataSdk,
-    args: Namespace,
-    data_source_id: str,
-) -> None:
-    dbt_tables = DbtModelTables.from_local(gooddata_upper_case, gd_config.all_model_ids)
-    if gooddata_upper_case:
-        dbt_target.schema = dbt_target.schema.upper()
-        dbt_target.database = dbt_target.database.upper()
-    register_data_source(logger, sdk, data_source_id, dbt_target, dbt_tables)
-    for data_product in gd_config.data_products:
-        logger.info(f"Process product name={data_product.name}")
-        environments = gd_config.get_environment_workspaces(data_product.environment_setup_id)
-        for environment in environments:
-            if environment.id == gooddata_environment_id:
-                workspace_id = f"{data_product.id}_{environment.id}"
-                workspace_title = f"{data_product.name} ({environment.name})"
-                create_workspace(logger, sdk, workspace_id, workspace_title)
-                deploy_ldm(logger, sdk, args, data_source_id, dbt_tables, data_product.model_ids, workspace_id)
-                if data_product.localization:
-                    create_localized_workspaces(data_product, sdk, workspace_id)
-
-
 def get_table(data: List[list], headers: List[str], fmt: str) -> str:
     return tabulate.tabulate(data, headers=headers, tablefmt=fmt)
 
@@ -296,49 +287,69 @@ def dbt_cloud_run(args: Namespace, logger: logging.Logger, all_model_ids: List[s
     dbt_cloud_stats(args, logger, all_model_ids, environment_id)
 
 
-def main() -> None:
-    args = parse_arguments("gooddata-dbt plugin for models management and invalidating caches(upload notification)")
-    logger = get_logger("gooddata-dbt", args.debug)
-    logger.info("Start")
-    sdk = GoodDataSdkWrapper(args, logger).sdk
-    with open(args.gooddata_config) as fp:
-        gd_config = GoodDataConfig.from_dict(yaml.safe_load(fp))
-
+def process_organization(
+    args: Namespace,
+    logger: logging.Logger,
+    sdk_wrapper: GoodDataSdkWrapper,
+    gd_config: GoodDataConfig,
+    organization: Optional[GoodDataConfigOrganization] = None,
+) -> None:
     if args.method == "dbt_cloud_run":
         dbt_cloud_run(args, logger, gd_config.all_model_ids)
     elif args.method == "dbt_cloud_stats":
         dbt_cloud_stats(args, logger, gd_config.all_model_ids, args.environment_id)
-    elif args.method in ["upload_notification", "deploy_models"]:
-        dbt_target = DbtProfiles(args).target
-        data_source_id = f"{args.profile}-{dbt_target.name}"
-        if args.method == "upload_notification":
-            # Caches are invalidated only per data source, not per data product
-            upload_notification(logger, sdk, data_source_id)
-        else:
-            deploy_models(
-                args.gooddata_upper_case,
-                args.gooddata_environment_id,
-                logger,
-                dbt_target,
-                gd_config,
-                sdk,
-                args,
-                data_source_id,
-            )
+    elif args.method == "upload_notification":
+        dbt_profiles = DbtProfiles(args)
+        # Caches are invalidated only per data source, not per data product
+        upload_notification(logger, sdk_wrapper.sdk, dbt_profiles.data_source_id)
     else:
-        for data_product in gd_config.data_products:
+        if organization:
+            data_products = [dp for dp in gd_config.data_products if dp.id in organization.data_product_ids]
+        else:
+            data_products = gd_config.data_products
+        for data_product in data_products:
             logger.info(f"Process product name={data_product.name}")
             environments = gd_config.get_environment_workspaces(data_product.environment_setup_id)
             for environment in environments:
                 if environment.id == args.gooddata_environment_id:
                     workspace_id = f"{data_product.id}_{environment.id}"
-                    if args.method == "store_analytics":
-                        store_analytics(logger, sdk, workspace_id, data_product)
+                    if args.method == "deploy_models":
+                        workspace_title = f"{data_product.name} ({environment.name})"
+                        # TODO - provision workspaces in a separate args.method?
+                        #   We will need to extend it by provisioning of child workspaces, ...
+                        create_workspace(logger, sdk_wrapper.sdk, workspace_id, workspace_title)
+                        deploy_ldm(
+                            logger, args, gd_config.all_model_ids, sdk_wrapper, data_product.model_ids, workspace_id
+                        )
+                        if data_product.localization:
+                            create_localized_workspaces(data_product, sdk_wrapper.sdk, workspace_id)
+                    elif args.method == "store_analytics":
+                        store_analytics(logger, sdk_wrapper.sdk, workspace_id, data_product)
                     elif args.method == "deploy_analytics":
-                        deploy_analytics(logger, sdk, args, workspace_id, data_product)
+                        deploy_analytics(logger, sdk_wrapper, workspace_id, data_product)
                     elif args.method == "test_insights":
-                        test_insights(logger, sdk, workspace_id)
+                        test_insights(logger, sdk_wrapper.sdk, workspace_id)
                     else:
                         raise Exception(f"Unsupported method requested in args: {args.method}")
+
+
+def main() -> None:
+    args = parse_arguments("gooddata-dbt plugin for models management and invalidating caches (upload notification)")
+    logger = get_logger("gooddata-dbt", args.debug)
+    logger.info("Start")
+    with open(args.gooddata_config) as fp:
+        gd_config = GoodDataConfig.from_dict(yaml.safe_load(fp))
+
+    if args.gooddata_profiles:
+        logger.info(f"Process multiple organizations profiles={args.gooddata_profiles}")
+        for organization in gd_config.organizations:
+            if organization.gooddata_profile in args.gooddata_profiles:
+                sdk_wrapper = GoodDataSdkWrapper(args, logger, profile=organization.gooddata_profile)
+                logger.info(f"Process organization profile={organization.gooddata_profile}")
+                process_organization(args, logger, sdk_wrapper, gd_config, organization)
+    else:
+        sdk_wrapper = GoodDataSdkWrapper(args, logger)
+        logger.info(f"Process single organization from env vars host={args.gooddata_host}")
+        process_organization(args, logger, sdk_wrapper, gd_config)
+    logger.info("End")
diff --git a/gooddata-dbt/gooddata_dbt/gooddata/config.py b/gooddata-dbt/gooddata_dbt/gooddata/config.py
index 1578da21f..c8a7e8935 100644
--- a/gooddata-dbt/gooddata_dbt/gooddata/config.py
+++ b/gooddata-dbt/gooddata_dbt/gooddata/config.py
@@ -40,10 +40,17 @@ class GoodDataConfigProduct(Base):
     localization: Optional[GoodDataConfigLocalization] = None
 
 
+@attrs.define(auto_attribs=True, kw_only=True)
+class GoodDataConfigOrganization(Base):
+    gooddata_profile: str
+    data_product_ids: List[str] = attr.field(factory=list)
+
+
 @attrs.define(auto_attribs=True, kw_only=True)
 class GoodDataConfig(Base):
     environment_setups: List[GoodDataConfigEnvironmentSetup]
     data_products: List[GoodDataConfigProduct]
+    organizations: List[GoodDataConfigOrganization]
 
     @property
     def all_model_ids(self) -> List[str]:
diff --git a/gooddata-dbt/gooddata_dbt/sdk_wrapper.py b/gooddata-dbt/gooddata_dbt/sdk_wrapper.py
index 8d8986ca1..cd06113eb 100644
--- a/gooddata-dbt/gooddata_dbt/sdk_wrapper.py
+++ b/gooddata-dbt/gooddata_dbt/sdk_wrapper.py
@@ -8,41 +8,44 @@
 
 class GoodDataSdkWrapper:
     # Timeout=600 because supporting waiting for All-in-one image starts
-    def __init__(self, args: argparse.Namespace, logger: Logger, timeout: int = 600) -> None:
+    def __init__(
+        self, args: argparse.Namespace, logger: Logger, profile: Optional[str] = None, timeout: int = 600
+    ) -> None:
         self.args = args
         self.logger = logger
         self.timeout = timeout
+        self.profile = profile
         self.sdk = self.create_sdk()
         self.wait_for_gooddata_is_up(self.timeout)
 
-    @property
-    def host(self) -> str:
-        return self.args.gooddata_host
-
-    @property
-    def token(self) -> str:
-        return self.args.gooddata_token
-
-    @property
-    def override_host(self) -> Optional[str]:
-        return self.args.gooddata_override_host
+    def get_host_from_sdk(self) -> Optional[str]:
+        # TODO - make _hostname public in gooddata_sdk
+        return self.sdk.client._hostname
 
     def create_sdk(self) -> GoodDataSdk:
-        kwargs = {}
-        if self.override_host:
-            kwargs["Host"] = self.override_host
-        masked_token = f"{len(self.token[:-4]) * '#'}{self.token[-4:]}"
-        self.logger.info(
-            f"Connecting to GoodData host={self.host} token={masked_token} override_host={self.override_host}"
-        )
-        sdk = GoodDataSdk.create(host_=self.host, token_=self.token, **kwargs)
-        return sdk
+        if self.profile:
+            self.logger.info(f"Connecting to GoodData using profile={self.profile}")
+            sdk = GoodDataSdk.create_from_profile(profile=self.profile)
+            return sdk
+        else:
+            host = self.args.gooddata_host
+            token = self.args.gooddata_token
+            override_host = self.args.gooddata_override_host
+            kwargs = {}
+            if override_host:
+                kwargs["Host"] = override_host
+            masked_token = f"{len(token[:-4]) * '#'}{token[-4:]}"
+            self.logger.info(f"Connecting to GoodData host={host} token={masked_token} override_host={override_host}")
+            sdk = GoodDataSdk.create(host_=host, token_=token, **kwargs)
+            return sdk
 
     def wait_for_gooddata_is_up(self, timeout: int) -> None:
-        # Wait for the GoodData.CN docker image to start up
-        self.logger.info(f"Waiting for {self.host} to be up")
+        # Wait for the GoodData.CN docker image to start up or prevent hiccups of cloud deployments
+        # We have to take the hostname from sdk.client, because it can also be collected from the profiles.yaml file
+        host = self.get_host_from_sdk()
+        self.logger.info(f"Waiting for {host} to be up")
         self.sdk.support.wait_till_available(timeout=timeout)
-        self.logger.info(f"Host {self.host} is up")
+        self.logger.info(f"Host {host} is up")
 
     def pre_cache_insights(self, workspaces: Optional[List] = None) -> None:
         if not workspaces:
diff --git a/gooddata-dbt/gooddata_example.yml b/gooddata-dbt/gooddata_example.yml
index 5af0b0a4c..0ebe0dc44 100644
--- a/gooddata-dbt/gooddata_example.yml
+++ b/gooddata-dbt/gooddata_example.yml
@@ -19,9 +19,8 @@ data_products:
   - id: sales
     name: "Sales"
    environment_setup_id: default
-    model_id: github
-      - github
-      - faa
+    model_ids:
+      - salesforce
     localization:
       from_language: en
       to:
@@ -29,3 +28,18 @@
         - locale: fr
           language: fr
         - locale: zh-Hans
           language: "chinese (simplified)"
+  - id: marketing
+    name: "Marketing"
+    environment_setup_id: default
+    model_ids:
+      - hubspot
+
+# You can deliver data products to multiple organizations. Each organization can contain a different (sub)set of products
+organizations:
+  - gooddata_profile: local
+    data_product_ids:
+      - sales
+  - gooddata_profile: production
+    data_product_ids:
+      - sales
+      - marketing
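
Note (not part of the diff): the new --gooddata-profiles option reads organizations from ~/.gooddata/profiles.yaml via GoodDataSdk.create_from_profile. A minimal sketch of that file, matching the organizations section of gooddata_example.yml, could look like the following; the profile names (local, production) come from the example, while the host and token values are placeholders and the exact set of supported keys is defined by gooddata-sdk, not by this patch:

    # ~/.gooddata/profiles.yaml (illustrative values only)
    local:
      host: http://localhost:3000
      token: <api-token-of-the-local-organization>
    production:
      host: https://production.example.gooddata.com
      token: <api-token-of-the-production-organization>

With such a file in place, passing --gooddata-profiles local production delivers each data product to every organization whose gooddata_profile is listed; without the option, the plugin keeps using the gooddata-host/gooddata-token arguments as before.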