-
Notifications
You must be signed in to change notification settings - Fork 0
/
couac.go
324 lines (302 loc) · 10.3 KB
/
couac.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
// Package couac provides a helpful wrapper around ADBC for DuckDB.
package couac
import (
"context"
"fmt"
"github.com/apache/arrow-adbc/go/adbc"
drivermgr "github.com/apache/arrow-adbc/go/adbc/drivermgr"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
json "github.com/goccy/go-json"
)
// ObjectDepth selects how far down the catalog -> schema -> table -> column
// hierarchy a metadata request should descend.
//
// NOTE(review): the names and ordering appear to mirror adbc.ObjectDepth;
// GetObjectsMap currently passes adbc.ObjectDepthAll directly rather than
// using this type.
type ObjectDepth int

const (
	ObjectDepthAll       ObjectDepth = 0
	ObjectDepthCatalogs  ObjectDepth = 1
	ObjectDepthDBSchemas ObjectDepth = 2
	ObjectDepthTables    ObjectDepth = 3

	// ObjectDepthColumns is an alias for ObjectDepthAll.
	ObjectDepthColumns = ObjectDepthAll
)
type (
	// Quacker is a handle to a DuckDB database opened through the ADBC
	// driver manager (see NewDuck). It remembers the connections created by
	// NewConnection so that Close can shut them down together with the
	// database.
	Quacker struct {
		ctx context.Context
		drv adbc.Driver
		db  adbc.Database

		// ducklings holds the connections opened via NewConnection.
		ducklings []*QuackCon

		path, driverPath string // database file and driver library path
	}

	// QuackCon is one connection to a DuckDB database, tied back to the
	// Quacker that opened it.
	QuackCon struct {
		parent *Quacker
		conn   adbc.Connection
	}

	// Statement re-exports adbc.Statement so it can be referred to as
	// couac.Statement.
	Statement = adbc.Statement

	// Option configures a Quacker; Options are applied by NewDuck.
	Option func(config)

	// config is the view of a Quacker that Option functions mutate.
	config *Quacker
)
// WithPath returns an Option that sets the location of the DuckDB database
// file. When no path is given, NewDuck passes no "path" option to the driver,
// which (per this package's docs) yields an in-memory database.
func WithPath(path string) Option {
	setPath := func(c config) {
		c.path = path
	}
	return setPath
}
// WithDriverPath returns an Option that sets the full file path of the DuckDB
// shared library (e.g. "/opt/duckdb/lib/libduckdb.so") that the ADBC driver
// manager loads. The value is a path to the library file itself, not to its
// directory. If it is empty, NewDuck falls back to
// "/usr/local/lib/libduckdb.so".
func WithDriverPath(path string) Option {
	return func(cfg config) {
		cfg.driverPath = path
	}
}
// NewDuck opens a DuckDB database. WithPath option provides the location of
// the DuckDB file, if none provided, defaults to in-memory.
// The WithDriverPath specifies the location of libduckdb.so, if driver
// path is empty, defaults to /usr/local/lib.
func NewDuck(opts ...Option) (*Quacker, error) {
var err error
var dPath string
couac := new(Quacker)
for _, opt := range opts {
opt(couac)
}
if couac.driverPath == "" {
dPath = "/usr/local/lib/libduckdb.so"
} else {
dPath = couac.driverPath
}
couac.ctx = context.TODO()
couac.drv = drivermgr.Driver{}
dbOpts := make(map[string]string)
dbOpts["driver"] = dPath
dbOpts["entrypoint"] = "duckdb_adbc_init"
if couac.path != "" {
dbOpts["path"] = couac.path
}
couac.db, err = couac.drv.NewDatabase(dbOpts)
if err != nil {
return nil, fmt.Errorf("new database error: %v", err)
}
return couac, nil
}
func (q *Quacker) NewConnection() (*QuackCon, error) {
var err error
qc := new(QuackCon)
qc.conn, err = q.db.Open(context.Background())
if err != nil {
return nil, fmt.Errorf("db open error: %v", err)
}
qc.parent = q
q.ducklings = append(q.ducklings, qc)
return qc, nil
}
// Close closes every connection still registered with q, then closes the
// database itself, releasing any associated resources. It is important to
// call Close so that DuckDB can properly commit all WAL file changes.
//
// Close has no error result, so errors from the underlying ADBC Close calls
// are discarded (best effort). The connection list is cleared afterwards, so
// calling Close again does not re-close the same connections.
func (q *Quacker) Close() {
	for _, d := range q.ducklings {
		if d != nil && d.conn != nil {
			d.conn.Close()
		}
	}
	q.ducklings = nil
	if q.db != nil {
		q.db.Close()
	}
}
// Close closes the connection to the database and releases any associated
// resources. It also removes the connection from its parent Quacker, so that
// a later Quacker.Close does not close it a second time. It is important to
// do this to allow DuckDB to properly commit all WAL file changes.
//
// Close has no error result; the error from the underlying ADBC Close is
// discarded.
func (q *QuackCon) Close() {
	if p := q.parent; p != nil {
		for i, d := range p.ducklings {
			if d != q {
				continue
			}
			// Remove in place and clear the vacated tail slot so the
			// backing array does not keep a stale pointer alive.
			last := len(p.ducklings) - 1
			copy(p.ducklings[i:], p.ducklings[i+1:])
			p.ducklings[last] = nil
			p.ducklings = p.ducklings[:last]
			break
		}
		q.parent = nil
	}
	q.conn.Close()
}
// Exec executes a statement that does not generate a result
// set. It returns the number of rows affected if known, otherwise -1.
func (q *QuackCon) Exec(ctx context.Context, query string) (int64, error) {
var u int64
stmt, err := q.conn.NewStatement()
if err != nil {
return u, fmt.Errorf("new statement error: %v", err)
}
defer stmt.Close()
err = stmt.SetSqlQuery(query)
if err != nil {
return u, fmt.Errorf("error setting sql query: %v", err)
}
u, err = stmt.ExecuteUpdate(ctx)
return u, err
}
// GetObjectsMap returns a hierarchical view of all catalogs, database schemas,
// tables, and columns, decoded from ADBC GetObjects (at adbc.ObjectDepthAll)
// into generic Go values. There is one map per catalog row; nested structs
// decode to map[string]any and lists to []any, via a JSON round trip of each
// Arrow record batch.
//
// Every record batch returned by the driver is decoded. The reader is
// released before returning, and any reader error is reported. The context
// stored on the parent Quacker is used. If the connection has already been
// closed (parent is nil), context.Background is used and the driver reports
// the error.
//
// The underlying Arrow dataset has the following schema:
//
//	Field Name                  | Field Type
//	----------------------------|----------------------------
//	catalog_name                | utf8
//	catalog_db_schemas          | list<DB_SCHEMA_SCHEMA>
//
// DB_SCHEMA_SCHEMA is a Struct with the fields:
//
//	Field Name                  | Field Type
//	----------------------------|----------------------------
//	db_schema_name              | utf8
//	db_schema_tables            | list<TABLE_SCHEMA>
//
// TABLE_SCHEMA is a Struct with the fields:
//
//	Field Name                  | Field Type
//	----------------------------|----------------------------
//	table_name                  | utf8 not null
//	table_type                  | utf8 not null
//	table_columns               | list<COLUMN_SCHEMA>
//	table_constraints           | list<CONSTRAINT_SCHEMA>
//
// COLUMN_SCHEMA is a Struct with the fields:
//
//	Field Name                  | Field Type          | Comments
//	----------------------------|---------------------|---------
//	column_name                 | utf8 not null       |
//	ordinal_position            | int32               | (1)
//	remarks                     | utf8                | (2)
//	xdbc_data_type              | int16               | (3)
//	xdbc_type_name              | utf8                | (3)
//	xdbc_column_size            | int32               | (3)
//	xdbc_decimal_digits         | int16               | (3)
//	xdbc_num_prec_radix         | int16               | (3)
//	xdbc_nullable               | int16               | (3)
//	xdbc_column_def             | utf8                | (3)
//	xdbc_sql_data_type          | int16               | (3)
//	xdbc_datetime_sub           | int16               | (3)
//	xdbc_char_octet_length      | int32               | (3)
//	xdbc_is_nullable            | utf8                | (3)
//	xdbc_scope_catalog          | utf8                | (3)
//	xdbc_scope_schema           | utf8                | (3)
//	xdbc_scope_table            | utf8                | (3)
//	xdbc_is_autoincrement       | bool                | (3)
//	xdbc_is_generatedcolumn     | bool                | (3)
//
//  1. The column's ordinal position in the table (starting from 1).
//  2. Database-specific description of the column.
//  3. Optional value. Should be null if not supported by the driver.
//     xdbc_ values are meant to provide JDBC/ODBC-compatible metadata
//     in an agnostic manner.
//
// CONSTRAINT_SCHEMA is a Struct with the fields:
//
//	Field Name                  | Field Type          | Comments
//	----------------------------|---------------------|---------
//	constraint_name             | utf8                |
//	constraint_type             | utf8 not null       | (1)
//	constraint_column_names     | list<utf8> not null | (2)
//	constraint_column_usage     | list<USAGE_SCHEMA>  | (3)
//
//  1. One of 'CHECK', 'FOREIGN KEY', 'PRIMARY KEY', or 'UNIQUE'.
//  2. The columns on the current table that are constrained, in order.
//  3. For FOREIGN KEY only, the referenced table and columns.
//
// USAGE_SCHEMA is a Struct with fields:
//
//	Field Name                  | Field Type
//	----------------------------|----------------------------
//	fk_catalog                  | utf8
//	fk_db_schema                | utf8
//	fk_table                    | utf8 not null
//	fk_column_name              | utf8 not null
func (q *QuackCon) GetObjectsMap() ([]map[string]any, error) {
	ctx := context.Background()
	if q.parent != nil && q.parent.ctx != nil {
		ctx = q.parent.ctx
	}

	rr, err := q.conn.GetObjects(ctx, adbc.ObjectDepthAll, nil, nil, nil, nil, nil)
	if err != nil {
		return nil, err
	}
	defer rr.Release()

	var rows []map[string]any
	for rr.Next() {
		// A record is only valid until the next call to Next, so it is
		// serialized immediately.
		raw, mErr := rr.Record().MarshalJSON()
		if mErr != nil {
			return nil, mErr
		}
		var batch []map[string]any
		if uErr := json.Unmarshal(raw, &batch); uErr != nil {
			return nil, uErr
		}
		rows = append(rows, batch...)
	}
	if rErr := rr.Err(); rErr != nil {
		return nil, rErr
	}
	return rows, nil
}
// GetTableSchema returns the Arrow schema of the DuckDB table tableName by
// delegating to the underlying ADBC connection. Pass nil for catalog and
// dbSchema to use the default catalog and database schema.
func (q *QuackCon) GetTableSchema(ctx context.Context, catalog, dbSchema *string, tableName string) (*arrow.Schema, error) {
	schema, err := q.conn.GetTableSchema(ctx, catalog, dbSchema, tableName)
	return schema, err
}
// IngestCreateAppend attempts to ingest an Arrow record into the DuckDB database, creating the table
// from the record's schema if it does not exist. It returns the number of rows affected if known, otherwise -1.
// Ingest mode switches between Create and Append since DuckDB does not currently support CreateAppend mode.
// DuckDB also does not support AutoCommit option.
func (q *QuackCon) IngestCreateAppend(ctx context.Context, destTable string, rec arrow.Record) (int64, error) {
var u int64
if destTable == "" {
return u, fmt.Errorf("destination table name error")
}
if rec == nil {
return u, fmt.Errorf("nil arrow record")
}
schema, _ := q.conn.GetTableSchema(ctx, nil, nil, destTable)
stmt, err := q.conn.NewStatement()
if err != nil {
return u, fmt.Errorf("new statement error: %v", err)
}
defer stmt.Close()
// If schema is non-nil the table is assumed to exist, Append ingest mode will be used;
// otherwise the table will be created with Create ingest mode.
if schema == nil {
err = stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate)
if err != nil {
return u, fmt.Errorf("set option ingest mode create error: %v", err)
}
} else {
err = stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend)
if err != nil {
return u, fmt.Errorf("set option ingest mode append error: %v", err)
}
}
// Invalid Argument: Statement Set Option adbc.connection.autocommit is not yet accepted by DuckDB
// err = stmt.SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueEnabled)
// if err != nil {
// return 0, fmt.Errorf("setoption autocommit error: %v", err)
// }
err = stmt.SetOption(adbc.OptionKeyIngestTargetTable, destTable)
if err != nil {
return u, fmt.Errorf("set option target table error: %v", err)
}
err = stmt.Bind(ctx, rec)
if err != nil {
return u, fmt.Errorf("statement binding arrow record error: %v", err)
}
u, err = stmt.ExecuteUpdate(ctx)
return u, err
}
// NewStatement creates a new statement on this connection. It fails early
// with an error if the underlying connection handle is nil. The caller owns
// the returned statement and must Close it when finished.
func (q *QuackCon) NewStatement() (Statement, error) {
	if conn := q.conn; conn != nil {
		return conn.NewStatement()
	}
	return nil, fmt.Errorf("database connection is closed")
}
// Query executes the query and returns a RecordReader for the results, the statement, and the number
// of rows affected if known, otherwise it will be -1. The statement should be closed once done with the
// RecordReader.
// Since ADBC 1.1.0: releasing the returned RecordReader without consuming it fully is equivalent to
// calling AdbcStatementCancel.
func (q *QuackCon) Query(ctx context.Context, query string) (array.RecordReader, adbc.Statement, int64, error) {
var u int64
stmt, err := q.conn.NewStatement()
if err != nil {
return nil, nil, u, fmt.Errorf("new statement error: %v", err)
}
err = stmt.SetSqlQuery(query)
if err != nil {
return nil, nil, u, fmt.Errorf("error setting sql query: %v", err)
}
rr, u, err := stmt.ExecuteQuery(ctx)
return rr, stmt, u, err
}