1272 Support ClickHouse GCS S3 compatibility mode in filesystem destination #1423

Merged
28 changes: 15 additions & 13 deletions dlt/destinations/impl/clickhouse/clickhouse.py
```diff
@@ -1,6 +1,7 @@
 import os
 import re
 from copy import deepcopy
+from textwrap import dedent
 from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple
 from urllib.parse import urlparse
```
```diff
@@ -201,22 +202,23 @@ def __init__(
         compression = "none" if config.get("data_writer.disable_compression") else "gz"
 
         if bucket_scheme in ("s3", "gs", "gcs"):
-            # get auth and bucket url
-            bucket_http_url = convert_storage_to_http_scheme(bucket_url)
-            access_key_id: str = None
-            secret_access_key: str = None
             if isinstance(staging_credentials, AwsCredentialsWithoutDefaults):
+                bucket_http_url = convert_storage_to_http_scheme(
+                    bucket_url, endpoint=staging_credentials.endpoint_url
+                )
                 access_key_id = staging_credentials.aws_access_key_id
                 secret_access_key = staging_credentials.aws_secret_access_key
-            elif isinstance(staging_credentials, GcpCredentials):
-                access_key_id = client.credentials.gcp_access_key_id
-                secret_access_key = client.credentials.gcp_secret_access_key
-                if not access_key_id or not secret_access_key:
-                    raise DestinationTransientException(
-                        "You have tried loading from gcs with clickhouse. Please provide valid"
-                        " 'gcp_access_key_id' and 'gcp_secret_access_key' to connect to gcs as"
-                        " outlined in the dlthub docs."
-                    )
+            else:
+                raise LoadJobTerminalException(
+                    file_path,
+                    dedent(
+                        """
+                        Google Cloud Storage buckets must be configured using the S3 compatible access pattern.
+                        Please provide the necessary S3 credentials (access key ID and secret access key), to access the GCS bucket through the S3 API.
+                        Refer to https://dlthub.com/docs/dlt-ecosystem/destinations/filesystem#using-s3-compatible-storage.
+                        """,
+                    ).strip(),
+                )
 
             auth = "NOSIGN"
             if access_key_id and secret_access_key:
```
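With this change, the s3/gs/gcs staging path accepts only AWS-style credentials: GCS must be reached through HMAC keys presented as S3 credentials, and any other credential type terminates the load job. A minimal sketch of the accepted shape (hypothetical key values; constructor usage assumed from dlt's configspec classes):

```py
from dlt.common.configuration.specs import AwsCredentialsWithoutDefaults

# GCS HMAC keys presented as AWS-style credentials. Anything that is not an
# AwsCredentialsWithoutDefaults instance now raises LoadJobTerminalException
# instead of falling through to the removed GcpCredentials branch.
staging_credentials = AwsCredentialsWithoutDefaults(
    aws_access_key_id="GOOG1EXAMPLEHMACKEY",    # hypothetical HMAC access key
    aws_secret_access_key="examplehmacsecret",  # hypothetical HMAC secret
    endpoint_url="https://storage.googleapis.com",
)
assert isinstance(staging_credentials, AwsCredentialsWithoutDefaults)
```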
3 changes: 1 addition & 2 deletions dlt/destinations/impl/clickhouse/utils.py
```diff
@@ -25,11 +25,10 @@ def convert_storage_to_http_scheme(
     protocol = "https" if use_https else "http"
 
     if endpoint:
-        domain = endpoint
+        domain = endpoint.replace("https://", "").replace("http://", "")
     elif region and parsed_url.scheme == "s3":
         domain = f"s3-{region}.amazonaws.com"
     else:
-        # TODO: Incorporate dlt.config endpoint.
         storage_domains = {
             "s3": "s3.amazonaws.com",
             "gs": "storage.googleapis.com",
```
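The endpoint fix matters because users configure `endpoint_url` with a scheme (e.g. `https://storage.googleapis.com`), and reusing it verbatim as the domain would yield URLs like `https://bucket.https://storage.googleapis.com`. A rough sketch of the resulting behavior (simplified and assumed from the hunk above, not the full implementation):

```py
from urllib.parse import urlparse

def to_http_url(bucket_url: str, endpoint: str = None, use_https: bool = True) -> str:
    # Simplified stand-in for convert_storage_to_http_scheme, assumed from
    # the diff above; the real helper also handles regions and more schemes.
    parsed = urlparse(bucket_url)
    protocol = "https" if use_https else "http"
    if endpoint:
        # Strip any scheme so the endpoint can be used as a bare domain.
        domain = endpoint.replace("https://", "").replace("http://", "")
    else:
        domain = {"s3": "s3.amazonaws.com", "gs": "storage.googleapis.com"}[parsed.scheme]
    return f"{protocol}://{parsed.netloc}.{domain}{parsed.path}"

# e.g. to_http_url("gs://my_bucket/data.parquet", endpoint="https://storage.googleapis.com")
# -> "https://my_bucket.storage.googleapis.com/data.parquet"
```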
59 changes: 26 additions & 33 deletions docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
@@ -115,12 +115,14 @@ destination.

The `clickhouse` destination has a few specific deviations from the default sql destinations:

1. `Clickhouse` has an experimental `object` datatype, but we have found it to be a bit unpredictable, so the dlt clickhouse destination will load the complex datatype to a `text` column. If you need this feature, get in touch with our Slack community, and we will consider adding it.
2. `Clickhouse` does not support the `time` datatype. Time will be loaded to a `text` column.
3. `Clickhouse` does not support the `binary` datatype. Binary will be loaded to a `text` column. When loading from `jsonl`, this will be a base64 string; when loading from parquet, this will be the `binary` object converted to `text`.
4. `Clickhouse` accepts adding columns that are not null to a populated table.
5. `Clickhouse` can produce rounding errors under certain conditions when using the float / double datatype. Make sure to use decimal if you cannot afford to have rounding errors. For example, loading the value 12.7001 into a double column with the loader file format set to jsonl will predictably produce a rounding error. See the sketch after this list.
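For cases like the last deviation, a column-level hint can pin the type to decimal. A minimal sketch (hypothetical resource and column names):

```py
import dlt

@dlt.resource(columns={"amount": {"data_type": "decimal"}})
def payments():
    # Without the hint, 12.7001 travels as a double on the jsonl path and
    # may pick up a rounding error when loaded into ClickHouse.
    yield {"id": 1, "amount": 12.7001}
```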

## Supported column hints

@@ -173,51 +175,42 @@ pipeline = dlt.pipeline(
)
```

### Using Google Cloud or S3-Compatible Storage as a Staging Area

dlt supports using S3-compatible storage services, including Google Cloud Storage (GCS), as a staging area when loading data into ClickHouse. This is handled automatically by ClickHouse's [GCS table function](https://clickhouse.com/docs/en/sql-reference/table-functions/gcs), which dlt uses under the hood.

The ClickHouse GCS table function only supports authentication using Hash-based Message Authentication Code (HMAC) keys, which is compatible with the Amazon S3 API. To enable this, GCS provides an S3 compatibility mode that emulates the S3 API, allowing ClickHouse to access GCS buckets via its S3 integration.

For detailed instructions on setting up S3-compatible storage with dlt, including AWS S3, MinIO, and Cloudflare R2, refer to
the [dlt documentation on filesystem destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/filesystem#using-s3-compatible-storage).

To set up GCS staging with HMAC authentication in dlt:

1. Create HMAC keys for your GCS service account by following the [Google Cloud guide](https://cloud.google.com/storage/docs/authentication/managing-hmackeys#create).

2. Configure the HMAC keys (`aws_access_key_id` and `aws_secret_access_key`) in your dlt project's ClickHouse destination settings in `config.toml`, similar to how you would configure AWS S3 credentials:

```toml
[destination.filesystem]
bucket_url = "gs://dlt-ci"
bucket_url = "s3://my_awesome_bucket"

[destination.filesystem.credentials]
project_id = "a-cool-project"
client_email = "[email protected]"
private_key = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkaslkdjflasjnkdcopauihj...wEiEx7y+mx\nNffxQBqVVej2n/D93xY99pM=\n-----END PRIVATE KEY-----\n"

[destination.clickhouse.credentials]
database = "dlt"
username = "dlt"
password = "Dlt*12345789234567"
host = "localhost"
port = 9440
secure = 1
gcp_access_key_id = "JFJ$$*f2058024835jFffsadf"
gcp_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi"
aws_access_key_id = "JFJ$$*f2058024835jFffsadf"
aws_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi"
project_id = "my-awesome-project"
endpoint_url = "https://storage.googleapis.com"
```


dlt will pass these credentials to ClickHouse, which will handle the authentication and GCS access.

:::caution
When configuring the `bucket_url` for S3-compatible storage services like Google Cloud Storage (GCS) with ClickHouse in dlt, ensure that the URL is prepended with `s3://` instead of `gs://`. This is
because the ClickHouse GCS table function requires the use of HMAC credentials, which are compatible with the S3 API. Prepending with `s3://` allows the HMAC credentials to integrate properly with
dlt's staging mechanisms for ClickHouse.
:::
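Putting it together, a minimal pipeline staged through a GCS bucket addressed via the `s3://` scheme might look like this (a sketch; the bucket name is a placeholder and the HMAC credentials are assumed to be configured in `config.toml` as shown above):

```py
import dlt
from dlt.destinations import filesystem

# GCS bucket addressed with the s3:// scheme so ClickHouse's S3 integration
# (with HMAC keys) is used for staging.
gcs_staging = filesystem("s3://my_awesome_bucket")

pipeline = dlt.pipeline(
    pipeline_name="clickhouse_gcs_s3_compat",
    destination="clickhouse",
    staging=gcs_staging,
)

info = pipeline.run([{"id": 1, "name": "example"}], table_name="example_table")
print(info)
```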

### dbt support

28 changes: 28 additions & 0 deletions tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py
@@ -0,0 +1,28 @@
```py
from typing import Generator, Dict

import pytest

import dlt
from dlt.destinations import filesystem
from tests.load.utils import GCS_BUCKET
from tests.pipeline.utils import assert_load_info


@pytest.mark.essential
def test_clickhouse_gcs_s3_compatibility() -> None:
    @dlt.resource
    def dummy_data() -> Generator[Dict[str, int], None, None]:
        yield {"field1": 1, "field2": 2}

    gcp_bucket = filesystem(
        GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp"
    )

    pipe = dlt.pipeline(
        pipeline_name="gcs_s3_compatibility",
        destination="clickhouse",
        staging=gcp_bucket,
        full_refresh=True,
    )
    pack = pipe.run([dummy_data])
    assert_load_info(pack)
```
14 changes: 0 additions & 14 deletions tests/load/utils.py
```diff
@@ -345,13 +345,6 @@ def destinations_configs(
             extra_info="az-authorization",
             disable_compression=True,
         ),
-        DestinationTestConfiguration(
-            destination="clickhouse",
-            staging="filesystem",
-            file_format="parquet",
-            bucket_url=GCS_BUCKET,
-            extra_info="gcs-authorization",
-        ),
         DestinationTestConfiguration(
             destination="clickhouse",
             staging="filesystem",
```
```diff
@@ -373,13 +366,6 @@ def destinations_configs(
             bucket_url=AZ_BUCKET,
             extra_info="az-authorization",
         ),
-        DestinationTestConfiguration(
-            destination="clickhouse",
-            staging="filesystem",
-            file_format="jsonl",
-            bucket_url=GCS_BUCKET,
-            extra_info="gcs-authorization",
-        ),
         DestinationTestConfiguration(
             destination="clickhouse",
             staging="filesystem",
```