Merge pull request gooddata#416 from jaceksan/gartner
TRIVIAL: gooddata-dbt - remove PDM

Reviewed-by: Jan Kadlec <https://github.com/hkad98>
gdgate authored Nov 13, 2023
2 parents c302646 + 5503a44 commit ebd05da
Showing 33 changed files with 923 additions and 261 deletions.
gooddata-dbt/gooddata_dbt/args.py: 15 additions & 5 deletions
@@ -157,11 +157,21 @@ def parse_arguments(description: str) -> argparse.Namespace:
set_gooddata_upper_case_args(dbt_cloud_stats)
dbt_cloud_stats.set_defaults(method="dbt_cloud_stats")

deploy_models = subparsers.add_parser("deploy_models")
set_dbt_args(deploy_models)
set_environment_id_arg(deploy_models)
set_gooddata_upper_case_args(deploy_models)
deploy_models.set_defaults(method="deploy_models")
provision_workspaces = subparsers.add_parser("provision_workspaces")
set_environment_id_arg(provision_workspaces)
provision_workspaces.set_defaults(method="provision_workspaces")

register_data_sources = subparsers.add_parser("register_data_sources")
set_dbt_args(register_data_sources)
set_environment_id_arg(register_data_sources)
set_gooddata_upper_case_args(register_data_sources)
register_data_sources.set_defaults(method="register_data_sources")

deploy_ldm = subparsers.add_parser("deploy_ldm")
set_dbt_args(deploy_ldm)
set_environment_id_arg(deploy_ldm)
set_gooddata_upper_case_args(deploy_ldm)
deploy_ldm.set_defaults(method="deploy_ldm")

upload_notification = subparsers.add_parser("upload_notification")
set_dbt_args(upload_notification)
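The single deploy_models sub-command is replaced by finer-grained steps: provision_workspaces, register_data_sources and deploy_ldm. Below is a minimal, self-contained sketch of the resulting sub-command layout, using only the method names from the diff above; the description string and the example invocation are illustrative, and the real flags added by the set_*_args helpers are elided.

    import argparse

    # Sketch only: mirrors the subparser + set_defaults(method=...) dispatch used in args.py.
    parser = argparse.ArgumentParser(description="gooddata-dbt plugin")
    subparsers = parser.add_subparsers()
    for method in ("provision_workspaces", "register_data_sources", "deploy_ldm"):
        subparsers.add_parser(method).set_defaults(method=method)

    args = parser.parse_args(["register_data_sources"])
    print(args.method)  # -> "register_data_sources"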
gooddata-dbt/gooddata_dbt/dbt/profiles.py: 3 additions & 0 deletions
@@ -180,6 +180,9 @@ def profile(self) -> DbtProfile:
def target(self) -> DbtOutput:
for output in self.profile.outputs:
if output.name == self.args.target:
if self.args.gooddata_upper_case:
output.schema = output.schema.upper()
output.database = output.database.upper()
return output
raise ValueError(f"Target {self.args.target} not found in {self.profile.outputs}.")

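The target property now upper-cases the schema and database of the selected output when the upper-case option is set, which is useful for warehouses such as Snowflake that store unquoted identifiers in upper case. A small stand-alone sketch of the effect; the Output class below is a hypothetical stand-in for DbtOutput and the values are made up.

    from argparse import Namespace

    # Hypothetical stand-in for DbtOutput, used only to illustrate the upper-casing step.
    class Output:
        def __init__(self, name: str, schema: str, database: str) -> None:
            self.name, self.schema, self.database = name, schema, database

    args = Namespace(target="dev", gooddata_upper_case=True)
    output = Output(name="dev", schema="analytics", database="demo")
    if args.gooddata_upper_case:
        output.schema = output.schema.upper()      # -> "ANALYTICS"
        output.database = output.database.upper()  # -> "DEMO"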
gooddata-dbt/gooddata_dbt/dbt/tables.py: 6 additions & 23 deletions
@@ -257,28 +257,6 @@ def get_scan_column(table: CatalogDeclarativeTable, column_name: str) -> Catalog
scan_columns = [s.name for s in table.columns]
raise Exception(f"get_scan_column table={table.id} column={column_name} not found in scan. {scan_columns=}")

def make_pdm(self, scan_pdm: CatalogDeclarativeTables) -> Dict:
self.set_data_types(scan_pdm)
tables = []
for table in self.tables:
scan_table = self.get_scan_table(scan_pdm, table.name)
columns = []
for column in table.columns.values():
# dbt does not propagate data types to manifest (not yet?)
scan_column = self.get_scan_column(scan_table, column.name)
column.data_type = column.data_type or scan_column.data_type

columns.append({"name": column.name, "data_type": column.data_type})
tables.append(
{
"id": table.name,
"path": [self.schema_name, table.name],
"type": "TABLE",
"columns": columns,
}
)
return {"tables": tables}

@staticmethod
def get_ldm_title(column: DbtModelColumn) -> str:
return column.description or column.name
@@ -351,6 +329,7 @@ def make_references(self, table: DbtModelTable, role_playing_tables: Dict) -> Li
"identifier": {"id": referenced_object_id, "type": "dataset"},
"multivalue": False,
"source_columns": [column.name],
"source_column_data_types": [column.data_type],
}
)
return references
@@ -367,6 +346,7 @@ def make_facts(table: DbtModelTable) -> List[Dict]:
"title": column.gooddata_ldm_title,
"description": column.gooddata_ldm_description,
"source_column": column.name,
"source_column_data_type": column.data_type,
"tags": [table.gooddata_ldm_title] + column.tags,
}
)
@@ -383,6 +363,7 @@ def make_labels(table: DbtModelTable, attribute_column: DbtModelColumn) -> List[
"title": column.gooddata_ldm_title,
"description": column.gooddata_ldm_description,
"source_column": column.name,
"source_column_data_type": column.data_type,
"value_type": column.meta.gooddata.label_type,
"tags": [table.gooddata_ldm_title] + column.tags,
}
@@ -400,6 +381,7 @@ def make_attributes(self, table: DbtModelTable) -> List[Dict]:
"title": column.gooddata_ldm_title,
"description": column.gooddata_ldm_description,
"source_column": column.name,
"source_column_data_type": column.data_type,
"tags": [table.gooddata_ldm_title] + column.tags,
"labels": self.make_labels(table, column),
}
@@ -444,7 +426,8 @@ def make_dataset(self, data_source_id: str, table: DbtModelTable, role_playing_t
"tags": [table.gooddata_ldm_title] + table.tags,
"data_source_table_id": {
"data_source_id": data_source_id,
"id": table.name, # TODO - may not be unique
"id": f"{self.schema_name}__{table.name}",
"path": [self.schema_name, table.name],
"type": "dataSource",
},
"grain": grain,
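With make_pdm removed, the generated LDM itself carries what the PDM used to provide: facts, labels and attributes now include their source column data type, and each dataset points at its physical table through a schema-qualified id plus an explicit path. An illustrative fragment of one generated dataset follows; the schema, table and column names and the data type are made up, and only a subset of keys is shown.

    # Illustrative fragment only - not taken from a real project.
    dataset_fragment = {
        "data_source_table_id": {
            "data_source_id": "demo_ds",
            "id": "analytics__orders",         # f"{schema_name}__{table.name}", unique across schemas
            "path": ["analytics", "orders"],
            "type": "dataSource",
        },
        "facts": [
            {
                "title": "Order amount",
                "description": "Order amount",
                "source_column": "amount",
                "source_column_data_type": "NUMERIC",  # propagated from the data source scan
                "tags": ["Orders"],
            }
        ],
    }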
gooddata-dbt/gooddata_dbt/dbt_plugin.py: 103 additions & 71 deletions
@@ -1,30 +1,27 @@
# (C) 2023 GoodData Corporation
import asyncio
import logging
import os
import sys
from argparse import Namespace
from asyncio import Semaphore
from pathlib import Path
from time import time
from typing import Dict, List, Optional
from typing import List, Optional

import tabulate
import yaml
from gooddata_dbt.args import parse_arguments
from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials, DbtExecution
from gooddata_dbt.dbt.profiles import DbtOutput, DbtProfiles
from gooddata_dbt.dbt.profiles import DbtProfiles
from gooddata_dbt.dbt.tables import DbtModelTables
from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigOrganization, GoodDataConfigProduct
from gooddata_dbt.logger import get_logger
from gooddata_dbt.sdk_wrapper import GoodDataSdkWrapper
from gooddata_dbt.utils import report_message_to_merge_request
from gooddata_dbt.utils import get_duration, report_message_to_merge_request

from gooddata_sdk import (
CatalogDeclarativeModel,
CatalogDeclarativeTables,
CatalogScanModelRequest,
CatalogWorkspace,
GoodDataSdk,
)
from gooddata_sdk import CatalogDeclarativeModel, CatalogScanModelRequest, CatalogWorkspace, GoodDataSdk, Insight

# TODO - upgrade AIO, cleanup, start from scratch, test everything

GOODDATA_LAYOUTS_DIR = Path("gooddata_layouts")

@@ -33,53 +30,34 @@ def layout_model_path(data_product: GoodDataConfigProduct) -> Path:
return GOODDATA_LAYOUTS_DIR / data_product.id


def generate_and_put_pdm(
logger: logging.Logger, sdk: GoodDataSdk, data_source_id: str, dbt_tables: DbtModelTables
def generate_and_put_ldm(
logger: logging.Logger,
sdk: GoodDataSdk,
data_source_id: str,
workspace_id: str,
dbt_tables: DbtModelTables,
model_ids: Optional[List[str]],
) -> None:
# Construct GoodData PDM from dbt models and put it to the server
# GoodData caches the metadata to reduce querying them (costly) in runtime.
scan_request = CatalogScanModelRequest(scan_tables=True, scan_views=True)
logger.info(f"Scan data source {data_source_id=}")
scan_pdm = sdk.catalog_data_source.scan_data_source(data_source_id, scan_request, report_warnings=True).pdm

logger.info(f"Generate and put PDM {data_source_id=}")
pdm = dbt_tables.make_pdm(scan_pdm)
declarative_tables = CatalogDeclarativeTables.from_dict(pdm, camel_case=False)
sdk.catalog_data_source.put_declarative_pdm(data_source_id, declarative_tables)


def generate_and_put_ldm(
sdk: GoodDataSdk, data_source_id: str, workspace_id: str, dbt_tables: DbtModelTables, model_ids: Optional[List[str]]
) -> None:
scan_pdm.store_to_disk(Path("test"))
# Store data types to dbt_tables class. It is used in make_declarative_datasets to inject data types to LDM.
dbt_tables.set_data_types(scan_pdm)
# Construct GoodData LDM from dbt models
declarative_datasets = dbt_tables.make_declarative_datasets(data_source_id, model_ids)
ldm = CatalogDeclarativeModel.from_dict({"ldm": declarative_datasets}, camel_case=False)

# Deploy logical into target workspace
sdk.catalog_workspace_content.put_declarative_ldm(workspace_id, ldm)


def register_data_source(
logger: logging.Logger, sdk: GoodDataSdk, data_source_id: str, dbt_target: DbtOutput, dbt_tables: DbtModelTables
) -> None:
logger.info(f"Register data source {data_source_id=} schema={dbt_tables.schema_name}")
data_source = dbt_target.to_gooddata(data_source_id, dbt_tables.schema_name)
sdk.catalog_data_source.create_or_update_data_source(data_source)

logger.info("Generate and put PDM")
generate_and_put_pdm(logger, sdk, data_source_id, dbt_tables)


def create_workspace(logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str, workspace_title: str) -> None:
logger.info(f"Create workspace {workspace_id=} {workspace_title=}")
# Create workspaces, if they do not exist yet, otherwise update them
workspace = CatalogWorkspace(workspace_id=workspace_id, name=workspace_title)
sdk.catalog_workspace.create_or_update(workspace=workspace)


DATA_SOURCE_CONTAINER: Dict[str, DbtModelTables] = {}


def deploy_ldm(
logger: logging.Logger,
args: Namespace,
@@ -88,29 +66,32 @@ def deploy_ldm(
model_ids: Optional[List[str]],
workspace_id: str,
) -> None:
global DATA_SOURCE_CONTAINER
logger.info("Generate and put LDM")
dbt_profiles = DbtProfiles(args)
dbt_target = dbt_profiles.target
data_source_id = dbt_profiles.data_source_id
# Parse dbt models only once and scan data source only once, not for each product/environment
dbt_tables = DATA_SOURCE_CONTAINER.get(data_source_id)
if dbt_tables is None:
logger.info(f"Process data source {data_source_id=}")
dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)
if args.gooddata_upper_case:
dbt_target.schema = dbt_target.schema.upper()
dbt_target.database = dbt_target.database.upper()
register_data_source(logger, sdk_wrapper.sdk, data_source_id, dbt_target, dbt_tables)
DATA_SOURCE_CONTAINER[data_source_id] = dbt_tables
else:
logger.info(f"Data source already processed {data_source_id=} table_count={len(dbt_tables.tables)}")

generate_and_put_ldm(sdk_wrapper.sdk, data_source_id, workspace_id, dbt_tables, model_ids)
dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)
generate_and_put_ldm(logger, sdk_wrapper.sdk, data_source_id, workspace_id, dbt_tables, model_ids)
workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/modeler/#/{workspace_id}"
logger.info(f"LDM successfully loaded, verify here: {workspace_url}")


def register_data_source(
logger: logging.Logger,
args: Namespace,
all_model_ids: List[str],
sdk_wrapper: GoodDataSdkWrapper,
) -> None:
dbt_profiles = DbtProfiles(args)
dbt_target = dbt_profiles.target
data_source_id = dbt_profiles.data_source_id
logger.info(f"Process data source {data_source_id=}")
dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)

logger.info(f"Register data source {data_source_id=} schema={dbt_tables.schema_name}")
data_source = dbt_target.to_gooddata(data_source_id, dbt_tables.schema_name)
sdk_wrapper.sdk.catalog_data_source.create_or_update_data_source(data_source)


def upload_notification(logger: logging.Logger, sdk: GoodDataSdk, data_source_id: str) -> None:
logger.info(f"Upload notification {data_source_id=}")
sdk.catalog_data_source.register_upload_notification(data_source_id)
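register_data_source no longer generates and puts a PDM; it only creates or updates the data source from the dbt profile, while workspace provisioning and LDM deployment are separate steps. A rough sketch of one possible order of the three calls, using the signatures from this file; the argument plumbing is simplified, and in practice each step runs as its own CLI method dispatched by process_organization below.

    # Wiring sketch only - logger, args, gd_config, sdk_wrapper, model_ids, workspace_id and
    # workspace_title are assumed to be prepared the same way process_organization prepares them.
    register_data_source(logger, args, gd_config.all_model_ids, sdk_wrapper)   # data source, no PDM
    create_workspace(logger, sdk_wrapper.sdk, workspace_id, workspace_title)   # provision workspace
    deploy_ldm(logger, args, gd_config.all_model_ids, sdk_wrapper, model_ids, workspace_id)  # scan + put LDM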
@@ -149,22 +130,69 @@ def store_analytics(
)


def test_insights(logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str, skip_tests: Optional[List[str]]) -> None:
async def execute_insight(sdk: GoodDataSdk, workspace_id: str, insight: Insight) -> None:
sdk.tables.for_insight(workspace_id, insight)


async def test_insight(
logger: logging.Logger,
sdk: GoodDataSdk,
workspace_id: str,
insight: Insight,
) -> dict:
logger.info(f"Executing insight {insight.id=} {insight.title=} ...")
start = time()
try:
await execute_insight(sdk, workspace_id, insight)
duration = get_duration(start)
logger.info(f"Test successful {insight.id=} {insight.title=} duration={duration}(ms)")
return {"id": insight.id, "title": insight.title, "duration": duration, "status": "success"}
except Exception as e:
duration = get_duration(start)
logger.error(f"Test failed {insight.id=} {insight.title=} duration={duration}(ms) reason={str(e)}")
return {"id": insight.id, "title": insight.title, "duration": duration, "status": "failed", "reason": str(e)}


async def safe_test_insight(
logger: logging.Logger,
sdk: GoodDataSdk,
workspace_id: str,
insight: Insight,
semaphore: Semaphore,
) -> dict:
async with semaphore: # semaphore limits num of simultaneous executions
return await test_insight(
logger,
sdk,
workspace_id,
insight,
)


async def test_insights(
logger: logging.Logger,
sdk: GoodDataSdk,
workspace_id: str,
skip_tests: Optional[List[str]],
test_insights_parallelism: int = 1,
) -> None:
start = time()
logger.info(f"Test insights {workspace_id=}")
insights = sdk.insights.get_insights(workspace_id)

semaphore = asyncio.Semaphore(test_insights_parallelism)
tasks = []
for insight in insights:
logger.info(f"Executing insight {insight.id=} {insight.title=} ...")
if skip_tests is not None and insight.id in skip_tests:
logger.info(f"Skip test insight={insight.title} (requested in gooddata.yaml)")
else:
try:
start = time()
sdk.tables.for_insight(workspace_id, insight)
duration = int((time() - start) * 1000)
logger.info(f"Test successful {insight.id=} {insight.title=} duration={duration}(ms)")
except RuntimeError:
sys.exit()
tasks.append(safe_test_insight(logger, sdk, workspace_id, insight, semaphore))
results = await asyncio.gather(*tasks)
duration = get_duration(start)
errors = [result for result in results if result["status"] == "failed"]
if len(errors) > 0:
raise Exception(f"Test insights failed {workspace_id=} {duration=}(ms) {errors=}")
else:
logger.info(f"Test insights finished {workspace_id=} {duration=}(ms)")


def create_localized_workspaces(data_product: GoodDataConfigProduct, sdk: GoodDataSdk, workspace_id: str) -> None:
@@ -302,6 +330,8 @@ def process_organization(
dbt_profiles = DbtProfiles(args)
# Caches are invalidated only per data source, not per data product
upload_notification(logger, sdk_wrapper.sdk, dbt_profiles.data_source_id)
elif args.method == "register_data_sources":
register_data_source(logger, args, gd_config.all_model_ids, sdk_wrapper)
else:
if organization:
data_products = [dp for dp in gd_config.data_products if dp.id in organization.data_product_ids]
@@ -313,11 +343,10 @@
for environment in environments:
if environment.id == args.gooddata_environment_id:
workspace_id = f"{data_product.id}_{environment.id}"
if args.method == "deploy_models":
workspace_title = f"{data_product.name} ({environment.name})"
# TODO - provision workspaces in a separate args.method?
# We will need to extend it by provisioning of child workspaces, ...
workspace_title = f"{data_product.name} ({environment.name})"
if args.method == "provision_workspaces":
create_workspace(logger, sdk_wrapper.sdk, workspace_id, workspace_title)
elif args.method == "deploy_ldm":
deploy_ldm(
logger, args, gd_config.all_model_ids, sdk_wrapper, data_product.model_ids, workspace_id
)
@@ -328,7 +357,10 @@ def process_organization(
elif args.method == "deploy_analytics":
deploy_analytics(logger, sdk_wrapper, workspace_id, data_product)
elif args.method == "test_insights":
test_insights(logger, sdk_wrapper.sdk, workspace_id, data_product.skip_tests)
parallelism = gd_config.global_properties.test_insights_parallelism or 1
asyncio.run(
test_insights(logger, sdk_wrapper.sdk, workspace_id, data_product.skip_tests, parallelism)
)
else:
raise Exception(f"Unsupported method requested in args: {args.method}")

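test_insights now runs insight executions concurrently, bounded by a semaphore sized from the new test_insights_parallelism setting, and collects per-insight results instead of exiting on the first failure. A stand-alone sketch of the pattern; the sleep stands in for sdk.tables.for_insight(workspace_id, insight), and the get_duration helper below assumes the same behavior as the removed inline timing code (milliseconds since start).

    import asyncio
    from time import time

    def get_duration(start: float) -> int:
        # Assumed behavior of gooddata_dbt.utils.get_duration: elapsed milliseconds.
        return int((time() - start) * 1000)

    async def run_one(insight_id: str, semaphore: asyncio.Semaphore) -> dict:
        async with semaphore:  # the semaphore caps how many executions run at once
            start = time()
            await asyncio.sleep(0.1)  # stand-in for executing one insight
            return {"id": insight_id, "duration": get_duration(start), "status": "success"}

    async def run_all(insight_ids: list, parallelism: int = 2) -> list:
        semaphore = asyncio.Semaphore(parallelism)
        return await asyncio.gather(*(run_one(i, semaphore) for i in insight_ids))

    results = asyncio.run(run_all(["insight_a", "insight_b", "insight_c"]))
    print(results)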
gooddata-dbt/gooddata_dbt/gooddata/config.py: 6 additions & 0 deletions
@@ -47,11 +47,17 @@ class GoodDataConfigOrganization(Base):
data_product_ids: List[str] = attr.field(default=list)


@attrs.define(auto_attribs=True, kw_only=True)
class GoodDataGlobalConfig(Base):
test_insights_parallelism: Optional[int] = 1


@attrs.define(auto_attribs=True, kw_only=True)
class GoodDataConfig(Base):
environment_setups: List[GoodDataConfigEnvironmentSetup]
data_products: List[GoodDataConfigProduct]
organizations: List[GoodDataConfigOrganization]
global_properties: GoodDataGlobalConfig

@property
def all_model_ids(self) -> List[str]:
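The new global_properties section of gooddata.yaml controls how many insights are tested in parallel; process_organization falls back to 1 when it is not set. A hypothetical fragment and how it would be read, assuming the YAML keys mirror the attribute names in config.py; the value 4 is just an example.

    import yaml
    from textwrap import dedent

    # Hypothetical gooddata.yaml fragment - only the new section is shown.
    config_text = dedent(
        """
        global_properties:
          test_insights_parallelism: 4
        """
    )
    config = yaml.safe_load(config_text)
    parallelism = config["global_properties"]["test_insights_parallelism"] or 1
    print(parallelism)  # -> 4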
(Diffs for the remaining changed files are not shown here.)
