From 7061deb4dac45ccfa35a88015f67f7a2ff06182d Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 8 Dec 2023 12:47:44 +0100 Subject: [PATCH] Fix "schema name" propagation --- target_cratedb/sinks.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/target_cratedb/sinks.py b/target_cratedb/sinks.py index e947a4f..ea43aa9 100644 --- a/target_cratedb/sinks.py +++ b/target_cratedb/sinks.py @@ -1,11 +1,11 @@ """CrateDB target sink class, which handles writing streams.""" import datetime import time -from typing import List, Optional +from typing import List, Optional, Union import sqlalchemy from pendulum import now -from sqlalchemy import Column, bindparam, insert, select, update +from sqlalchemy import Column, Executable, MetaData, Table, bindparam, insert, select, update from target_postgres.sinks import PostgresSink from target_cratedb.connector import CrateDBConnector @@ -296,3 +296,22 @@ def activate_version(self, new_version: int) -> None: bindparam("version", value=new_version, type_=integer_type), ) connection.execute(query) + + def generate_insert_statement( + self, + full_table_name: str, + columns: List[Column], + ) -> Union[str, Executable]: + """Generate an insert statement for the given records. + + Args: + full_table_name: the target table name. + schema: the JSON schema for the new table. + + Returns: + An insert statement. + """ + # FIXME: + metadata = MetaData(schema=self.schema_name) + table = Table(full_table_name, metadata, *columns) + return insert(table)