Skip to content

Commit

Permalink
#2020 #2151 - fixes dbt 1.8.6 and arrow dict types (#2175)
Browse files Browse the repository at this point in the history
* fix links (#1977)

* correctly converts dict arrow types into dlt types

* drops dbt compat code for versions below 1.5

* ignores encoding errors when reading from process pipe

---------

Co-authored-by: adrianbr <[email protected]>
  • Loading branch information
rudolfix and adrianbr authored Dec 23, 2024
1 parent fb17e99 commit 5afa6ce
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 32 deletions.
4 changes: 4 additions & 0 deletions dlt/common/libs/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
return dict(data_type="decimal", precision=dtype.precision, scale=dtype.scale)
elif pyarrow.types.is_nested(dtype):
return dict(data_type="json")
elif pyarrow.types.is_dictionary(dtype):
# Dictionary types are essentially categorical encodings. The underlying value_type
# dictates the "logical" type. We simply delegate to the underlying value_type.
return get_column_type_from_py_arrow(dtype.value_type)
else:
raise ValueError(dtype)

Expand Down
16 changes: 14 additions & 2 deletions dlt/common/runners/stdout.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ def iter_std(
Use -u in scripts_args for unbuffered python execution
"""
with venv.start_command(
command, *script_args, stdout=PIPE, stderr=PIPE, bufsize=1, text=True
command,
*script_args,
stdout=PIPE,
stderr=PIPE,
bufsize=1,
text=True,
errors="backslashreplace",
) as process:
exit_code: int = None
q_: queue.Queue[Tuple[OutputStdStreamNo, str]] = queue.Queue()
Expand Down Expand Up @@ -72,7 +78,13 @@ def _r_q(std_: OutputStdStreamNo) -> None:
def iter_stdout(venv: Venv, command: str, *script_args: Any) -> Iterator[str]:
# start a process in virtual environment, assume that text comes from stdout
with venv.start_command(
command, *script_args, stdout=PIPE, stderr=PIPE, bufsize=1, text=True
command,
*script_args,
stdout=PIPE,
stderr=PIPE,
bufsize=1,
text=True,
errors="backslashreplace",
) as process:
exit_code: int = None
line = ""
Expand Down
16 changes: 12 additions & 4 deletions dlt/common/runners/venv.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,18 @@ def run_command(self, entry_point: str, *script_args: Any) -> str:
# runs one of installed entry points typically CLIs coming with packages and installed into PATH
command = os.path.join(self.context.bin_path, entry_point)
cmd = [command, *script_args]
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
return subprocess.check_output(
cmd, stderr=subprocess.STDOUT, text=True, errors="backslashreplace"
)

def run_script(self, script_path: str, *script_args: Any) -> str:
"""Runs a python `script` source with specified `script_args`. Current `os.environ` and cwd is passed to executed process"""
# os.environ is passed to executed process
cmd = [self.context.env_exe, os.path.abspath(script_path), *script_args]
try:
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
return subprocess.check_output(
cmd, stderr=subprocess.STDOUT, text=True, errors="backslashreplace"
)
except subprocess.CalledProcessError as cpe:
if cpe.returncode == 2:
raise FileNotFoundError(script_path)
Expand All @@ -115,7 +119,9 @@ def run_script(self, script_path: str, *script_args: Any) -> str:
def run_module(self, module: str, *module_args: Any) -> str:
"""Runs a python `module` with specified `module_args`. Current `os.environ` and cwd is passed to executed process"""
cmd = [self.context.env_exe, "-m", module, *module_args]
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
return subprocess.check_output(
cmd, stderr=subprocess.STDOUT, text=True, errors="backslashreplace"
)

def add_dependencies(self, dependencies: List[str] = None) -> None:
Venv._install_deps(self.context, dependencies)
Expand All @@ -134,7 +140,9 @@ def _install_deps(context: types.SimpleNamespace, dependencies: List[str]) -> No
cmd = [context.env_exe, "-Im", Venv.PIP_TOOL, "install"]

try:
subprocess.check_output(cmd + dependencies, stderr=subprocess.STDOUT)
subprocess.check_output(
cmd + dependencies, stderr=subprocess.STDOUT, errors="backslashreplace"
)
except subprocess.CalledProcessError as exc:
raise CannotInstallDependencies(dependencies, context.env_exe, exc.output)

Expand Down
32 changes: 10 additions & 22 deletions dlt/helpers/dbt/dbt_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,16 @@

# can only import DBT after redirect is disabled
# https://stackoverflow.com/questions/48619517/call-a-click-command-from-code
except ImportError:
pass

try:
import dbt.logger
from dbt.contracts import results as dbt_results
except ModuleNotFoundError:
raise MissingDependencyException("DBT Core", ["dbt-core"])

try:
# dbt <1.5
from dbt.main import handle_and_check # type: ignore[import-not-found]
except ImportError:
# dbt >=1.5
from dbt.cli.main import dbtRunner

try:
from dbt.exceptions import FailFastException # type: ignore
from dbt.exceptions import FailFastError
except ImportError:
from dbt.exceptions import FailFastError as FailFastException
raise MissingDependencyException("DBT Core", ["dbt-core"])

_DBT_LOGGER_INITIALIZED = False

Expand Down Expand Up @@ -135,15 +128,10 @@ def run_dbt_command(
runner_args = (global_args or []) + [command] + args # type: ignore

with dbt.logger.log_manager.applicationbound():
try:
# dbt 1.5
runner = dbtRunner()
run_result = runner.invoke(runner_args)
success = run_result.success
results = run_result.result # type: ignore
except NameError:
# dbt < 1.5
results, success = handle_and_check(runner_args)
runner = dbtRunner()
run_result = runner.invoke(runner_args)
success = run_result.success
results = run_result.result # type: ignore

assert type(success) is bool
parsed_results = parse_dbt_execution_results(results)
Expand All @@ -157,7 +145,7 @@ def run_dbt_command(
except SystemExit as sys_ex:
# oftentimes dbt tries to exit on error
raise DBTProcessingError(command, None, sys_ex)
except FailFastException as ff:
except FailFastError as ff:
dbt_exc = DBTProcessingError(command, parse_dbt_execution_results(ff.result), ff.result)
# detect incremental model out of sync
if is_incremental_schema_out_of_sync_error(ff.result):
Expand Down
4 changes: 2 additions & 2 deletions docs/website/docs/general-usage/credentials/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ keywords: [credentials, secrets.toml, secrets, config, configuration, environmen
---
import DocCardList from '@theme/DocCardList';

`dlt` pipelines usually require configurations and credentials. These can be set up in [various ways](setup):
`dlt` pipelines usually require configurations and credentials. These can be set up in [various ways](./setup):

1. Environment variables
2. Configuration files (`secrets.toml` and `config.toml`)
3. Key managers and vaults

`dlt` automatically extracts configuration settings and secrets based on flexible [naming conventions](setup/#naming-convention). It then [injects](advanced/#injection-mechanism) these values where needed in code.
`dlt` automatically extracts configuration settings and secrets based on flexible [naming conventions](./setup/#naming-convention). It then [injects](./advanced/#injection-mechanism) these values where needed in code.

# Learn details about

Expand Down
4 changes: 2 additions & 2 deletions tests/helpers/dbt_tests/test_runner_dbt_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ def client() -> Iterator[PostgresClient]:
PACKAGE_PARAMS = [
("postgres", "1.5.2"),
("postgres", "1.6.13"),
("postgres", "1.8.1"),
("postgres", "1.8.6"),
("postgres", None),
("snowflake", "1.5.2"),
("snowflake", "1.6.13"),
("snowflake", "1.8.1"),
("snowflake", "1.8.6"),
("snowflake", None),
]
PACKAGE_IDS = [
Expand Down
12 changes: 12 additions & 0 deletions tests/libs/pyarrow/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ def test_py_arrow_to_table_schema_columns():
assert result == dlt_schema


def test_py_arrow_dict_to_column() -> None:
array_1 = pa.array(["a", "b", "c"], type=pa.dictionary(pa.int8(), pa.string()))
array_2 = pa.array([1, 2, 3], type=pa.dictionary(pa.int8(), pa.int64()))
table = pa.table({"strings": array_1, "ints": array_2})
columns = py_arrow_to_table_schema_columns(table.schema)
assert columns == {
"strings": {"name": "strings", "nullable": True, "data_type": "text"},
"ints": {"name": "ints", "nullable": True, "data_type": "bigint"},
}
assert table.to_pydict() == {"strings": ["a", "b", "c"], "ints": [1, 2, 3]}


def test_to_arrow_scalar() -> None:
naive_dt = get_py_arrow_timestamp(6, tz=None)
# print(naive_dt)
Expand Down

0 comments on commit 5afa6ce

Please sign in to comment.