Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for container types ARRAY, OBJECT, and FLOAT_VECTOR #8

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[test,develop]
pip install --use-pep517 --prefer-binary --editable=.[all,develop,test]
- name: Run linter and software tests
run: |
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog for Meltano/Singer Target for CrateDB

## In progress
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.

## 2023-12-08 v0.0.1
- Make it work. It can run the canonical Meltano GitHub -> DB example.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,27 @@ LIMIT
```


## Vector Store Support

In order to support CrateDB's vector store feature, i.e. its `FLOAT_VECTOR`
data type, you will need to install `numpy`. It has been added to an "extra"
of the Python package, called `vector`.

When installing the package using pip, this would apply:
```
pip install 'meltano-target-cratedb[vector]'
```

When installing the package using the Meltano's project definition, this
would probably be the right way to write it down, but it hasn't been verified
yet.
```yaml
- name: target-cratedb
variant: cratedb
pip_url: meltano-target-cratedb[vector]
```


## Development

In order to work on this adapter dialect on behalf of a real pipeline definition,
Expand Down
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,13 @@ dynamic = [
dependencies = [
"crate[sqlalchemy]",
"cratedb-toolkit",
'importlib-resources; python_version < "3.9"',
"meltanolabs-target-postgres==0.0.9",
'importlib-resources; python_version < "3.9"', # "meltanolabs-target-postgres==0.0.9",
Copy link
Contributor Author

@amotl amotl Dec 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's apparently a rendering glitch by pyproject-fmt.

What I wrote down, to signal that meltanolabs-target-postgres was only hooked off temporarily, was:

'importlib-resources; python_version < "3.9"',
# "meltanolabs-target-postgres==0.0.9",

pyproject-fmt was so kind to fold the comment next to the previous line ;]. Is it legit? This one might be related:

/cc @gaborbernat, @edgarrmondragon

"meltanolabs-target-postgres@ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
]
[project.optional-dependencies]
all = [
"meltano-target-cratedb[vector]",
]
develop = [
"black<24",
"mypy==1.7.1",
Expand All @@ -115,6 +118,9 @@ test = [
"pytest-cov<5",
"pytest-mock<4",
]
vector = [
"numpy",
]
[project.urls]
changelog = "https://github.com/crate-workbench/meltano-target-cratedb/blob/main/CHANGES.md"
documentation = "https://github.com/crate-workbench/meltano-target-cratedb"
Expand Down
2 changes: 1 addition & 1 deletion target_cratedb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Init CrateDB."""
from target_cratedb.patch import patch_sqlalchemy
from target_cratedb.sqlalchemy.patch import patch_sqlalchemy

patch_sqlalchemy()
80 changes: 72 additions & 8 deletions target_cratedb/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
from datetime import datetime

import sqlalchemy
import sqlalchemy as sa
from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray
from singer_sdk import typing as th
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT
from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type
from sqlalchemy.types import (
ARRAY,
BIGINT,
BOOLEAN,
DATE,
DATETIME,
DECIMAL,
FLOAT,
INTEGER,
TEXT,
TIME,
Expand All @@ -22,7 +26,8 @@
)
from target_postgres.connector import NOTYPE, PostgresConnector

from target_cratedb.patch import polyfill_refresh_after_dml_engine
from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
from target_cratedb.sqlalchemy.vector import FloatVector


class CrateDBConnector(PostgresConnector):
Expand Down Expand Up @@ -111,8 +116,52 @@ def pick_individual_type(jsonschema_type: dict):
if "object" in jsonschema_type["type"]:
return ObjectType
if "array" in jsonschema_type["type"]:
# TODO: Handle other inner-types as well?
# Select between different kinds of `ARRAY` data types.
#
# This currently leverages an unspecified definition for the Singer SCHEMA,
# using the `additionalProperties` attribute to convey additional type
# information, agnostic of the target database.
#
# In this case, it is about telling different kinds of `ARRAY` types apart:
# Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or,
# alternatively, it can be a "vector" kind `ARRAY` of floating point
# numbers, effectively what pgvector is storing in its `VECTOR` type.
#
# Still, `type: "vector"` is only a surrogate label here, because other
# database systems may use different types for implementing the same thing,
# and need to translate accordingly.
"""
Schema override rule in `meltano.yml`:
type: "array"
items:
type: "number"
additionalProperties:
storage:
type: "vector"
dim: 4
Produced schema annotation in `catalog.json`:
{"type": "array",
"items": {"type": "number"},
"additionalProperties": {"storage": {"type": "vector", "dim": 4}}}
"""
if "additionalProperties" in jsonschema_type and "storage" in jsonschema_type["additionalProperties"]:
storage_properties = jsonschema_type["additionalProperties"]["storage"]
if "type" in storage_properties and storage_properties["type"] == "vector":
# On PostgreSQL/pgvector, use the corresponding type definition
# from its SQLAlchemy dialect.
return FloatVector(storage_properties["dim"])

# Discover/translate inner types.
inner_type = resolve_array_inner_type(jsonschema_type)
if inner_type is not None:
return ARRAY(inner_type)

# When type discovery fails, assume `TEXT`.
return ARRAY(TEXT())

if jsonschema_type.get("format") == "date-time":
return TIMESTAMP()
individual_type = th.to_sql_type(jsonschema_type)
Expand All @@ -139,20 +188,18 @@ def pick_best_sql_type(sql_type_array: list):
DATE,
TIME,
DECIMAL,
FLOAT,
BIGINT,
INTEGER,
BOOLEAN,
NOTYPE,
ARRAY,
ObjectType,
FloatVector,
ObjectTypeImpl,
]

for sql_type in precedence_order:
for obj in sql_type_array:
# FIXME: Workaround. Currently, ObjectType can not be resolved back to a type?
# TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
if isinstance(sql_type, ObjectTypeImpl):
return ObjectType
if isinstance(obj, sql_type):
return obj
return TEXT()
Expand Down Expand Up @@ -188,6 +235,8 @@ def _get_type_sort_key(

if isinstance(sql_type, _ObjectArray):
return 0, _len
if isinstance(sql_type, FloatVector):
return 0, _len
if isinstance(sql_type, NOTYPE):
return 0, _len

Expand Down Expand Up @@ -245,3 +294,18 @@ def prepare_schema(self, schema_name: str) -> None:
Don't emit `CREATE SCHEMA` statements to CrateDB.
"""
pass


def resolve_array_inner_type(jsonschema_type: dict) -> t.Union[sa.types.TypeEngine, None]:
if "items" in jsonschema_type:
if is_boolean_type(jsonschema_type["items"]):
return BOOLEAN()
if is_number_type(jsonschema_type["items"]):
return FLOAT()
if is_integer_type(jsonschema_type["items"]):
return BIGINT()
if is_object_type(jsonschema_type["items"]):
return ObjectType()
if is_array_type(jsonschema_type["items"]):
return resolve_array_inner_type(jsonschema_type["items"]["type"])
return None
Empty file.
38 changes: 35 additions & 3 deletions target_cratedb/patch.py → target_cratedb/sqlalchemy/patch.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
from datetime import datetime

import sqlalchemy as sa
from crate.client.sqlalchemy.dialect import TYPES_MAP, DateTime
from _decimal import Decimal
from crate.client.http import CrateJsonEncoder
from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime
from crate.client.sqlalchemy.types import _ObjectArray
from sqlalchemy.sql import sqltypes


def patch_sqlalchemy():
patch_types()
patch_json_encoder()


def patch_types():
"""
Register missing timestamp data type.
Register missing data types, and fix erroneous ones.
TODO: Upstream to crate-python.
"""
# TODO: Submit patch to `crate-python`.
TYPES_MAP["bigint"] = sqltypes.BIGINT
TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["long"] = sqltypes.BIGINT
TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["real"] = sqltypes.DOUBLE
TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE)
TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP
TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP

# TODO: Can `ARRAY` be inherited from PostgreSQL's
# `ARRAY`, to make type checking work?

def as_generic(self):
return sqltypes.ARRAY

Expand All @@ -36,6 +51,23 @@ def process(value):
DateTime.bind_processor = bind_processor


def patch_json_encoder():
"""
`Decimal` types have been rendered as strings.
TODO: Upstream to crate-python.
"""

json_encoder_default = CrateJsonEncoder.default

def default(self, o):
if isinstance(o, Decimal):
return float(o)
return json_encoder_default(o)

CrateJsonEncoder.default = default


def polyfill_refresh_after_dml_engine(engine: sa.Engine):
def receive_after_execute(
conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result
Expand Down
Loading