TRIVIAL: gooddata-dbt - better reporting to stdout and pipeline logs #375

Merged · 2 commits · Oct 4, 2023
121 changes: 78 additions & 43 deletions gooddata-dbt/gooddata_dbt/dbt_plugin.py
@@ -7,9 +7,10 @@
from time import time
from typing import List, Optional

+import tabulate
import yaml
from gooddata_dbt.args import parse_arguments
-from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials
+from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials, DbtExecution
from gooddata_dbt.dbt.profiles import DbtOutput, DbtProfiles
from gooddata_dbt.dbt.tables import DbtModelTables
from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigProduct
@@ -82,13 +83,16 @@ def create_workspace(logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str
def deploy_ldm(
    logger: logging.Logger,
    sdk: GoodDataSdk,
+    args: Namespace,
    data_source_id: str,
    dbt_tables: DbtModelTables,
    model_ids: Optional[List[str]],
    workspace_id: str,
) -> None:
    logger.info("Generate and put LDM")
    generate_and_put_ldm(sdk, data_source_id, workspace_id, dbt_tables, model_ids)
+    workspace_url = f"{args.gooddata_host}/modeler/#/{workspace_id}"
+    logger.info(f"LDM successfully loaded, verify here: {workspace_url}")


def upload_notification(logger: logging.Logger, sdk: GoodDataSdk, data_source_id: str) -> None:
@@ -97,7 +101,7 @@ def upload_notification(logger: logging.Logger, sdk: GoodDataSdk, data_source_id


def deploy_analytics(
-    logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str, data_product: GoodDataConfigProduct
+    logger: logging.Logger, sdk: GoodDataSdk, args: Namespace, workspace_id: str, data_product: GoodDataConfigProduct
) -> None:
    logger.info(f"Deploy analytics {workspace_id=}")

@@ -108,6 +112,9 @@
    logger.info("Load analytics model into GoodData")
    sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)

+    workspace_url = f"{args.gooddata_host}/dashboards/#/workspace/{workspace_id}"
+    logger.info(f"Analytics successfully loaded, verify here: {workspace_url}")
+

def store_analytics(
    logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str, data_product: GoodDataConfigProduct
@@ -166,6 +173,7 @@ def deploy_models(
    dbt_target: DbtOutput,
    gd_config: GoodDataConfig,
    sdk: GoodDataSdk,
+    args: Namespace,
    data_source_id: str,
) -> None:
    dbt_tables = DbtModelTables.from_local(gooddata_upper_case, gd_config.all_model_ids)
@@ -181,11 +189,69 @@
        workspace_id = f"{data_product.id}_{environment.id}"
        workspace_title = f"{data_product.name} ({environment.name})"
        create_workspace(logger, sdk, workspace_id, workspace_title)
-        deploy_ldm(logger, sdk, data_source_id, dbt_tables, data_product.model_ids, workspace_id)
+        deploy_ldm(logger, sdk, args, data_source_id, dbt_tables, data_product.model_ids, workspace_id)
        if data_product.localization:
            create_localized_workspaces(data_product, sdk, workspace_id)


+def get_table(data: List[list], headers: List[str], fmt: str) -> str:
+    return tabulate.tabulate(data, headers=headers, tablefmt=fmt)


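Reviewer note: a minimal sketch (not part of this diff) of what the new get_table helper renders; the row mirrors the columns built in dbt_cloud_stats_degradations below, and the values are invented.

```python
# Hypothetical usage of the new helper; model name and numbers are made up.
import tabulate

headers = ["Model ID", "Last duration(s)", "Average duration(s)", "Difference(%)", "Degradation"]
rows = [["model.jaffle_shop.orders", 12.34, 10.01, 23.28, "True"]]

# The plugin passes "outline" for stdout/pipeline logs; "github" (shown here)
# emits Markdown that renders as a table in a merge-request comment.
print(tabulate.tabulate(rows, headers=headers, tablefmt="github"))
```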
+def dbt_cloud_stats_degradations(
+    args: Namespace,
+    logger: logging.Logger,
+    environment_id: str,
+    model_executions: List[DbtExecution],
+    dbt_conn: DbtConnection,
+) -> None:
+    logger.info("Get stats for historical executions...")
+    models_history_avg_execution_times = dbt_conn.get_average_times(logger, model_executions, environment_id, 5)
+    differences = []
+    degradations = 0
+    for execution in model_executions:
+        for model_id, avg_time in models_history_avg_execution_times.items():
+            last_execution = execution.execution_info.execution_time
+            difference = float(last_execution - avg_time) / avg_time * 100
+            degradation = difference > args.allowed_degradation
+            if model_id == execution.unique_id:
+                differences.append(
+                    [
+                        model_id,
+                        round(last_execution, 2),
+                        round(avg_time, 2),
+                        round(difference, 2),
+                        str(degradation),
+                    ]
+                )
+                if degradation:
+                    degradations += 1
+    headers = ["Model ID", "Last duration(s)", "Average duration(s)", "Difference(%)", "Degradation"]
+    pretty_table = get_table(differences, headers, "outline")
+    if degradations > 0:
+        logger.warning(
+            "Stats for historical executions contain degradations! "
+            + f"Threshold={args.allowed_degradation}%\n{pretty_table}"
+        )
+    else:
+        logger.info(f"Stats for historical executions:\n{pretty_table}")

+    gitlab_token = os.getenv("GITLAB_TOKEN")
+    if os.getenv("CI_MERGE_REQUEST_IID") and gitlab_token:
+        # Running in a GitLab CI pipeline; report execution performance to the merge request to notify the code reviewer
+        git_table = get_table(differences, headers, "github")
+        logger.info("Sending report to related Gitlab merge request as comment")
+        if degradations > 0:
+            message = (
+                "WARNING: some executions of dbt models in this merge request are slower than average! "
+                + f"Threshold={args.allowed_degradation}%\n\n{git_table}"
+            )
+            report_message_to_merge_request(gitlab_token, message)
+        else:
+            message = f"INFO: performance of all executions of dbt models is OK!\n\n{git_table}"
+            report_message_to_merge_request(gitlab_token, message)
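Reviewer note: the degradation flag reduces to a relative slowdown in percent checked against the --allowed-degradation threshold; a worked sketch with invented numbers (the 5-run averaging window comes from the get_average_times call above):

```python
# Invented numbers for illustration only.
last_execution = 12.0  # seconds, most recent run of one model
avg_time = 8.0         # seconds, average over the last 5 runs
difference = (last_execution - avg_time) / avg_time * 100  # -> 50.0 (%)
allowed_degradation = 20.0  # threshold from args.allowed_degradation
degradation = difference > allowed_degradation  # True -> flagged and counted
```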


def dbt_cloud_stats(
    args: Namespace,
    logger: logging.Logger,
@@ -196,47 +262,15 @@
    dbt_conn = DbtConnection(credentials=DbtCredentials(account_id=args.account_id, token=args.token))
    dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)
    model_executions = dbt_conn.get_last_execution(environment_id, len(dbt_tables.tables))
+    exec_list = []
    for execution in model_executions:
        logger.info(f"Model {execution.unique_id} finished in {execution.execution_info.execution_time:.2f}s")
+        exec_list.append([execution.unique_id, round(execution.execution_info.execution_time, 2)])

-    logger.info("Get stats for historical executions...")
-    models_history_avg_execution_times = dbt_conn.get_average_times(logger, model_executions, environment_id, 5)
-    degradations = {}
-    for execution in model_executions:
-        for model_id, avg_time in models_history_avg_execution_times.items():
-            degradation = float(execution.execution_info.execution_time - avg_time) / avg_time * 100
-            if model_id == execution.unique_id and degradation > args.allowed_degradation:
-                degradations[model_id] = {
-                    "last_time": execution.execution_info.execution_time,
-                    "avg_time": avg_time,
-                    "degradation": degradation,
-                }
-    degradation_md = (
-        "**WARNING:** The performance of some dbt cloud jobs degraded "
-        + f"by more than {args.allowed_degradation}%\n\n"
-        + "| Model | Last duration(s) | Average duration(s) | Degradation |\n"
-        + "|-------|------------------|---------------------|-------------|"
-    )
-    if len(degradations) > 0:
-        for model_id, data in degradations.items():
-            model_degradation_text = (
-                f"Model {model_id} - performance degraded: "
-                + f"last_time={data['last_time']:.2f} avg_time={data['avg_time']:.2f} "
-                + f"degradation={data['degradation']:.2f}%"
-            )
-            logger.warning(model_degradation_text)
-            model_degradation_md = "| {model_id} | {last_time} | {avg_time} | {degradation}% |".format(
-                model_id=model_id,
-                last_time=f"{data['last_time']:.2f}",
-                avg_time=f"{data['avg_time']:.2f}",
-                degradation=f"{data['degradation']:.2f}",
-            )
-            degradation_md += f"\n{model_degradation_md}"
-        gitlab_token = os.getenv("GITLAB_TOKEN")
-        if os.getenv("CI_MERGE_REQUEST_IID") and gitlab_token:
-            logger.info("Sending report of degradations to related Gitlab merge request as comment")
-            # Running in Gitlab CI pipeline, report degradations to the merge request to notify code reviewer
-            report_message_to_merge_request(gitlab_token, degradation_md)
+    headers = ["Model ID", "Duration(s)"]
+    pretty_table = get_table(exec_list, headers, "outline")
+    logger.info(f"Stats for last execution:\n{pretty_table}")

+    dbt_cloud_stats_degradations(args, logger, environment_id, model_executions, dbt_conn)


def dbt_cloud_run(args: Namespace, logger: logging.Logger, all_model_ids: List[str]) -> None:
@@ -288,6 +322,7 @@ def main() -> None:
            dbt_target,
            gd_config,
            sdk,
+            args,
            data_source_id,
        )
    else:
@@ -300,7 +335,7 @@
        if args.method == "store_analytics":
            store_analytics(logger, sdk, workspace_id, data_product)
        elif args.method == "deploy_analytics":
-            deploy_analytics(logger, sdk, workspace_id, data_product)
+            deploy_analytics(logger, sdk, args, workspace_id, data_product)
        elif args.method == "test_insights":
            test_insights(logger, sdk, workspace_id)
        else:
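Reviewer note: both reporting paths gate the merge-request comment on the same two environment variables; a minimal standalone sketch, assuming GitLab CI (CI_MERGE_REQUEST_IID is predefined by GitLab in merge-request pipelines, GITLAB_TOKEN is a CI variable configured for the project):

```python
import os

# Set by GitLab only in merge-request pipelines; absent in branch/tag pipelines.
in_merge_request = bool(os.getenv("CI_MERGE_REQUEST_IID"))
# Project/CI variable that must be configured with API access to comment on MRs.
gitlab_token = os.getenv("GITLAB_TOKEN")

if in_merge_request and gitlab_token:
    pass  # report_message_to_merge_request(gitlab_token, message) runs only here
```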
3 changes: 3 additions & 0 deletions gooddata-dbt/mypy.ini
@@ -20,3 +20,6 @@ ignore_missing_imports = True

[mypy-requests.*]
ignore_missing_imports = True
+
+[mypy-tabulate.*]
+ignore_missing_imports = True
1 change: 1 addition & 0 deletions gooddata-dbt/requirements.txt
@@ -2,3 +2,4 @@ pyyaml>=5.1
attrs==21.4.0
cattrs==22.1.0
requests~=2.31.0
+tabulate~=0.8.10
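Reviewer note: ~= is a PEP 440 compatible-release pin, so tabulate~=0.8.10 allows 0.8.11 but not 0.9.0; a quick check using the packaging library (not a project dependency, shown only for illustration):

```python
from packaging.specifiers import SpecifierSet

spec = SpecifierSet("~=0.8.10")  # equivalent to >=0.8.10, ==0.8.*
assert "0.8.11" in spec
assert "0.9.0" not in spec
```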
9 changes: 8 additions & 1 deletion gooddata-dbt/setup.py
@@ -1,7 +1,14 @@
# (C) 2023 GoodData Corporation
from setuptools import find_packages, setup

-REQUIRES = ["gooddata-sdk~=1.7.0", "pyyaml>=5.1", "attrs==21.4.0", "cattrs==22.1.0", "requests~=2.31.0"]
+REQUIRES = [
+    "gooddata-sdk~=1.7.0",
+    "pyyaml>=5.1",
+    "attrs==21.4.0",
+    "cattrs==22.1.0",
+    "requests~=2.31.0",
+    "tabulate~=0.8.10",
+]

setup(
    name="gooddata-dbt",