Merge pull request #387 from jaceksan/gartner
TRIVIAL: gooddata-dbt - allow running against multiple organizations

Reviewed-by: Jan Kadlec (https://github.com/hkad98)
gdgate authored Oct 12, 2023
2 parents c3772ac + eb7c7fd commit a103872
Showing 6 changed files with 147 additions and 99 deletions.
9 changes: 9 additions & 0 deletions gooddata-dbt/gooddata_dbt/args.py
@@ -35,6 +35,15 @@ def set_gooddata_endpoint_args(parser: argparse.ArgumentParser) -> None:
"When you connect to different hostname than where GoodData is running(proxies)",
default=os.getenv("GOODDATA_OVERRIDE_HOST"),
)
# Alternative - use profile.yml file
parser.add_argument(
"-gp",
"--gooddata-profiles",
nargs="*",
help="Profiles in profile.yml file. Overrides gooddata-host, gooddata-token and gooddata-override-host."
+ "You can use multiple profiles separated by space to deliver models/analytics to multiple organizations.",
default=os.getenv("GOODDATA_PROFILES", None),
)


def set_environment_id_arg(parser: argparse.ArgumentParser) -> None:
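The new --gooddata-profiles option (or the GOODDATA_PROFILES environment variable) accepts one or more profile names; each name must exist in the GoodData profiles file read by the Python SDK. Below is a minimal sketch of such a file, assuming the SDK's host/token profile format and using placeholder values (the local and production names match the organizations in gooddata_example.yml further down):

# Illustrative sketch of a GoodData profiles file (commonly ~/.gooddata/profiles.yaml);
# the exact location and schema are defined by the GoodData Python SDK, and the
# host/token values here are placeholders.
local:
  host: http://localhost:3000
  token: <local-api-token>
production:
  host: https://prod.example.gooddata.com
  token: <production-api-token>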
4 changes: 4 additions & 0 deletions gooddata-dbt/gooddata_dbt/dbt/profiles.py
@@ -182,3 +182,7 @@ def target(self) -> DbtOutput:
if output.name == self.args.target:
return output
raise ValueError(f"Target {self.args.target} not found in {self.profile.outputs}.")

@property
def data_source_id(self) -> str:
return f"{self.args.profile}-{self.target.name}"
155 changes: 83 additions & 72 deletions gooddata-dbt/gooddata_dbt/dbt_plugin.py
@@ -5,15 +5,15 @@
from argparse import Namespace
from pathlib import Path
from time import time
from typing import List, Optional
from typing import Dict, List, Optional

import tabulate
import yaml
from gooddata_dbt.args import parse_arguments
from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials, DbtExecution
from gooddata_dbt.dbt.profiles import DbtOutput, DbtProfiles
from gooddata_dbt.dbt.tables import DbtModelTables
from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigProduct
from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigOrganization, GoodDataConfigProduct
from gooddata_dbt.logger import get_logger
from gooddata_dbt.sdk_wrapper import GoodDataSdkWrapper
from gooddata_dbt.utils import report_message_to_merge_request
@@ -28,9 +28,6 @@

GOODDATA_LAYOUTS_DIR = Path("gooddata_layouts")

# TODO
# Tests, ...


def layout_model_path(data_product: GoodDataConfigProduct) -> Path:
return GOODDATA_LAYOUTS_DIR / data_product.id
@@ -80,18 +77,37 @@ def create_workspace(logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str
sdk.catalog_workspace.create_or_update(workspace=workspace)


DATA_SOURCE_CONTAINER: Dict[str, DbtModelTables] = {}


def deploy_ldm(
logger: logging.Logger,
sdk: GoodDataSdk,
args: Namespace,
data_source_id: str,
dbt_tables: DbtModelTables,
all_model_ids: List[str],
sdk_wrapper: GoodDataSdkWrapper,
model_ids: Optional[List[str]],
workspace_id: str,
) -> None:
global DATA_SOURCE_CONTAINER
logger.info("Generate and put LDM")
generate_and_put_ldm(sdk, data_source_id, workspace_id, dbt_tables, model_ids)
workspace_url = f"{args.gooddata_host}/modeler/#/{workspace_id}"
dbt_profiles = DbtProfiles(args)
dbt_target = dbt_profiles.target
data_source_id = dbt_profiles.data_source_id
# Parse dbt models only once and scan data source only once, not for each product/environment
dbt_tables = DATA_SOURCE_CONTAINER.get(data_source_id)
if dbt_tables is None:
logger.info(f"Process data source {data_source_id=}")
dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)
if args.gooddata_upper_case:
dbt_target.schema = dbt_target.schema.upper()
dbt_target.database = dbt_target.database.upper()
register_data_source(logger, sdk_wrapper.sdk, data_source_id, dbt_target, dbt_tables)
DATA_SOURCE_CONTAINER[data_source_id] = dbt_tables
else:
logger.info(f"Data source already processed {data_source_id=} table_count={len(dbt_tables.tables)}")

generate_and_put_ldm(sdk_wrapper.sdk, data_source_id, workspace_id, dbt_tables, model_ids)
workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/modeler/#/{workspace_id}"
logger.info(f"LDM successfully loaded, verify here: {workspace_url}")


@@ -101,18 +117,21 @@ def upload_notification(logger: logging.Logger, sdk: GoodDataSdk, data_source_id


def deploy_analytics(
logger: logging.Logger, sdk: GoodDataSdk, args: Namespace, workspace_id: str, data_product: GoodDataConfigProduct
logger: logging.Logger,
sdk_wrapper: GoodDataSdkWrapper,
workspace_id: str,
data_product: GoodDataConfigProduct,
) -> None:
logger.info(f"Deploy analytics {workspace_id=}")

logger.info("Read analytics model from disk")
adm = sdk.catalog_workspace_content.load_analytics_model_from_disk(layout_model_path(data_product))
adm = sdk_wrapper.sdk.catalog_workspace_content.load_analytics_model_from_disk(layout_model_path(data_product))

# Deploy analytics model into target workspace
logger.info("Load analytics model into GoodData")
sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)
sdk_wrapper.sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)

workspace_url = f"{args.gooddata_host}/dashboards/#/workspace/{workspace_id}"
workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/dashboards/#/workspace/{workspace_id}"
logger.info(f"Analytics successfully loaded, verify here: {workspace_url}")


@@ -166,34 +185,6 @@ def create_localized_workspaces(data_product: GoodDataConfigProduct, sdk: GoodDa
)


def deploy_models(
gooddata_upper_case: bool,
gooddata_environment_id: str,
logger: logging.Logger,
dbt_target: DbtOutput,
gd_config: GoodDataConfig,
sdk: GoodDataSdk,
args: Namespace,
data_source_id: str,
) -> None:
dbt_tables = DbtModelTables.from_local(gooddata_upper_case, gd_config.all_model_ids)
if gooddata_upper_case:
dbt_target.schema = dbt_target.schema.upper()
dbt_target.database = dbt_target.database.upper()
register_data_source(logger, sdk, data_source_id, dbt_target, dbt_tables)
for data_product in gd_config.data_products:
logger.info(f"Process product name={data_product.name}")
environments = gd_config.get_environment_workspaces(data_product.environment_setup_id)
for environment in environments:
if environment.id == gooddata_environment_id:
workspace_id = f"{data_product.id}_{environment.id}"
workspace_title = f"{data_product.name} ({environment.name})"
create_workspace(logger, sdk, workspace_id, workspace_title)
deploy_ldm(logger, sdk, args, data_source_id, dbt_tables, data_product.model_ids, workspace_id)
if data_product.localization:
create_localized_workspaces(data_product, sdk, workspace_id)


def get_table(data: List[list], headers: List[str], fmt: str) -> str:
return tabulate.tabulate(data, headers=headers, tablefmt=fmt)

@@ -296,49 +287,69 @@ def dbt_cloud_run(args: Namespace, logger: logging.Logger, all_model_ids: List[s
dbt_cloud_stats(args, logger, all_model_ids, environment_id)


def main() -> None:
args = parse_arguments("gooddata-dbt plugin for models management and invalidating caches(upload notification)")
logger = get_logger("gooddata-dbt", args.debug)
logger.info("Start")
sdk = GoodDataSdkWrapper(args, logger).sdk
with open(args.gooddata_config) as fp:
gd_config = GoodDataConfig.from_dict(yaml.safe_load(fp))

def process_organization(
args: Namespace,
logger: logging.Logger,
sdk_wrapper: GoodDataSdkWrapper,
gd_config: GoodDataConfig,
organization: Optional[GoodDataConfigOrganization] = None,
) -> None:
if args.method == "dbt_cloud_run":
dbt_cloud_run(args, logger, gd_config.all_model_ids)
elif args.method == "dbt_cloud_stats":
dbt_cloud_stats(args, logger, gd_config.all_model_ids, args.environment_id)
elif args.method in ["upload_notification", "deploy_models"]:
dbt_target = DbtProfiles(args).target
data_source_id = f"{args.profile}-{dbt_target.name}"
if args.method == "upload_notification":
# Caches are invalidated only per data source, not per data product
upload_notification(logger, sdk, data_source_id)
else:
deploy_models(
args.gooddata_upper_case,
args.gooddata_environment_id,
logger,
dbt_target,
gd_config,
sdk,
args,
data_source_id,
)
elif args.method == "upload_notification":
dbt_profiles = DbtProfiles(args)
# Caches are invalidated only per data source, not per data product
upload_notification(logger, sdk_wrapper.sdk, dbt_profiles.data_source_id)
else:
for data_product in gd_config.data_products:
if organization:
data_products = [dp for dp in gd_config.data_products if dp.id in organization.data_product_ids]
else:
data_products = gd_config.data_products
for data_product in data_products:
logger.info(f"Process product name={data_product.name}")
environments = gd_config.get_environment_workspaces(data_product.environment_setup_id)
for environment in environments:
if environment.id == args.gooddata_environment_id:
workspace_id = f"{data_product.id}_{environment.id}"
if args.method == "store_analytics":
store_analytics(logger, sdk, workspace_id, data_product)
if args.method == "deploy_models":
workspace_title = f"{data_product.name} ({environment.name})"
# TODO - provision workspaces in a separate args.method?
# We will need to extend it by provisioning of child workspaces, ...
create_workspace(logger, sdk_wrapper.sdk, workspace_id, workspace_title)
deploy_ldm(
logger, args, gd_config.all_model_ids, sdk_wrapper, data_product.model_ids, workspace_id
)
if data_product.localization:
create_localized_workspaces(data_product, sdk_wrapper.sdk, workspace_id)
elif args.method == "store_analytics":
store_analytics(logger, sdk_wrapper.sdk, workspace_id, data_product)
elif args.method == "deploy_analytics":
deploy_analytics(logger, sdk, args, workspace_id, data_product)
deploy_analytics(logger, sdk_wrapper, workspace_id, data_product)
elif args.method == "test_insights":
test_insights(logger, sdk, workspace_id)
test_insights(logger, sdk_wrapper.sdk, workspace_id)
else:
raise Exception(f"Unsupported method requested in args: {args.method}")


def main() -> None:
args = parse_arguments("gooddata-dbt plugin for models management and invalidating caches(upload notification)")
logger = get_logger("gooddata-dbt", args.debug)
logger.info("Start")
with open(args.gooddata_config) as fp:
gd_config = GoodDataConfig.from_dict(yaml.safe_load(fp))

if args.gooddata_profiles:
logger.info(f"Process multiple organizations profiles={args.gooddata_profiles}")
for organization in gd_config.organizations:
if organization.gooddata_profile in args.gooddata_profiles:
sdk_wrapper = GoodDataSdkWrapper(args, logger, profile=organization.gooddata_profile)
logger.info(f"Process organization profile={organization.gooddata_profile}")
process_organization(args, logger, sdk_wrapper, gd_config, organization)
else:
sdk_wrapper = GoodDataSdkWrapper(args, logger)
logger.info(f"Process single organization from env vars host={args.gooddata_host}")
process_organization(args, logger, sdk_wrapper, gd_config)

logger.info("End")
7 changes: 7 additions & 0 deletions gooddata-dbt/gooddata_dbt/gooddata/config.py
@@ -40,10 +40,17 @@ class GoodDataConfigProduct(Base):
localization: Optional[GoodDataConfigLocalization] = None


@attrs.define(auto_attribs=True, kw_only=True)
class GoodDataConfigOrganization(Base):
gooddata_profile: str
data_product_ids: List[str] = attr.field(factory=list)


@attrs.define(auto_attribs=True, kw_only=True)
class GoodDataConfig(Base):
environment_setups: List[GoodDataConfigEnvironmentSetup]
data_products: List[GoodDataConfigProduct]
organizations: List[GoodDataConfigOrganization]

@property
def all_model_ids(self) -> List[str]:
51 changes: 27 additions & 24 deletions gooddata-dbt/gooddata_dbt/sdk_wrapper.py
@@ -8,41 +8,44 @@

class GoodDataSdkWrapper:
# Timeout=600 because we support waiting for the All-in-one image to start
def __init__(self, args: argparse.Namespace, logger: Logger, timeout: int = 600) -> None:
def __init__(
self, args: argparse.Namespace, logger: Logger, profile: Optional[str] = None, timeout: int = 600
) -> None:
self.args = args
self.logger = logger
self.timeout = timeout
self.profile = profile
self.sdk = self.create_sdk()
self.wait_for_gooddata_is_up(self.timeout)

@property
def host(self) -> str:
return self.args.gooddata_host

@property
def token(self) -> str:
return self.args.gooddata_token

@property
def override_host(self) -> Optional[str]:
return self.args.gooddata_override_host
def get_host_from_sdk(self) -> Optional[str]:
# TODO - make _hostname public in gooddata_sdk
return self.sdk.client._hostname

def create_sdk(self) -> GoodDataSdk:
kwargs = {}
if self.override_host:
kwargs["Host"] = self.override_host
masked_token = f"{len(self.token[:-4]) * '#'}{self.token[-4:]}"
self.logger.info(
f"Connecting to GoodData host={self.host} token={masked_token} override_host={self.override_host}"
)
sdk = GoodDataSdk.create(host_=self.host, token_=self.token, **kwargs)
return sdk
if self.profile:
self.logger.info(f"Connecting to GoodData using profile={self.profile}")
sdk = GoodDataSdk.create_from_profile(profile=self.profile)
return sdk
else:
host = self.args.gooddata_host
token = self.args.gooddata_token
override_host = self.args.gooddata_override_host
kwargs = {}
if override_host:
kwargs["Host"] = override_host
masked_token = f"{len(token[:-4]) * '#'}{token[-4:]}"
self.logger.info(f"Connecting to GoodData host={host} token={masked_token} override_host={override_host}")
sdk = GoodDataSdk.create(host_=host, token_=token, **kwargs)
return sdk

def wait_for_gooddata_is_up(self, timeout: int) -> None:
# Wait for the GoodData.CN docker image to start up
self.logger.info(f"Waiting for {self.host} to be up")
# Wait for the GoodData.CN docker image to start up or prevent hiccups of cloud deployments
# We have to take hostname from sdk.client, because it can also be collected from profiles.yml file
host = self.get_host_from_sdk()
self.logger.info(f"Waiting for {host} to be up")
self.sdk.support.wait_till_available(timeout=timeout)
self.logger.info(f"Host {self.host} is up")
self.logger.info(f"Host {host} is up")

def pre_cache_insights(self, workspaces: Optional[List] = None) -> None:
if not workspaces:
20 changes: 17 additions & 3 deletions gooddata-dbt/gooddata_example.yml
@@ -19,13 +19,27 @@ data_products:
- id: sales
name: "Sales"
environment_setup_id: default
model_id: github
- github
- faa
model_ids:
- salesforce
localization:
from_language: en
to:
- locale: fr-FR
language: fr
- locale: zh-Hans
language: "chinese (simplified)"
- id: marketing
name: "Marketing"
environment_setup_id: default
model_ids:
- hubspot

# You can deliver data products to multiple organizations. Each organization can contain a different (sub)set of products.
organizations:
- gooddata_profile: local
data_product_ids:
- sales
- gooddata_profile: production
data_product_ids:
- sales
- marketing
