From 20cf28b70805818e8efec6ee2ded252cd79a1238 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 15 Jun 2024 13:58:08 +0200 Subject: [PATCH 1/5] stateful queries --- quackpipe.go | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/quackpipe.go b/quackpipe.go index 656a4ef..745820c 100644 --- a/quackpipe.go +++ b/quackpipe.go @@ -14,6 +14,7 @@ import ( "regexp" "strings" "time" + "crypto/sha256" _ "github.com/marcboeker/go-duckdb" // load duckdb driver ) @@ -56,11 +57,11 @@ func quack(query string, stdin bool, format string, params string) (string, erro if !stdin { check(db.Exec("LOAD httpfs; LOAD json; LOAD parquet;")) } - + if staticAliases != "" { check(db.Exec(staticAliases)) } - + startTime := time.Now() rows, err := db.Query(query) if err != nil { @@ -325,7 +326,7 @@ func main() { default: w.Header().Set("Content-Type", "text/html; charset=utf-8") } - + // format handling if r.URL.Query().Get("default_format") != "" { default_format = r.URL.Query().Get("default_format") @@ -335,15 +336,13 @@ func main() { default_params = r.URL.Query().Get("default_params") } // auth to hash based temp file storage - /* username, password, ok := r.BasicAuth() + hashdb := "" if ok && len(password) > 0 { hash := sha256.Sum256([]byte(username + password)) - hashdb, _ := fmt.Printf("/tmp/%x.db", hash) - repath := regexp.MustCompile(`(.*?)\?`) - default_params = repath.ReplaceAllString(default_params, repath+"?") + hashdb = fmt.Sprintf("/tmp/%x.db", hash) } - */ + // extract FORMAT from query and override the current `default_format` cleanquery, format := extractAndRemoveFormat(query) if len(format) > 0 { @@ -354,11 +353,20 @@ func main() { if len(query) == 0 { _, _ = w.Write([]byte(staticPlay)) } else { - result, err := quack(query, false, default_format, default_params) - if err != nil { - _, _ = w.Write([]byte(err.Error())) + if (len(hashdb) > 0) { + result, err := quack(query, false, default_format, hashdb + "?"+ default_params) + if err != nil { + _, _ = w.Write([]byte(err.Error())) + } else { + _, _ = w.Write([]byte(result)) + } } else { - _, _ = w.Write([]byte(result)) + result, err := quack(query, false, default_format, default_params) + if err != nil { + _, _ = w.Write([]byte(err.Error())) + } else { + _, _ = w.Write([]byte(result)) + } } } }) From a27bc5c61b81bb91ede14d224d49625db928bcd4 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 15 Jun 2024 17:22:14 +0200 Subject: [PATCH 2/5] resync --- quackpipe.go | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/quackpipe.go b/quackpipe.go index 745820c..48cb2e9 100644 --- a/quackpipe.go +++ b/quackpipe.go @@ -30,8 +30,10 @@ type CommandLineFlags struct { Host *string `json:"host"` Port *string `json:"port"` Stdin *bool `json:"stdin"` + Alias *bool `json:"alias"` Format *string `json:"format"` Params *string `json:"params"` + DBPath *string `json:"dbpath"` } var appFlags CommandLineFlags @@ -45,8 +47,13 @@ func check(args ...interface{}) { } } -func quack(query string, stdin bool, format string, params string) (string, error) { +func quack(query string, stdin bool, format string, params string, hashdb string) (string, error) { var err error + alias := *appFlags.Alias + + if (len(hashdb) > 0) { + params = hashdb + "?" + params + } db, err = sql.Open("duckdb", params) if err != nil { @@ -54,11 +61,12 @@ func quack(query string, stdin bool, format string, params string) (string, erro } defer db.Close() + if !stdin { check(db.Exec("LOAD httpfs; LOAD json; LOAD parquet;")) } - if staticAliases != "" { + if (alias) && (staticAliases != "") { check(db.Exec(staticAliases)) } @@ -89,7 +97,9 @@ func initFlags() { appFlags.Port = flag.String("port", "8123", "API port. Default 8123") appFlags.Format = flag.String("format", "JSONCompact", "API port. Default JSONCompact") appFlags.Params = flag.String("params", "", "DuckDB optional parameters. Default to none.") + appFlags.DBPath = flag.String("dbpath", "/tmp/", "DuckDB DB storage path. Default to /tmp.") appFlags.Stdin = flag.Bool("stdin", false, "STDIN query. Default false") + appFlags.Alias = flag.Bool("alias", false, "Built-in Aliases. Slower. Default false") flag.Parse() } @@ -275,6 +285,8 @@ func main() { initFlags() default_format := *appFlags.Format default_params := *appFlags.Params + default_path := *appFlags.DBPath + if *appFlags.Stdin { scanner := bufio.NewScanner((os.Stdin)) query := "" @@ -289,7 +301,7 @@ func main() { query = cleanquery default_format = format } - result, err := quack(query, true, default_format, default_params) + result, err := quack(query, true, default_format, default_params, "") if err != nil { fmt.Println(err) os.Exit(1) @@ -340,7 +352,7 @@ func main() { hashdb := "" if ok && len(password) > 0 { hash := sha256.Sum256([]byte(username + password)) - hashdb = fmt.Sprintf("/tmp/%x.db", hash) + hashdb = fmt.Sprintf("%s/%x.db", default_path, hash) } // extract FORMAT from query and override the current `default_format` @@ -353,20 +365,11 @@ func main() { if len(query) == 0 { _, _ = w.Write([]byte(staticPlay)) } else { - if (len(hashdb) > 0) { - result, err := quack(query, false, default_format, hashdb + "?"+ default_params) - if err != nil { - _, _ = w.Write([]byte(err.Error())) - } else { - _, _ = w.Write([]byte(result)) - } + result, err := quack(query, false, default_format, default_params, hashdb) + if err != nil { + _, _ = w.Write([]byte(err.Error())) } else { - result, err := quack(query, false, default_format, default_params) - if err != nil { - _, _ = w.Write([]byte(err.Error())) - } else { - _, _ = w.Write([]byte(result)) - } + _, _ = w.Write([]byte(result)) } } }) From 989b737e00919b6398abd83a408beeb38a1aaa95 Mon Sep 17 00:00:00 2001 From: Lorenzo Mangani Date: Sat, 15 Jun 2024 18:09:21 +0200 Subject: [PATCH 3/5] Update quackpipe.go --- quackpipe.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/quackpipe.go b/quackpipe.go index 48cb2e9..985d5aa 100644 --- a/quackpipe.go +++ b/quackpipe.go @@ -61,9 +61,9 @@ func quack(query string, stdin bool, format string, params string, hashdb string } defer db.Close() - if !stdin { - check(db.Exec("LOAD httpfs; LOAD json; LOAD parquet;")) + check(db.Exec("SET autoinstall_known_extensions=1;")) + check(db.Exec("SET autoload_known_extensions=1;")) } if (alias) && (staticAliases != "") { @@ -97,9 +97,9 @@ func initFlags() { appFlags.Port = flag.String("port", "8123", "API port. Default 8123") appFlags.Format = flag.String("format", "JSONCompact", "API port. Default JSONCompact") appFlags.Params = flag.String("params", "", "DuckDB optional parameters. Default to none.") - appFlags.DBPath = flag.String("dbpath", "/tmp/", "DuckDB DB storage path. Default to /tmp.") + appFlags.DBPath = flag.String("dbpath", "/tmp/", "DuckDB DB storage path. Default to /tmp/") appFlags.Stdin = flag.Bool("stdin", false, "STDIN query. Default false") - appFlags.Alias = flag.Bool("alias", false, "Built-in Aliases. Slower. Default false") + appFlags.Alias = flag.Bool("alias", false, "Built-in CH Aliases. Default false") flag.Parse() } From 2e1fe3e68bc7f0dfdba7747042b65b508f5894fc Mon Sep 17 00:00:00 2001 From: Lorenzo Mangani Date: Sat, 15 Jun 2024 18:15:09 +0200 Subject: [PATCH 4/5] Update quackpipe.go --- quackpipe.go | 1 + 1 file changed, 1 insertion(+) diff --git a/quackpipe.go b/quackpipe.go index 985d5aa..1a0cedd 100644 --- a/quackpipe.go +++ b/quackpipe.go @@ -62,6 +62,7 @@ func quack(query string, stdin bool, format string, params string, hashdb string defer db.Close() if !stdin { + check(db.Exec("LOAD httpfs; LOAD json; LOAD parquet;")) check(db.Exec("SET autoinstall_known_extensions=1;")) check(db.Exec("SET autoload_known_extensions=1;")) } From 5431eb7946527b2d1bd72d950ebe188343caba06 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 15 Jun 2024 18:27:30 +0200 Subject: [PATCH 5/5] use context --- quackpipe.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/quackpipe.go b/quackpipe.go index 48cb2e9..2f2965d 100644 --- a/quackpipe.go +++ b/quackpipe.go @@ -1,6 +1,7 @@ package main import ( + "context" "bufio" "database/sql" _ "embed" @@ -63,15 +64,17 @@ func quack(query string, stdin bool, format string, params string, hashdb string if !stdin { - check(db.Exec("LOAD httpfs; LOAD json; LOAD parquet;")) + check(db.ExecContext(context.Background(),"LOAD httpfs; LOAD json; LOAD parquet;")) + check(db.ExecContext(context.Background(),"SET autoinstall_known_extensions=1;")) + check(db.ExecContext(context.Background(),"SET autoload_known_extensions=1;")) } if (alias) && (staticAliases != "") { - check(db.Exec(staticAliases)) + check(db.ExecContext(context.Background(), staticAliases)) } startTime := time.Now() - rows, err := db.Query(query) + rows, err := db.QueryContext(context.Background(), query) if err != nil { return "", err }