Skip to content

Commit

Permalink
TRIVIAL: gooddata-dbt - better report stats from db cloud
Browse files Browse the repository at this point in the history
pretty tables
report always, just change the prefix based on if there are any degradations
  • Loading branch information
jaceksan committed Oct 4, 2023
1 parent 4a5c236 commit 2cb17e9
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 42 deletions.
113 changes: 71 additions & 42 deletions gooddata-dbt/gooddata_dbt/dbt_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from time import time
from typing import List, Optional

import tabulate
import yaml
from gooddata_dbt.args import parse_arguments
from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials
from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials, DbtExecution
from gooddata_dbt.dbt.profiles import DbtOutput, DbtProfiles
from gooddata_dbt.dbt.tables import DbtModelTables
from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigProduct
Expand Down Expand Up @@ -172,6 +173,7 @@ def deploy_models(
dbt_target: DbtOutput,
gd_config: GoodDataConfig,
sdk: GoodDataSdk,
args: Namespace,
data_source_id: str,
) -> None:
dbt_tables = DbtModelTables.from_local(gooddata_upper_case, gd_config.all_model_ids)
Expand All @@ -187,11 +189,69 @@ def deploy_models(
workspace_id = f"{data_product.id}_{environment.id}"
workspace_title = f"{data_product.name} ({environment.name})"
create_workspace(logger, sdk, workspace_id, workspace_title)
deploy_ldm(logger, sdk, data_source_id, dbt_tables, data_product.model_ids, workspace_id)
deploy_ldm(logger, sdk, args, data_source_id, dbt_tables, data_product.model_ids, workspace_id)
if data_product.localization:
create_localized_workspaces(data_product, sdk, workspace_id)


def get_table(data: list[list], headers: list[str], fmt: str) -> str:
return tabulate.tabulate(data, headers=headers, tablefmt=fmt)


def dbt_cloud_stats_degradations(
args: Namespace,
logger: logging.Logger,
environment_id: str,
model_executions: List[DbtExecution],
dbt_conn: DbtConnection,
):
logger.info("Get stats for historical executions...")
models_history_avg_execution_times = dbt_conn.get_average_times(logger, model_executions, environment_id, 5)
differences = []
degradations = 0
for execution in model_executions:
for model_id, avg_time in models_history_avg_execution_times.items():
last_execution = execution.execution_info.execution_time
difference = float(last_execution - avg_time) / avg_time * 100
degradation = difference > args.allowed_degradation
if model_id == execution.unique_id:
differences.append(
[
model_id,
round(last_execution, 2),
round(avg_time, 2),
round(difference, 2),
str(degradation),
]
)
if degradation:
degradations += 1
headers = ["Model ID", "Last duration(s)", "Average duration(s)", "Difference(%)", "Degradation"]
pretty_table = get_table(differences, headers, "outline")
if degradations > 0:
logger.warning(
"Stats for historical executions contain degradations! "
+ f"Threshold={args.allowed_degradation}%\n{pretty_table}"
)
else:
logger.info(f"Stats for historical executions:\n{pretty_table}")

gitlab_token = os.getenv("GITLAB_TOKEN")
if os.getenv("CI_MERGE_REQUEST_IID") and gitlab_token:
# Running in Gitlab CI pipeline, report performance of executions to the merge request to notify code reviewer
git_table = get_table(differences, headers, "github")
logger.info("Sending report to related Gitlab merge request as comment")
if degradations > 0:
message = (
"WARNING: some executions of dbt models in this merge request are slower than average!"
+ f"Threshold={args.allowed_degradation}%\n\n{git_table}"
)
report_message_to_merge_request(gitlab_token, message)
else:
message = f"INFO: performance of all executions of dbt models is OK!\n\n{git_table}"
report_message_to_merge_request(gitlab_token, message)


def dbt_cloud_stats(
args: Namespace,
logger: logging.Logger,
Expand All @@ -202,47 +262,15 @@ def dbt_cloud_stats(
dbt_conn = DbtConnection(credentials=DbtCredentials(account_id=args.account_id, token=args.token))
dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)
model_executions = dbt_conn.get_last_execution(environment_id, len(dbt_tables.tables))
exec_list = []
for execution in model_executions:
logger.info(f"Model {execution.unique_id} finished in {execution.execution_info.execution_time:.2f}s")
exec_list.append([execution.unique_id, round(execution.execution_info.execution_time, 2)])

logger.info("Get stats for historical executions...")
models_history_avg_execution_times = dbt_conn.get_average_times(logger, model_executions, environment_id, 5)
degradations = {}
for execution in model_executions:
for model_id, avg_time in models_history_avg_execution_times.items():
degradation = float(execution.execution_info.execution_time - avg_time) / avg_time * 100
if model_id == execution.unique_id and degradation > args.allowed_degradation:
degradations[model_id] = {
"last_time": execution.execution_info.execution_time,
"avg_time": avg_time,
"degradation": degradation,
}
degradation_md = (
"**WARNING:** The performance of some dbt cloud jobs degraded "
+ f"by more than {args.allowed_degradation}%\n\n"
+ "| Model | Last duration(s) | Average duration(s) | Degradation |\n"
+ "|-------|------------------|---------------------|-------------|"
)
if len(degradations) > 0:
for model_id, data in degradations.items():
model_degradation_text = (
f"Model {model_id} - performance degraded: "
+ f"last_time={data['last_time']:.2f} avg_time={data['avg_time']:.2f} "
+ f"degradation={data['degradation']:.2f}%"
)
logger.warning(model_degradation_text)
model_degradation_md = "| {model_id} | {last_time} | {avg_time} | {degradation}% |".format(
model_id=model_id,
last_time=f"{data['last_time']:.2f}",
avg_time=f"{data['avg_time']:.2f}",
degradation=f"{data['degradation']:.2f}",
)
degradation_md += f"\n{model_degradation_md}"
gitlab_token = os.getenv("GITLAB_TOKEN")
if os.getenv("CI_MERGE_REQUEST_IID") and gitlab_token:
logger.info("Sending report of degradations to related Gitlab merge request as comment")
# Running in Gitlab CI pipeline, report degradations to the merge request to notify code reviewer
report_message_to_merge_request(gitlab_token, degradation_md)
headers = ["Model ID", "Duration(s)"]
pretty_table = get_table(exec_list, headers, "outline")
logger.info(f"Stats for last execution:\n{pretty_table}")

dbt_cloud_stats_degradations(args, logger, environment_id, model_executions, dbt_conn)


def dbt_cloud_run(args: Namespace, logger: logging.Logger, all_model_ids: List[str]) -> None:
Expand Down Expand Up @@ -294,6 +322,7 @@ def main() -> None:
dbt_target,
gd_config,
sdk,
args,
data_source_id,
)
else:
Expand All @@ -306,7 +335,7 @@ def main() -> None:
if args.method == "store_analytics":
store_analytics(logger, sdk, workspace_id, data_product)
elif args.method == "deploy_analytics":
deploy_analytics(logger, sdk, workspace_id, data_product)
deploy_analytics(logger, sdk, args, workspace_id, data_product)
elif args.method == "test_insights":
test_insights(logger, sdk, workspace_id)
else:
Expand Down
1 change: 1 addition & 0 deletions gooddata-dbt/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pyyaml>=5.1
attrs==21.4.0
cattrs==22.1.0
requests~=2.31.0
tabulate~=0.8.10

0 comments on commit 2cb17e9

Please sign in to comment.