Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(doris): add catalog support for Apache Doris #31580

Merged
merged 8 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions superset/db_engine_specs/doris.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@

from flask_babel import gettext as __
from sqlalchemy import Float, Integer, Numeric, String, TEXT, types
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import URL
from sqlalchemy.sql.type_api import TypeEngine

from superset.db_engine_specs.mysql import MySQLEngineSpec
from superset.errors import SupersetErrorType
from superset.models.core import Database
from superset.utils.core import GenericDataType

# Regular expressions to catch custom errors
Expand Down Expand Up @@ -111,6 +113,7 @@
)
encryption_parameters = {"ssl": "0"}
supports_dynamic_schema = True
supports_catalog = supports_dynamic_catalog = True

column_type_mappings = ( # type: ignore
(
Expand Down Expand Up @@ -245,17 +248,45 @@
catalog: Optional[str] = None,
schema: Optional[str] = None,
) -> tuple[URL, dict[str, Any]]:
database = uri.database
if schema and database:
schema = parse.quote(schema, safe="")
if "." in database:
database = database.split(".")[0] + "." + schema
else:
database = "internal." + schema
uri = uri.set(database=database)

if catalog:
pass

Check warning on line 252 in superset/db_engine_specs/doris.py

View check run for this annotation

Codecov / codecov/patch

superset/db_engine_specs/doris.py#L252

Added line #L252 was not covered by tests
elif uri.database and "." in uri.database:
catalog, _ = uri.database.split(".", 1)
else:
catalog = "internal"

# In Apache Doris, each catalog has an information_schema for BI tool
# compatibility. See: https://github.com/apache/doris/pull/28919
schema = schema or "information_schema"
database = ".".join([catalog, schema])
villebro marked this conversation as resolved.
Show resolved Hide resolved
uri = uri.set(database=database)
return uri, connect_args

@classmethod
def get_default_catalog(cls, database: Database) -> Optional[str]:
"""
Return the default catalog.
"""
if database.url_object.database is None:
return None

return database.url_object.database.split(".")[0]

@classmethod
def get_catalog_names(
cls,
database: Database,
inspector: Inspector,
) -> set[str]:
"""
Get all catalogs.
For Doris, the SHOW CATALOGS command returns multiple columns:
CatalogId, CatalogName, Type, IsCurrent, CreateTime, LastUpdateTime, Comment
We need to extract just the CatalogName column.
"""
result = inspector.bind.execute("SHOW CATALOGS")
return {row.CatalogName for row in result}

@classmethod
def get_schema_from_engine_params(
cls,
Expand Down
97 changes: 83 additions & 14 deletions tests/unit_tests/db_engine_specs/test_doris.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

from typing import Any, Optional
from unittest.mock import Mock

import pytest
from sqlalchemy import JSON, types
Expand Down Expand Up @@ -85,25 +86,25 @@ def test_get_column_spec(
(
"doris://user:password@host/db1",
{"param1": "some_value"},
"db1",
"internal.information_schema",
villebro marked this conversation as resolved.
Show resolved Hide resolved
{"param1": "some_value"},
),
(
"pydoris://user:password@host/db1",
{"param1": "some_value"},
"db1",
"internal.information_schema",
{"param1": "some_value"},
),
(
"doris://user:password@host/catalog1.db1",
{"param1": "some_value"},
"catalog1.db1",
"catalog1.information_schema",
{"param1": "some_value"},
),
(
"pydoris://user:password@host/catalog1.db1",
{"param1": "some_value"},
"catalog1.db1",
"catalog1.information_schema",
{"param1": "some_value"},
),
],
Expand All @@ -120,28 +121,96 @@ def test_adjust_engine_params(
returned_url, returned_connect_args = DorisEngineSpec.adjust_engine_params(
url, connect_args
)

assert returned_url.database == return_schema
assert returned_connect_args == return_connect_args


def test_get_schema_from_engine_params() -> None:
@pytest.mark.parametrize(
"url,expected_schema",
[
("doris://localhost:9030/hive.test", "test"),
("doris://localhost:9030/hive", None),
],
)
def test_get_schema_from_engine_params(
url: str, expected_schema: Optional[str]
) -> None:
"""
Test the ``get_schema_from_engine_params`` method.
"""
from superset.db_engine_specs.doris import DorisEngineSpec

assert (
DorisEngineSpec.get_schema_from_engine_params(
make_url("doris://localhost:9030/hive.test"),
make_url(url),
{},
)
== "test"
== expected_schema
)

assert (
DorisEngineSpec.get_schema_from_engine_params(
make_url("doris://localhost:9030/hive"),
{},
)
is None
)

@pytest.mark.parametrize(
"database_value,expected_catalog",
[
("catalog1.schema1", "catalog1"),
("catalog1", "catalog1"),
(None, None),
],
)
def test_get_default_catalog(
database_value: Optional[str], expected_catalog: Optional[str]
) -> None:
"""
Test the ``get_default_catalog`` method.
"""
from superset.db_engine_specs.doris import DorisEngineSpec
from superset.models.core import Database

database = Mock(spec=Database)
database.url_object.database = database_value

assert DorisEngineSpec.get_default_catalog(database) == expected_catalog


@pytest.mark.parametrize(
"mock_catalogs,expected_result",
[
(
[
Mock(CatalogName="catalog1"),
Mock(CatalogName="catalog2"),
Mock(CatalogName="catalog3"),
],
{"catalog1", "catalog2", "catalog3"},
),
(
[Mock(CatalogName="single_catalog")],
{"single_catalog"},
),
(
[],
set(),
),
],
)
def test_get_catalog_names(
mock_catalogs: list[Mock], expected_result: set[str]
) -> None:
"""
Test the ``get_catalog_names`` method.
"""
from superset.db_engine_specs.doris import DorisEngineSpec
from superset.models.core import Database

database = Mock(spec=Database)
inspector = Mock()
inspector.bind.execute.return_value = mock_catalogs

catalogs = DorisEngineSpec.get_catalog_names(database, inspector)

# Verify the SQL query
inspector.bind.execute.assert_called_once_with("SHOW CATALOGS")

# Verify the returned catalog names
assert catalogs == expected_result
Loading