Parsing Custom columns #1120

Open
Jacky56 opened this issue Oct 26, 2024 · 5 comments

Comments

@Jacky56

Jacky56 commented Oct 26, 2024

With the rise of storing vectors and other highly specialised data structures, the base columns from piccolo.columns are simply not expressive enough to represent these datatypes in our DBs.

In my example, I am creating a custom column type called Vector:

from piccolo.columns import Column

class Vector(Column):
    def __init__(self, length: int, *args, **kwargs):
        self.length = length
        super().__init__(*args, **kwargs)

    @property
    def column_type(self):
        return f"VECTOR({self.length})"

Now every time I save the vector, I must first serialise it:

row.vector = a_serialiser_method(row.vector)
await row.save()

Because this assignment mutates the object in place (a side effect), I must then deserialise it back to the Python type after saving.

This also applies to fetching the vector; I must deserialise it to the Python equivalent:

rows = await Table.objects()
for row in rows:
    row.vector = a_deserialiser_method(row.vector)

Question

Is there a way to simply add a default serialising/deserialising behaviour for custom columns?

Suggestion

Extend Column so that we can add default serialiser/deserialiser methods to translate Python types to DB types and vice versa, such as:

from piccolo.columns import Column

class Vector(Column):
    def __init__(self, length: int, *args, **kwargs):
        self.length = length
        super().__init__(*args, **kwargs)

    @property
    def column_type(self):
        return f"VECTOR({self.length})"

    def serializer(self, value):  # suggestion 1
        """A method to translate the Python datatype to the database column type."""
        return value.tostring()

    def deserializer(self, value):  # suggestion 2
        """A method to translate the database column type to the Python datatype."""
        return value.tolist()

These serializer/deserializer methods would act as pre-hooks when calling methods such as .to_dict() or .save() (i.e. when writing to the DB). They should also not generate side effects.
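
To make the intent concrete, a rough approximation is possible today by overriding save on a table subclass. This is a minimal sketch, not Piccolo's API: the Embeddings table is hypothetical, and serializer is the suggested method above.

import numpy as np
from piccolo.table import Table

class Embeddings(Table):
    vector = Vector(length=3)

    def save(self, *args, **kwargs):
        # pre-hook: run the column's serializer before the row is written,
        # so callers keep working with the Python type
        self.vector = Embeddings.vector.serializer(self.vector)
        return super().save(*args, **kwargs)

# usage: the caller passes the Python type; serialisation happens inside save()
row = Embeddings(vector=np.array([1, 2, 3]))
# await row.save()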

@Jacky56 Jacky56 closed this as completed Oct 26, 2024
@Jacky56 Jacky56 reopened this Oct 26, 2024
@Jacky56 Jacky56 changed the title parse data Parsing Custom columns Oct 26, 2024
@Jacky56
Author

Jacky56 commented Oct 26, 2024

In my case, pgvector needs the vector to be a string such as '[1,2,3]', but in the Python code it should be accessed as an iterable.

@sinisaos
Member

In my case, pgvector needs the vector to be a string such as '[1,2,3]', but in the Python code it should be accessed as an iterable.

@Jacky56 I don't know if this helps, but you can use the json module to transform '[1,2,3]' into an iterable (a list), like this:

import json

embeddings = '[1,2,3]'
results = json.loads(embeddings)
print(type(results))  # <class 'list'>
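
Going the other direction for inserts, json.dumps produces the '[1,2,3]' string form (a small sketch; the compact separators drop the spaces):

import json

embeddings = [1, 2, 3]
literal = json.dumps(embeddings, separators=(",", ":"))
print(literal)  # [1,2,3]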

You can also look at this discussion. If Piccolo doesn't support a feature, you can always just use raw SQL to get the desired result. Hope this helps.
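
For example, a sketch of the raw SQL fallback (the Task table and its embeddings column are hypothetical here; Piccolo's .raw() uses {} placeholders for parameters):

# the explicit ::vector cast lets the string parameter be bound as text
await Task.raw(
    "INSERT INTO task (name, embeddings) VALUES ({}, {}::vector)",
    "Task 1",
    "[1,2,3]",
)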

@Jacky56
Author

Jacky56 commented Oct 27, 2024

Hello @sinisaos,
the suggested discussion is what I'm currently doing, but constantly falling back to Table.raw to cover queries the ORM doesn't support rather defeats the purpose of the ORM.

When I grab an object from the database, the Python data structure is as shown:

obj = await Table.objects().first()
obj.embedding  # piccolo resolves this as a string when converting the custom column

Since it's a custom column, the developer should have the ability to define how the column is deserialised into a Python value and serialised into a SQL statement.

With that, I would expect behaviour such as:

obj = await Table.objects().first()
obj.embedding  # piccolo resolves this as `np.ndarray`, because the developer specified how it should be deserialised via the method defined on `Column`

@sinisaos
Member

@Jacky56 I don't know if this helps, but you can use methods on the Table class to serialize/deserialize data. Try something like this:

Example script
import asyncio
import json

import numpy as np
from piccolo.columns import Column, Varchar
from piccolo.engine import engine_finder
from piccolo.engine.postgres import PostgresEngine
from piccolo.table import Table

DB = PostgresEngine(
    config={
        "database": "vectordb",
        "user": "postgres",
        "password": "postgres",
        "host": "localhost",
        "port": 5432,
    }
)


# Custom vector column
class Vector(Column):
    def __init__(self, dimensions: int, *args, **kwargs):
        self.dimensions = dimensions
        super().__init__(*args, **kwargs)

    @property
    def column_type(self):
        return f"VECTOR({self.dimensions})"


# Create a JSON Encoder class
class json_serialize(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)


class Task(Table, db=DB):
    """
    An example table.
    """

    name = Varchar()
    embeddings = Vector(dimensions=3)

    # use class methods for serialization/deserialization
    @classmethod
    def serializer(cls, value):  # suggestion 1
        """a method to translate the python datatype to database column type"""
        if isinstance(value, np.ndarray):
            return json.dumps(value, cls=json_serialize)
        else:
            return json.dumps(value)

    @classmethod
    def deserializer(cls, value):  # suggestion 2
        """a method to translate the database column type to python datatype"""
        return json.loads(value)


async def open_database_connection_pool():
    try:
        engine = engine_finder()
        await engine.start_connection_pool()
    except Exception:
        print("Unable to connect to the database")


async def close_database_connection_pool():
    try:
        engine = engine_finder()
        await engine.close_connection_pool()
    except Exception:
        print("Unable to connect to the database")


async def main():
    # create table
    await Task.create_table(if_not_exists=True)

    embedding = np.array([1, 2, 3])  # or [1, 2, 3]
    # insert data using serializer
    task = Task(name="Task 1", embeddings=Task.serializer(embedding))
    await task.save()
    # retrieve data using deserializer
    obj = await Task.objects()
    for i in obj:
        print(Task.deserializer(i.embeddings))


if __name__ == "__main__":
    asyncio.run(main())

Sorry if I missed your point and don't understand your needs.

@Jacky56
Author

Jacky56 commented Oct 29, 2024

Thank you for your reply @sinisaos, but it's not quite what's desired.

The thing I am describing actually exists in the code here: Column.get_sql_value. This translates the Python type to the SQL equivalent.

As we can see here, for example, it translates a Python list to the PostgreSQL array syntax.

It would be nice to somehow override this method.
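
If subclasses could override it, the custom column might look like this; a minimal sketch, assuming get_sql_value keeps its current shape of receiving the Python value and returning what gets placed into the SQL statement:

import numpy as np
from piccolo.columns import Column

class Vector(Column):
    def __init__(self, dimensions: int, *args, **kwargs):
        self.dimensions = dimensions
        super().__init__(*args, **kwargs)

    @property
    def column_type(self):
        return f"VECTOR({self.dimensions})"

    def get_sql_value(self, value):
        # translate a Python iterable (list / np.ndarray) into the
        # pgvector text form '[1,2,3]'
        if isinstance(value, np.ndarray):
            value = value.tolist()
        return "[" + ",".join(str(v) for v in value) + "]"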
