Skip to content

Commit

Permalink
refactor: replace legacy typing with the new one
Browse files Browse the repository at this point in the history
JIRA: PSDK-203
risk: low
  • Loading branch information
hkad98 committed Aug 14, 2024
1 parent 2e80f8b commit 2b3cce4
Show file tree
Hide file tree
Showing 112 changed files with 698 additions and 718 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Getting Started

1. Ensure you have at minimum Python 3.12 installed; Python 3.10 and 3.9 are optional for multi-environment tests
1. Ensure you have at minimum Python 3.12 installed; Python 3.11, 3.10 and 3.9 are optional for multi-environment tests

This repo uses [tox](https://tox.readthedocs.io/en/latest/) and by default will try to run tests against all
supported versions. If you have only a subset of the supported Python interpreters installed, see
Expand Down
2 changes: 1 addition & 1 deletion fmt-requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ruff==0.3.5
ruff==0.5.7
6 changes: 3 additions & 3 deletions gooddata-dbt/gooddata_dbt/dbt/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# (C) 2023 GoodData Corporation
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Type, TypeVar
from typing import Any, TypeVar

import attrs
from cattrs import structure
Expand Down Expand Up @@ -71,13 +71,13 @@ class DbtTests(Enum):
@attrs.define
class Base:
@classmethod
def from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
def from_dict(cls: type[T], data: dict[str, Any]) -> T:
"""
Creates object from dictionary.
"""
return structure(data, cls)

def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
"""
Converts object into dictionary.
"""
Expand Down
28 changes: 14 additions & 14 deletions gooddata-dbt/gooddata_dbt/dbt/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import time
from pathlib import Path
from typing import Dict, List, Tuple, Union
from typing import Union

import attrs
import requests
Expand Down Expand Up @@ -74,7 +74,7 @@ def can_connect(self) -> bool:
return response.status_code == 200

@property
def bearer_token_header(self) -> Dict:
def bearer_token_header(self) -> dict:
return {"Authorization": f"Bearer {self.token}"}


Expand All @@ -84,7 +84,7 @@ class DbtExecutionInfo:
execution_time: float

@classmethod
def from_dict(cls, data: Dict) -> "DbtExecutionInfo":
def from_dict(cls, data: dict) -> "DbtExecutionInfo":
return structure(data, cls)


Expand All @@ -95,7 +95,7 @@ class DbtExecutionHistoryInfo:
status: str

@classmethod
def from_dict(cls, data: Dict) -> "DbtExecutionHistoryInfo":
def from_dict(cls, data: dict) -> "DbtExecutionHistoryInfo":
return structure(data, cls)


Expand All @@ -106,7 +106,7 @@ class DbtExecution:
execution_info: DbtExecutionInfo

@classmethod
def from_dict(cls, data: Dict) -> "DbtExecution":
def from_dict(cls, data: dict) -> "DbtExecution":
return structure(data, cls)


Expand All @@ -119,7 +119,7 @@ def __attrs_post_init__(self) -> None:
if not can_connect:
raise ValueError("Cannot connect to dbt Cloud. Please, check credentials.")

def _post_rest(self, url: str, data: Dict) -> Dict:
def _post_rest(self, url: str, data: dict) -> dict:
response = requests.post(url, headers=self.credentials.bearer_token_header, data=data)
if response.status_code != 200:
raise ValueError(
Expand All @@ -139,7 +139,7 @@ def _post_graphql(self, query: str, variables: dict) -> dict:
)
return response.json()

def run_job(self, logger: logging.Logger, job_id: str) -> Tuple[str, str]:
def run_job(self, logger: logging.Logger, job_id: str) -> tuple[str, str]:
url = f"{self.base_v2}/accounts/{self.credentials.account_id}/jobs/{job_id}/run/"
data = {"cause": "Triggered via API by gooddata-dbt plugin"}
# Allow testing from localhost where COMMIT_SHA is not set
Expand All @@ -161,13 +161,13 @@ def run_job(self, logger: logging.Logger, job_id: str) -> Tuple[str, str]:
return run_id, run_href

@staticmethod
def _was_fetch_success(response: Dict) -> bool:
def _was_fetch_success(response: dict) -> bool:
is_complete = safeget(response, ["data", "is_complete"])
is_success = safeget(response, ["data", "is_success"])
return is_complete and is_success

@staticmethod
def _is_fetch_done(response: Dict) -> bool:
def _is_fetch_done(response: dict) -> bool:
is_complete = safeget(response, ["data", "is_complete"])
is_error = safeget(response, ["data", "is_error"])
if is_error:
Expand Down Expand Up @@ -224,7 +224,7 @@ def download_artifact(self, run_id: str, identifier: str, path: Union[str, Path]
def download_manifest(self, run_id: str, path: Union[str, Path] = Path("target")) -> None:
self.download_artifact(run_id, "manifest.json", path)

def read_env_vars(self, project_id: str, job_id: str) -> Dict:
def read_env_vars(self, project_id: str, job_id: str) -> dict:
url = (
f"{self.base_v3}/accounts/{self.credentials.account_id}/projects/{project_id}"
f"/environment-variables/job/?job_definition_id={job_id}"
Expand Down Expand Up @@ -282,7 +282,7 @@ def make_profiles(
def string_camel_to_snake(element: str) -> str:
return "".join(["_" + c.lower() if c.isupper() else c for c in element]).lstrip("_")

def dict_camel_to_snake(self, data: Union[Dict, List]) -> Union[Dict, List]:
def dict_camel_to_snake(self, data: Union[dict, list]) -> Union[dict, list]:
if isinstance(data, list):
result = []
for record in data:
Expand All @@ -299,15 +299,15 @@ def dict_camel_to_snake(self, data: Union[Dict, List]) -> Union[Dict, List]:
result[self.string_camel_to_snake(key)] = value # type: ignore
return result

def get_last_execution(self, environment_id: str, model_count: int) -> List[DbtExecution]:
def get_last_execution(self, environment_id: str, model_count: int) -> list[DbtExecution]:
variables = {"environmentId": environment_id, "first": model_count}
result = self._post_graphql(self.graphql_applied_models, variables)
model_edges = self.dict_camel_to_snake(safeget(result, ["data", "environment", "applied", "models", "edges"]))
return [DbtExecution.from_dict(m["node"]) for m in model_edges]

def get_average_times(
self, logger: logging.Logger, models: List[DbtExecution], environment_id: str, history_count: int
) -> Dict[str, float]:
self, logger: logging.Logger, models: list[DbtExecution], environment_id: str, history_count: int
) -> dict[str, float]:
models_history_avg_execution_times = {}
for model in models:
variables = {"environmentId": environment_id, "modelId": model.unique_id, "first": history_count}
Expand Down
5 changes: 2 additions & 3 deletions gooddata-dbt/gooddata_dbt/dbt/environment.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# (C) 2023 GoodData Corporation
from typing import Dict

import attrs
from cattrs import structure
Expand Down Expand Up @@ -37,10 +36,10 @@ class DbtCloudEnvironment:
credentials: DbtCloudEnvironmentCredentials

@classmethod
def from_dict(cls, data: Dict) -> "DbtCloudEnvironment":
def from_dict(cls, data: dict) -> "DbtCloudEnvironment":
return structure(data, cls)

def to_profile(self, password: str, schema_name: str) -> Dict:
def to_profile(self, password: str, schema_name: str) -> dict:
# TODO: add support for other DB types, now it is hardcoded to Snowflake
return {
"title": f"{self.project.connection.name} ({self.name} dbtCloud)",
Expand Down
12 changes: 6 additions & 6 deletions gooddata-dbt/gooddata_dbt/dbt/metrics.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# (C) 2023 GoodData Corporation
import json
import re
from typing import List, Optional
from typing import Optional

import attrs
from gooddata_sdk import CatalogDeclarativeMetric, CatalogDeclarativeModel
Expand Down Expand Up @@ -49,18 +49,18 @@ class DbtModelMetric(DbtModelBase):
model: str
calculation_method: str
expression: str
filters: Optional[List[DbtModelMetricFilter]] = None
filters: Optional[list[DbtModelMetricFilter]] = None


class DbtModelMetrics:
def __init__(self, model_ids: Optional[List[str]], ldm: CatalogDeclarativeModel) -> None:
def __init__(self, model_ids: Optional[list[str]], ldm: CatalogDeclarativeModel) -> None:
self.model_ids = model_ids
self.ldm = ldm
with open(DBT_PATH_TO_MANIFEST) as fp:
self.dbt_catalog = json.load(fp)

@property
def metrics(self) -> List[DbtModelMetric]:
def metrics(self) -> list[DbtModelMetric]:
result = []
for metric_def in self.dbt_catalog["metrics"].values():
result.append(DbtModelMetric.from_dict(metric_def))
Expand Down Expand Up @@ -127,7 +127,7 @@ def resolve_entities_in_expression(self, expression: str, table_name: str) -> st
result_tokens.append(entity_id or token)
return " ".join(result_tokens)

def make_gooddata_filter(self, table_name: str, dbt_filters: Optional[List[DbtModelMetricFilter]] = None) -> str:
def make_gooddata_filter(self, table_name: str, dbt_filters: Optional[list[DbtModelMetricFilter]] = None) -> str:
# TODO - Quite naive implementation
# e.g. missing polishing of values (e.g. SQL vs MAQL enclosers)
gd_maql_filters = []
Expand All @@ -141,7 +141,7 @@ def make_gooddata_filter(self, table_name: str, dbt_filters: Optional[List[DbtMo
else:
return ""

def make_gooddata_metrics(self) -> List[CatalogDeclarativeMetric]:
def make_gooddata_metrics(self) -> list[CatalogDeclarativeMetric]:
gd_metrics = []
for dbt_metric in self.metrics:
calculation_method = DBT_TO_GD_CALC_METHODS.get(dbt_metric.calculation_method)
Expand Down
10 changes: 5 additions & 5 deletions gooddata-dbt/gooddata_dbt/dbt/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import argparse
import os
import re
from typing import Dict, List, Optional, Union
from typing import Optional, Union
from urllib.parse import quote_plus

import attrs
Expand Down Expand Up @@ -191,7 +191,7 @@ def to_gooddata(self, data_source_id: str, schema_name: str) -> CatalogDataSourc
@attrs.define(auto_attribs=True, kw_only=True)
class DbtProfile(Base):
name: str
outputs: List[DbtOutput]
outputs: list[DbtOutput]


class DbtProfiles:
Expand All @@ -207,7 +207,7 @@ def __init__(self, args: argparse.Namespace) -> None:
self.dbt_profiles = yaml.safe_load(fp)

@staticmethod
def inject_env_vars(output_def: Dict) -> None:
def inject_env_vars(output_def: dict) -> None:
env_re = re.compile(r"\{\{ env_var\('([^']+)'(,\s*'([^']+)')?\) \}\}")
for output_key, output_value in output_def.items():
if (env_match := env_re.search(str(output_value))) is not None:
Expand All @@ -222,7 +222,7 @@ def inject_env_vars(output_def: Dict) -> None:
# else do nothing, real value seems to be stored in dbt profile

@staticmethod
def to_data_class(output: str, output_def: Dict) -> Optional[DbtOutput]:
def to_data_class(output: str, output_def: dict) -> Optional[DbtOutput]:
db_type = output_def["type"]
if db_type == "postgres":
return DbtOutputPostgreSQL.from_dict({"name": output, **output_def})
Expand All @@ -241,7 +241,7 @@ def to_data_class(output: str, output_def: Dict) -> Optional[DbtOutput]:
raise Exception(f"Unsupported database type {output=} {db_type=}")

@property
def profiles(self) -> List[DbtProfile]:
def profiles(self) -> list[DbtProfile]:
profiles = []
for profile, profile_def in self.dbt_profiles.items():
outputs = []
Expand Down
Loading

0 comments on commit 2b3cce4

Please sign in to comment.