Merge pull request gooddata#441 from jaceksan/working
TRIVIAL: gooddata-dbt - enable --dry-run for pre-merge pipelines

Reviewed-by: Jan Kadlec (https://github.com/hkad98)
gdgate authored Nov 16, 2023
2 parents e65d7ca + c8a6bd4 commit 6249c53
Showing 5 changed files with 178 additions and 39 deletions.
2 changes: 2 additions & 0 deletions gooddata-dbt/gooddata_dbt/args.py
@@ -140,6 +140,7 @@ def set_dbt_cloud_stats_args(parser: argparse.ArgumentParser) -> None:
def parse_arguments(description: str) -> argparse.Namespace:
parser = get_parser(description)
parser.add_argument("--debug", action="store_true", default=False, help="Increase logging level to DEBUG")
parser.add_argument("--dry-run", action="store_true", default=False, help="Do not call GoodData APIs")
set_gooddata_endpoint_args(parser)

subparsers = parser.add_subparsers(help="actions")
@@ -175,6 +176,7 @@ def parse_arguments(description: str) -> argparse.Namespace:

upload_notification = subparsers.add_parser("upload_notification")
set_dbt_args(upload_notification)
set_gooddata_upper_case_args(upload_notification)
upload_notification.set_defaults(method="upload_notification")

deploy_analytics = subparsers.add_parser("deploy_analytics")
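
The flag is registered on the root parser, ahead of the subparsers, so it must precede the action name on the command line and is exposed as args.dry_run. A minimal, self-contained argparse sketch of that behaviour (the layout mirrors args.py; deploy_ldm stands in for any of the existing actions):

# Standalone sketch, not part of the diff: mirrors how args.py wires the flag.
import argparse

parser = argparse.ArgumentParser(description="gooddata-dbt sketch")
parser.add_argument("--dry-run", action="store_true", default=False, help="Do not call GoodData APIs")
subparsers = parser.add_subparsers(help="actions")
deploy_ldm = subparsers.add_parser("deploy_ldm")
deploy_ldm.set_defaults(method="deploy_ldm")

args = parser.parse_args(["--dry-run", "deploy_ldm"])
print(args.dry_run, args.method)  # True deploy_ldm
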
21 changes: 13 additions & 8 deletions gooddata-dbt/gooddata_dbt/dbt/tables.py
@@ -218,14 +218,19 @@ def read_dbt_models(dbt_catalog: Dict, upper_case: bool, all_model_ids: List[str
column.meta.gooddata.upper_case_names()
return tables

def set_data_types(self, scan_pdm: CatalogDeclarativeTables) -> None:
for table in self.tables:
scan_table = self.get_scan_table(scan_pdm, table.name)
for column in table.columns.values():
# dbt does not provide data types in manifest.json
# get it from GoodData scan API
scan_column = self.get_scan_column(scan_table, column.name)
column.data_type = scan_column.data_type
def set_data_types(self, scan_pdm: CatalogDeclarativeTables, dry_run: bool = False) -> None:
if dry_run:
for table in self.tables:
for column in table.columns.values():
column.data_type = "STRING"
else:
for table in self.tables:
scan_table = self.get_scan_table(scan_pdm, table.name)
for column in table.columns.values():
# dbt does not provide data types in manifest.json
# get it from GoodData scan API
scan_column = self.get_scan_column(scan_table, column.name)
column.data_type = scan_column.data_type

@property
def schema_name(self) -> str:
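
A condensed sketch of the branching added above (Table and Column here are simplified stand-ins, not the real dbt/GoodData catalog classes): in dry-run mode every column gets a placeholder STRING type instead of being resolved through the scan API.

# Simplified illustration of the dry-run fallback in set_data_types.
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class Column:
    name: str
    data_type: Optional[str] = None

@dataclass
class Table:
    name: str
    columns: Dict[str, Column] = field(default_factory=dict)

def set_data_types(tables, scan_lookup, dry_run: bool = False) -> None:
    for table in tables:
        for column in table.columns.values():
            if dry_run:
                column.data_type = "STRING"  # placeholder, no scan API call
            else:
                column.data_type = scan_lookup(table.name, column.name)

tables = [Table("orders", {"id": Column("id")})]
set_data_types(tables, scan_lookup=lambda t, c: "INT", dry_run=True)
print(tables[0].columns["id"].data_type)  # STRING
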
69 changes: 39 additions & 30 deletions gooddata-dbt/gooddata_dbt/dbt_plugin.py
@@ -14,6 +14,7 @@
from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials, DbtExecution
from gooddata_dbt.dbt.profiles import DbtProfiles
from gooddata_dbt.dbt.tables import DbtModelTables
from gooddata_dbt.gooddata.api_wrapper import GoodDataApiWrapper
from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigOrganization, GoodDataConfigProduct
from gooddata_dbt.logger import get_logger
from gooddata_dbt.sdk_wrapper import GoodDataSdkWrapper
@@ -32,30 +33,35 @@ def layout_model_path(data_product: GoodDataConfigProduct) -> Path:

def generate_and_put_ldm(
logger: logging.Logger,
sdk: GoodDataSdk,
sdk_wrapper: GoodDataSdkWrapper,
data_source_id: str,
workspace_id: str,
dbt_tables: DbtModelTables,
model_ids: Optional[List[str]],
) -> None:
scan_request = CatalogScanModelRequest(scan_tables=True, scan_views=True)
logger.info(f"Scan data source {data_source_id=}")
scan_pdm = sdk.catalog_data_source.scan_data_source(data_source_id, scan_request, report_warnings=True).pdm
scan_pdm = sdk_wrapper.sdk_facade.scan_data_source(data_source_id, scan_request)
scan_pdm.store_to_disk(Path("test"))
# Store data types to dbt_tables class. It is used in make_declarative_datasets to inject data types to LDM.
dbt_tables.set_data_types(scan_pdm)
dbt_tables.set_data_types(scan_pdm, sdk_wrapper.sdk_facade.dry_run)
# Construct GoodData LDM from dbt models
declarative_datasets = dbt_tables.make_declarative_datasets(data_source_id, model_ids)
ldm = CatalogDeclarativeModel.from_dict({"ldm": declarative_datasets}, camel_case=False)
# Deploy logical into target workspace
sdk.catalog_workspace_content.put_declarative_ldm(workspace_id, ldm)
sdk_wrapper.sdk_facade.put_declarative_ldm(workspace_id, ldm)


def create_workspace(logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str, workspace_title: str) -> None:
def create_workspace(
logger: logging.Logger,
sdk_wrapper: GoodDataSdkWrapper,
workspace_id: str,
workspace_title: str,
) -> None:
logger.info(f"Create workspace {workspace_id=} {workspace_title=}")
# Create workspaces, if they do not exist yet, otherwise update them
workspace = CatalogWorkspace(workspace_id=workspace_id, name=workspace_title)
sdk.catalog_workspace.create_or_update(workspace=workspace)
sdk_wrapper.sdk_facade.create_workspace(workspace=workspace)


def deploy_ldm(
Expand All @@ -70,7 +76,7 @@ def deploy_ldm(
dbt_profiles = DbtProfiles(args)
data_source_id = dbt_profiles.data_source_id
dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)
generate_and_put_ldm(logger, sdk_wrapper.sdk, data_source_id, workspace_id, dbt_tables, model_ids)
generate_and_put_ldm(logger, sdk_wrapper, data_source_id, workspace_id, dbt_tables, model_ids)
workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/modeler/#/{workspace_id}"
logger.info(f"LDM successfully loaded, verify here: {workspace_url}")

@@ -89,12 +95,12 @@ def register_data_source(

logger.info(f"Register data source {data_source_id=} schema={dbt_tables.schema_name}")
data_source = dbt_target.to_gooddata(data_source_id, dbt_tables.schema_name)
sdk_wrapper.sdk.catalog_data_source.create_or_update_data_source(data_source)
sdk_wrapper.sdk_facade.create_or_update_data_source(data_source)


def upload_notification(logger: logging.Logger, sdk: GoodDataSdk, data_source_id: str) -> None:
def upload_notification(logger: logging.Logger, sdk_wrapper: GoodDataSdkWrapper, data_source_id: str) -> None:
logger.info(f"Upload notification {data_source_id=}")
sdk.catalog_data_source.register_upload_notification(data_source_id)
sdk_wrapper.sdk_facade.register_upload_notification(data_source_id)


def deploy_analytics(
@@ -110,7 +116,7 @@ def deploy_analytics(

# Deploy analytics model into target workspace
logger.info("Load analytics model into GoodData")
sdk_wrapper.sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)
sdk_wrapper.sdk_facade.put_declarative_analytics_model(workspace_id, adm)

workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/dashboards/#/workspace/{workspace_id}"
logger.info(f"Analytics successfully loaded, verify here: {workspace_url}")
@@ -130,20 +136,20 @@ def store_analytics(
)


async def execute_insight(sdk: GoodDataSdk, workspace_id: str, insight: Insight) -> None:
sdk.tables.for_insight(workspace_id, insight)
async def execute_insight(sdk_wrapper: GoodDataSdkWrapper, workspace_id: str, insight: Insight) -> None:
sdk_wrapper.sdk_facade.execute_insight(workspace_id, insight)


async def test_insight(
logger: logging.Logger,
sdk: GoodDataSdk,
sdk_wrapper: GoodDataSdkWrapper,
workspace_id: str,
insight: Insight,
) -> dict:
logger.info(f"Executing insight {insight.id=} {insight.title=} ...")
start = time()
try:
await execute_insight(sdk, workspace_id, insight)
await execute_insight(sdk_wrapper, workspace_id, insight)
duration = get_duration(start)
logger.info(f"Test successful {insight.id=} {insight.title=} duration={duration}(ms)")
return {"id": insight.id, "title": insight.title, "duration": duration, "status": "success"}
@@ -155,37 +161,37 @@ async def test_insight(

async def safe_test_insight(
logger: logging.Logger,
sdk: GoodDataSdk,
sdk_wrapper: GoodDataSdkWrapper,
workspace_id: str,
insight: Insight,
semaphore: Semaphore,
) -> dict:
async with semaphore: # semaphore limits num of simultaneous executions
return await test_insight(
logger,
sdk,
sdk_wrapper,
workspace_id,
insight,
)


async def test_insights(
logger: logging.Logger,
sdk: GoodDataSdk,
sdk_wrapper: GoodDataSdkWrapper,
workspace_id: str,
skip_tests: Optional[List[str]],
test_insights_parallelism: int = 1,
) -> None:
start = time()
logger.info(f"Test insights {workspace_id=}")
insights = sdk.insights.get_insights(workspace_id)
insights = sdk_wrapper.sdk_facade.get_insights(workspace_id)
semaphore = asyncio.Semaphore(test_insights_parallelism)
tasks = []
for insight in insights:
if skip_tests is not None and insight.id in skip_tests:
logger.info(f"Skip test insight={insight.title} (requested in gooddata.yaml)")
else:
tasks.append(safe_test_insight(logger, sdk, workspace_id, insight, semaphore))
tasks.append(safe_test_insight(logger, sdk_wrapper, workspace_id, insight, semaphore))
results = await asyncio.gather(*tasks)
duration = get_duration(start)
errors = [result for result in results if result["status"] == "failed"]
@@ -195,7 +201,11 @@ async def test_insights(
logger.info(f"Test insights finished {workspace_id=} {duration=}(ms)")


def create_localized_workspaces(data_product: GoodDataConfigProduct, sdk: GoodDataSdk, workspace_id: str) -> None:
def create_localized_workspaces(
data_product: GoodDataConfigProduct,
sdk_facade: GoodDataApiWrapper,
workspace_id: str,
) -> None:
if data_product.localization is None:
return
for to in data_product.localization.to:
@@ -205,15 +215,14 @@ def create_localized_workspaces(data_product: GoodDataConfigProduct, sdk: GoodDa
source=data_product.localization.from_language, target=to.language
).translate_batch
logging.info(f"create_localized_workspaces layout_root_path={GOODDATA_LAYOUTS_DIR / data_product.id}")
sdk.catalog_workspace.generate_localized_workspaces(
sdk_facade.generate_localized_workspaces(
workspace_id,
to_lang=to.language,
to_locale=to.locale,
from_lang=data_product.localization.from_language,
to=to,
data_product=data_product,
translator_func=translator_func,
layout_path=GOODDATA_LAYOUTS_DIR / data_product.id,
provision_workspace=True,
store_layouts=False,
layout_root_path=GOODDATA_LAYOUTS_DIR / data_product.id,
)


@@ -329,7 +338,7 @@ def process_organization(
if args.method == "upload_notification":
dbt_profiles = DbtProfiles(args)
# Caches are invalidated only per data source, not per data product
upload_notification(logger, sdk_wrapper.sdk, dbt_profiles.data_source_id)
upload_notification(logger, sdk_wrapper, dbt_profiles.data_source_id)
elif args.method == "register_data_sources":
register_data_source(logger, args, gd_config.all_model_ids, sdk_wrapper)
else:
@@ -345,21 +354,21 @@ def process_organization(
workspace_id = f"{data_product.id}_{environment.id}"
workspace_title = f"{data_product.name} ({environment.name})"
if args.method == "provision_workspaces":
create_workspace(logger, sdk_wrapper.sdk, workspace_id, workspace_title)
create_workspace(logger, sdk_wrapper, workspace_id, workspace_title)
elif args.method == "deploy_ldm":
deploy_ldm(
logger, args, gd_config.all_model_ids, sdk_wrapper, data_product.model_ids, workspace_id
)
if data_product.localization:
create_localized_workspaces(data_product, sdk_wrapper.sdk, workspace_id)
create_localized_workspaces(data_product, sdk_wrapper.sdk_facade, workspace_id)
elif args.method == "store_analytics":
store_analytics(logger, sdk_wrapper.sdk, workspace_id, data_product)
elif args.method == "deploy_analytics":
deploy_analytics(logger, sdk_wrapper, workspace_id, data_product)
elif args.method == "test_insights":
parallelism = gd_config.global_properties.test_insights_parallelism or 1
asyncio.run(
test_insights(logger, sdk_wrapper.sdk, workspace_id, data_product.skip_tests, parallelism)
test_insights(logger, sdk_wrapper, workspace_id, data_product.skip_tests, parallelism)
)
else:
raise Exception(f"Unsupported method requested in args: {args.method}")
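
Besides routing every API call through sdk_wrapper.sdk_facade, the test_insights path above fans out one task per insight and caps concurrency with an asyncio.Semaphore. A standalone sketch of that pattern (the identifiers and the sleep are placeholders, not the real SDK calls):

# Generic concurrency sketch of the semaphore-limited fan-out used by test_insights.
import asyncio

async def test_one(insight_id: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:  # at most `parallelism` executions run at once
        await asyncio.sleep(0.1)  # stands in for executing one insight
        return {"id": insight_id, "status": "success"}

async def test_all(insight_ids, parallelism: int = 2) -> list:
    semaphore = asyncio.Semaphore(parallelism)
    tasks = [test_one(insight_id, semaphore) for insight_id in insight_ids]
    results = await asyncio.gather(*tasks)
    return [result for result in results if result["status"] == "failed"]

errors = asyncio.run(test_all(["insight_a", "insight_b", "insight_c"]))
print(errors)  # []
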
116 changes: 116 additions & 0 deletions gooddata-dbt/gooddata_dbt/gooddata/api_wrapper.py
@@ -0,0 +1,116 @@
# (C) 2023 GoodData Corporation
import logging
from pathlib import Path
from typing import Any, List, Union

from gooddata_dbt.gooddata.config import GoodDataConfigLocalizationTo, GoodDataConfigProduct

from gooddata_sdk import (
CatalogDataSourcePostgres,
CatalogDataSourceSnowflake,
CatalogDataSourceVertica,
CatalogDeclarativeAnalytics,
CatalogDeclarativeColumn,
CatalogDeclarativeModel,
CatalogDeclarativeTable,
CatalogDeclarativeTables,
CatalogScanModelRequest,
CatalogWorkspace,
GoodDataSdk,
Insight,
)

DataSource = Union[CatalogDataSourcePostgres, CatalogDataSourceSnowflake, CatalogDataSourceVertica]


class GoodDataApiWrapper:
def __init__(self, sdk: GoodDataSdk, logger: logging.Logger, dry_run: bool = False) -> None:
self.sdk = sdk
self.logger = logger
self.dry_run = dry_run

def get_insights(self, workspace_id: str) -> List[Insight]:
if self.dry_run:
self.logger.info("Dry run - skipping insights listing")
return []
else:
return self.sdk.insights.get_insights(workspace_id)

def execute_insight(self, workspace_id: str, insight: Insight) -> None:
if self.dry_run:
self.logger.info("Dry run - skipping insights execution")
else:
self.sdk.tables.for_insight(workspace_id, insight)

def scan_data_source(self, data_source_id: str, scan_request: CatalogScanModelRequest) -> CatalogDeclarativeTables:
if self.dry_run:
self.logger.info("Dry run - skipping data source scanning")
return CatalogDeclarativeTables(
tables=[
CatalogDeclarativeTable(
id="dry_run",
type="DATA_SOURCE_TABLE",
path=["table"],
columns=[CatalogDeclarativeColumn(name="dry_run", data_type="STRING")],
)
]
)
else:
return self.sdk.catalog_data_source.scan_data_source(data_source_id, scan_request, report_warnings=True).pdm

def put_declarative_ldm(self, workspace_id: str, declarative_ldm: CatalogDeclarativeModel) -> None:
if self.dry_run:
self.logger.info("Dry run - skipping declarative LDM put")
else:
self.sdk.catalog_workspace_content.put_declarative_ldm(workspace_id, declarative_ldm)

def create_workspace(self, workspace: CatalogWorkspace) -> None:
if self.dry_run:
self.logger.info("Dry run - skipping workspace creation")
else:
self.sdk.catalog_workspace.create_or_update(workspace=workspace)

def create_or_update_data_source(self, data_source: DataSource) -> None:
if self.dry_run:
self.logger.info("Dry run - skipping data source creation")
else:
self.sdk.catalog_data_source.create_or_update_data_source(data_source)

def register_upload_notification(self, data_source_id: str) -> None:
if self.dry_run:
self.logger.info("Dry run - skipping upload notification registration")
else:
self.sdk.catalog_data_source.register_upload_notification(data_source_id)

def put_declarative_analytics_model(self, workspace_id: str, adm: CatalogDeclarativeAnalytics) -> None:
if self.dry_run:
self.logger.info("Dry run - skipping declarative analytics model put")
else:
self.sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)

def generate_localized_workspaces(
self,
workspace_id: str,
to: GoodDataConfigLocalizationTo,
data_product: GoodDataConfigProduct,
translator_func: Any,
layout_path: Path,
provision_workspace: bool = True,
store_layouts: bool = False,
) -> None:
from_language = "en"
if data_product.localization is not None:
from_language = data_product.localization.from_language
if self.dry_run:
self.logger.info("Dry run - skipping localized workspaces generation")
else:
self.sdk.catalog_workspace.generate_localized_workspaces(
workspace_id,
to_lang=to.language,
to_locale=to.locale,
from_lang=from_language,
translator_func=translator_func,
layout_root_path=layout_path,
provision_workspace=provision_workspace,
store_layouts=store_layouts,
)
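
Every call site in dbt_plugin.py now funnels through this facade, so the dry-run guard lives in one place. A hedged usage sketch (host, token, and workspace values are made up; GoodDataSdk.create is assumed to be the gooddata-sdk factory, verify against your installed version):

# Usage sketch only; the endpoint and token below are placeholders.
import logging

from gooddata_sdk import CatalogWorkspace, GoodDataSdk

from gooddata_dbt.gooddata.api_wrapper import GoodDataApiWrapper

logger = logging.getLogger(__name__)
sdk = GoodDataSdk.create("https://example.gooddata.com", "<api-token>")
api = GoodDataApiWrapper(sdk, logger, dry_run=True)

# With dry_run=True each method only logs and returns a stub, so a pre-merge
# pipeline can exercise the full code path without calling GoodData APIs.
api.create_workspace(CatalogWorkspace(workspace_id="demo_ws", name="Demo"))
print(api.get_insights("demo_ws"))  # [] in dry-run mode
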
