diff --git a/controller/root/root.go b/controller/root/root.go new file mode 100644 index 0000000..dc0ef11 --- /dev/null +++ b/controller/root/root.go @@ -0,0 +1,48 @@ +package root + +import ( + "crypto/sha256" + "errors" + "fmt" + "net/http" + "quackpipe/model" + "quackpipe/service/db" + "quackpipe/utils" +) + +// QueryOperation executes query on behalf of the HTTP request r and returns the +// result rendered in the selected output format. Basic-auth credentials with a +// non-empty password select a per-credential database path +// (default_path/<sha256 of username+password>.db) that is passed to db.Quack. A +// FORMAT clause in the query is removed and overrides default_format. An empty +// query is rejected with the error "query length is empty". +func QueryOperation(flagInformation *model.CommandLineFlags, query string, r *http.Request, default_path string, default_format string, default_params string) (string, error) { + // auth to hash based temp file storage + username, password, ok := r.BasicAuth() + hashdb := "" + if ok && len(password) > 0 { + hash := sha256.Sum256([]byte(username + password)) + hashdb = fmt.Sprintf("%s/%x.db", default_path, hash) + } + // extract FORMAT from query and override the current `default_format` + cleanquery, format := utils.ExtractAndRemoveFormat(query) + if len(format) > 0 { + query = cleanquery + default_format = format + } + + if len(query) == 0 { + return "", errors.New("query length is empty") + } + + rows, duration, err := db.Quack(*flagInformation, query, false, default_params, hashdb) + if err != nil { + return "", err + } + + result, err := utils.ConversationOfRows(rows, default_format, duration) + if err != nil { + return "", err + } + return result, nil +} diff --git a/go.mod b/go.mod index daa6d60..4e1664f 100644 --- a/go.mod +++ b/go.mod @@ -1,20 +1,7 @@ module quackpipe -go 1.20 +go 1.21 -require github.com/marcboeker/go-duckdb v1.7.0 +toolchain go1.22.4 -require ( - github.com/apache/arrow/go/v14 v14.0.2 // indirect - github.com/goccy/go-json v0.10.2 // indirect - github.com/google/flatbuffers v23.5.26+incompatible // indirect - github.com/klauspost/compress v1.16.7 // indirect - github.com/klauspost/cpuid/v2 v2.2.5 // indirect - github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/pierrec/lz4/v4 v4.1.18 // indirect - 
github.com/zeebo/xxh3 v1.0.2 // indirect - golang.org/x/mod v0.13.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/tools v0.14.0 // indirect - golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect -) +require github.com/gorilla/mux v1.8.1 diff --git a/go.sum b/go.sum index 3a051f3..7128337 100644 --- a/go.sum +++ b/go.sum @@ -1,36 +1,2 @@ -github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= -github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= -github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= -github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= -github.com/marcboeker/go-duckdb v1.7.0 h1:c9DrS13ta+gqVgg9DiEW8I+PZBE85nBMLL/YMooYoUY= -github.com/marcboeker/go-duckdb v1.7.0/go.mod h1:WtWeqqhZoTke/Nbd7V9lnBx7I2/A/q0SAq/urGzPCMs= -github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= -github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod 
h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= -github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= -github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= -golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= -golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= -golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= -golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= diff --git a/main.go b/main.go new file mode 100644 index 0000000..8603f0f --- /dev/null +++ b/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "os" + "quackpipe/model" + "quackpipe/route" + "quackpipe/utils" +) + +// initFlags initializes the 
command line flags +func initFlags() *model.CommandLineFlags { + + appFlags := &model.CommandLineFlags{} + appFlags.Host = flag.String("host", "0.0.0.0", "API host. Default 0.0.0.0") + appFlags.Port = flag.String("port", "8123", "API port. Default 8123") + appFlags.Format = flag.String("format", "JSONCompact", "API default output format. Default JSONCompact") + appFlags.Params = flag.String("params", "", "DuckDB optional parameters. Default to none.") + appFlags.DBPath = flag.String("dbpath", "/tmp/", "DuckDB DB storage path. Default to /tmp/") + appFlags.Stdin = flag.Bool("stdin", false, "STDIN query. Default false") + appFlags.Alias = flag.Bool("alias", true, "Built-in CH Aliases. Default true") + flag.Parse() + + return appFlags +} + +var appFlags *model.CommandLineFlags + +// main runs the query obtained from utils.ReadFromScanner when -stdin is set; otherwise it serves the router from route.NewRouter on host:port. +func main() { + appFlags = initFlags() + if *appFlags.Stdin { + rows, duration, format, err := utils.ReadFromScanner(*appFlags) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + results, err := utils.ConversationOfRows(rows, format, duration) + if err != nil { + fmt.Println(err) + os.Exit(1) + } else { + fmt.Println(results) + } + + } else { + r := route.NewRouter(appFlags) + fmt.Printf("QuackPipe API Running: %s:%s\n", *appFlags.Host, *appFlags.Port) + if err := http.ListenAndServe(*appFlags.Host+":"+*appFlags.Port, r); err != nil { + panic(err) + } + + } +} diff --git a/model/flags.go b/model/flags.go new file mode 100644 index 0000000..c9f4adf --- /dev/null +++ b/model/flags.go @@ -0,0 +1,12 @@ +package model + +// CommandLineFlags holds the parsed command line options; every field is the pointer returned by the flag package. +type CommandLineFlags struct { + Host *string `json:"host"` + Port *string `json:"port"` + Stdin *bool `json:"stdin"` + Alias *bool `json:"alias"` + Format *string `json:"format"` + Params *string `json:"params"` + DBPath *string `json:"dbpath"` +} diff --git a/model/helper.go b/model/helper.go new file mode 100644 index 0000000..1d0dd1a --- /dev/null +++ b/model/helper.go @@ -0,0 +1,23 @@ +package model + +// Metadata is the metadata for a column +type Metadata struct 
{ + Name string `json:"name"` + Type string `json:"type"` +} + +// Statistics is the statistics for a query +type Statistics struct { + Elapsed float64 `json:"elapsed"` + RowsRead int `json:"rows_read"` + BytesRead int `json:"bytes_read"` +} + +// OutputJSON is the JSON output for a query +type OutputJSON struct { + Meta []Metadata `json:"meta"` + Data [][]interface{} `json:"data"` + Rows int `json:"rows"` + RowsBeforeLimitAtLeast int `json:"rows_before_limit_at_least"` + Statistics Statistics `json:"statistics"` +} diff --git a/quackpipe.go b/quackpipe.go index b2c1a4f..52a9e3b 100644 --- a/quackpipe.go +++ b/quackpipe.go @@ -1,386 +1,386 @@ package main -import ( - "context" - "bufio" - "database/sql" - _ "embed" - "encoding/json" - "flag" - "fmt" - "io/ioutil" - "log" - "net/http" - "os" - "regexp" - "strings" - "time" - "crypto/sha256" - - _ "github.com/marcboeker/go-duckdb" // load duckdb driver -) - -//go:embed play.html -var staticPlay string - -// params for Flags -type CommandLineFlags struct { - Host *string `json:"host"` - Port *string `json:"port"` - Stdin *bool `json:"stdin"` - Alias *bool `json:"alias"` - Format *string `json:"format"` - Params *string `json:"params"` - DBPath *string `json:"dbpath"` -} - -var appFlags CommandLineFlags - -var db *sql.DB - -func check(args ...interface{}) { - err := args[len(args)-1] - if err != nil { - panic(err) - } -} - -func quack(query string, stdin bool, format string, params string, hashdb string) (string, error) { - var err error - alias := *appFlags.Alias - motherduck, md := os.LookupEnv("motherduck_token") - - if (len(hashdb) > 0) { - params = hashdb + "?" 
+ params - } - - db, err = sql.Open("duckdb", params) - if err != nil { - log.Fatal(err) - } - defer db.Close() - - if !stdin { - check(db.ExecContext(context.Background(),"LOAD httpfs; LOAD json; LOAD parquet;")) - check(db.ExecContext(context.Background(),"SET autoinstall_known_extensions=1;")) - check(db.ExecContext(context.Background(),"SET autoload_known_extensions=1;")) - } - - if (alias) { - check(db.ExecContext(context.Background(), "LOAD chsql;")) - } - - if (md) && (motherduck != "") { - check(db.ExecContext(context.Background(), "LOAD motherduck; ATTACH 'md:';")) - } - - startTime := time.Now() - rows, err := db.QueryContext(context.Background(), query) - if err != nil { - return "", err - } - elapsedTime := time.Since(startTime) - - switch format { - case "JSONCompact", "JSON": - return rowsToJSON(rows, elapsedTime) - case "CSVWithNames": - return rowsToCSV(rows, true) - case "TSVWithNames", "TabSeparatedWithNames": - return rowsToTSV(rows, true) - case "TSV", "TabSeparated": - return rowsToTSV(rows, false) - default: - return rowsToTSV(rows, false) - } -} - -// initFlags initializes the command line flags -func initFlags() { - appFlags.Host = flag.String("host", "0.0.0.0", "API host. Default 0.0.0.0") - appFlags.Port = flag.String("port", "8123", "API port. Default 8123") - appFlags.Format = flag.String("format", "JSONCompact", "API port. Default JSONCompact") - appFlags.Params = flag.String("params", "", "DuckDB optional parameters. Default to none.") - appFlags.DBPath = flag.String("dbpath", "/tmp/", "DuckDB DB storage path. Default to /tmp/") - appFlags.Stdin = flag.Bool("stdin", false, "STDIN query. Default false") - appFlags.Alias = flag.Bool("alias", true, "Built-in CH Aliases. 
Default true") - flag.Parse() -} - -// extractAndRemoveFormat extracts the FORMAT clause from the query and returns the query without the FORMAT clause -func extractAndRemoveFormat(input string) (string, string) { - re := regexp.MustCompile(`(?i)\bFORMAT\s+(\w+)\b`) - match := re.FindStringSubmatch(input) - if len(match) != 2 { - return input, "" - } - format := match[1] - return re.ReplaceAllString(input, ""), format -} - -// Metadata is the metadata for a column -type Metadata struct { - Name string `json:"name"` - Type string `json:"type"` -} - -// Statistics is the statistics for a query -type Statistics struct { - Elapsed float64 `json:"elapsed"` - RowsRead int `json:"rows_read"` - BytesRead int `json:"bytes_read"` -} - -// OutputJSON is the JSON output for a query -type OutputJSON struct { - Meta []Metadata `json:"meta"` - Data [][]interface{} `json:"data"` - Rows int `json:"rows"` - RowsBeforeLimitAtLeast int `json:"rows_before_limit_at_least"` - Statistics Statistics `json:"statistics"` -} - -// rowsToJSON converts the rows to JSON string -func rowsToJSON(rows *sql.Rows, elapsedTime time.Duration) (string, error) { - defer rows.Close() - - // Get column names - columns, err := rows.Columns() - if err != nil { - return "", err - } - - // Create a slice to store maps of column names and their corresponding values - var results OutputJSON - results.Meta = make([]Metadata, len(columns)) - results.Data = make([][]interface{}, 0) - - for i, column := range columns { - results.Meta[i].Name = column - } - - for rows.Next() { - // Create a slice to hold pointers to the values of the columns - values := make([]interface{}, len(columns)) - for i := range columns { - values[i] = new(interface{}) - } - - // Scan the values from the row into the pointers - err := rows.Scan(values...) 
- if err != nil { - return "", err - } - - // Create a slice to hold the row data - rowData := make([]interface{}, len(columns)) - for i, value := range values { - // Convert the value to the appropriate Go type - switch v := (*(value.(*interface{}))).(type) { - case []byte: - rowData[i] = string(v) - default: - rowData[i] = v - } - } - results.Data = append(results.Data, rowData) - } - - err = rows.Err() - if err != nil { - return "", err - } - - results.Rows = len(results.Data) - results.RowsBeforeLimitAtLeast = len(results.Data) - - // Populate the statistics object with number of rows, bytes, and elapsed time - results.Statistics.Elapsed = elapsedTime.Seconds() - results.Statistics.RowsRead = results.Rows - // Note: bytes_read is an approximation, it's just the number of rows * number of columns - // results.Statistics.BytesRead = results.Rows * len(columns) * 8 // Assuming each value takes 8 bytes - jsonData, err := json.Marshal(results) - if err != nil { - return "", err - } - - return string(jsonData), nil -} - -// rowsToTSV converts the rows to TSV string -func rowsToTSV(rows *sql.Rows, cols bool) (string, error) { - var result []string - columns, err := rows.Columns() - if err != nil { - return "", err - } - - if cols { - // Append column names as the first row - result = append(result, strings.Join(columns, "\t")) - } - - // Fetch rows and append their values as tab-delimited lines - values := make([]interface{}, len(columns)) - scanArgs := make([]interface{}, len(columns)) - for i := range values { - scanArgs[i] = &values[i] - } - for rows.Next() { - err := rows.Scan(scanArgs...) 
- if err != nil { - return "", err - } - - var lineParts []string - for _, v := range values { - lineParts = append(lineParts, fmt.Sprintf("%v", v)) - } - result = append(result, strings.Join(lineParts, "\t")) - } - - if err := rows.Err(); err != nil { - return "", err - } - - return strings.Join(result, "\n"), nil -} - -// rowsToCSV converts the rows to CSV string -func rowsToCSV(rows *sql.Rows, cols bool) (string, error) { - var result []string - columns, err := rows.Columns() - if err != nil { - return "", err - } - - if cols { - // Append column names as the first row - result = append(result, strings.Join(columns, ",")) - } - - // Fetch rows and append their values as CSV rows - values := make([]interface{}, len(columns)) - scanArgs := make([]interface{}, len(columns)) - for i := range values { - scanArgs[i] = &values[i] - } - for rows.Next() { - err := rows.Scan(scanArgs...) - if err != nil { - return "", err - } - - var lineParts []string - for _, v := range values { - lineParts = append(lineParts, fmt.Sprintf("%v", v)) - } - result = append(result, strings.Join(lineParts, ",")) - } - - if err := rows.Err(); err != nil { - return "", err - } - - return strings.Join(result, "\n"), nil -} - -func main() { - initFlags() - default_format := *appFlags.Format - default_params := *appFlags.Params - default_path := *appFlags.DBPath - - if *appFlags.Stdin { - scanner := bufio.NewScanner((os.Stdin)) - query := "" - for scanner.Scan() { - query = query + "\n" + scanner.Text() - } - if err := scanner.Err(); err != nil { - fmt.Fprintln(os.Stderr, "reading standard input:", err) - } - cleanquery, format := extractAndRemoveFormat(query) - if len(format) > 0 { - query = cleanquery - default_format = format - } - result, err := quack(query, true, default_format, default_params, "") - if err != nil { - fmt.Println(err) - os.Exit(1) - } else { - fmt.Println(result) - } - } else { - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - var bodyBytes []byte - var 
query string - var err error - - // handle query parameter - if r.URL.Query().Get("query") != "" { - // query = r.FormValue("query") - query = r.URL.Query().Get("query") - } else if r.Body != nil { - bodyBytes, err = ioutil.ReadAll(r.Body) - if err != nil { - fmt.Printf("Body reading error: %v", err) - return - } - defer r.Body.Close() - query = string(bodyBytes) - } - - switch r.Header.Get("Accept") { - case "application/json": - w.Header().Set("Content-Type", "application/json; charset=utf-8") - case "application/xml": - w.Header().Set("Content-Type", "application/xml; charset=utf-8") - case "text/css": - w.Header().Set("Content-Type", "text/css; charset=utf-8") - default: - w.Header().Set("Content-Type", "text/html; charset=utf-8") - } - - // format handling - if r.URL.Query().Get("default_format") != "" { - default_format = r.URL.Query().Get("default_format") - } - // param handling - if r.URL.Query().Get("default_params") != "" { - default_params = r.URL.Query().Get("default_params") - } - // auth to hash based temp file storage - username, password, ok := r.BasicAuth() - hashdb := "" - if ok && len(password) > 0 { - hash := sha256.Sum256([]byte(username + password)) - hashdb = fmt.Sprintf("%s/%x.db", default_path, hash) - } - - // extract FORMAT from query and override the current `default_format` - cleanquery, format := extractAndRemoveFormat(query) - if len(format) > 0 { - query = cleanquery - default_format = format - } - - if len(query) == 0 { - _, _ = w.Write([]byte(staticPlay)) - } else { - result, err := quack(query, false, default_format, default_params, hashdb) - if err != nil { - _, _ = w.Write([]byte(err.Error())) - } else { - _, _ = w.Write([]byte(result)) - } - } - }) - - fmt.Printf("QuackPipe API Running: %s:%s\n", *appFlags.Host, *appFlags.Port) - if err := http.ListenAndServe(*appFlags.Host+":"+*appFlags.Port, nil); err != nil { - panic(err) - } - } -} +// +//import ( +// "bufio" +// "context" +// "crypto/sha256" +// "database/sql" +// _ 
"embed" +// "encoding/json" +// "fmt" +// "io/ioutil" +// "log" +// "net/http" +// "os" +// "regexp" +// "strings" +// "time" +// +// _ "github.com/marcboeker/go-duckdb" // load duckdb driver +//) +// +////go:embed route/root/play.html +//var staticPlay string +// +//// params for Flags +//type CommandLineFlags struct { +// Host *string `json:"host"` +// Port *string `json:"port"` +// Stdin *bool `json:"stdin"` +// Alias *bool `json:"alias"` +// Format *string `json:"format"` +// Params *string `json:"params"` +// DBPath *string `json:"dbpath"` +//} +// +//var appFlags CommandLineFlags +// +//var db *sql.DB +// +//func check(args ...interface{}) { +// err := args[len(args)-1] +// if err != nil { +// panic(err) +// } +//} +// +//func quack(query string, stdin bool, format string, params string, hashdb string) (string, error) { +// var err error +// alias := *appFlags.Alias +// motherduck, md := os.LookupEnv("motherduck_token") +// +// if len(hashdb) > 0 { +// params = hashdb + "?" + params +// } +// +// db, err = sql.Open("duckdb", params) +// if err != nil { +// log.Fatal(err) +// } +// defer db.Close() +// +// if !stdin { +// check(db.ExecContext(context.Background(), "LOAD httpfs; LOAD json; LOAD parquet;")) +// check(db.ExecContext(context.Background(), "SET autoinstall_known_extensions=1;")) +// check(db.ExecContext(context.Background(), "SET autoload_known_extensions=1;")) +// } +// +// if alias { +// check(db.ExecContext(context.Background(), "LOAD chsql;")) +// } +// +// if (md) && (motherduck != "") { +// check(db.ExecContext(context.Background(), "LOAD motherduck; ATTACH 'md:';")) +// } +// +// startTime := time.Now() +// rows, err := db.QueryContext(context.Background(), query) +// if err != nil { +// return "", err +// } +// elapsedTime := time.Since(startTime) +// +// switch format { +// case "JSONCompact", "JSON": +// return rowsToJSON(rows, elapsedTime) +// case "CSVWithNames": +// return rowsToCSV(rows, true) +// case "TSVWithNames", 
"TabSeparatedWithNames": +// return rowsToTSV(rows, true) +// case "TSV", "TabSeparated": +// return rowsToTSV(rows, false) +// default: +// return rowsToTSV(rows, false) +// } +//} +// +//////initFlags initializes the command line flags +////func initFlags() { +//// appFlags.Host = flag.String("host", "0.0.0.0", "API host. Default 0.0.0.0") +//// appFlags.Port = flag.String("port", "8123", "API port. Default 8123") +//// appFlags.Format = flag.String("format", "JSONCompact", "API port. Default JSONCompact") +//// appFlags.Params = flag.String("params", "", "DuckDB optional parameters. Default to none.") +//// appFlags.DBPath = flag.String("dbpath", "/tmp/", "DuckDB DB storage path. Default to /tmp/") +//// appFlags.Stdin = flag.Bool("stdin", false, "STDIN query. Default false") +//// appFlags.Alias = flag.Bool("alias", true, "Built-in CH Aliases. Default true") +//// flag.Parse() +////} +// +//// extractAndRemoveFormat extracts the FORMAT clause from the query and returns the query without the FORMAT clause +//func extractAndRemoveFormat(input string) (string, string) { +// re := regexp.MustCompile(`(?i)\bFORMAT\s+(\w+)\b`) +// match := re.FindStringSubmatch(input) +// if len(match) != 2 { +// return input, "" +// } +// format := match[1] +// return re.ReplaceAllString(input, ""), format +//} +// +//// Metadata is the metadata for a column +//type Metadata struct { +// Name string `json:"name"` +// Type string `json:"type"` +//} +// +//// Statistics is the statistics for a query +//type Statistics struct { +// Elapsed float64 `json:"elapsed"` +// RowsRead int `json:"rows_read"` +// BytesRead int `json:"bytes_read"` +//} +// +//// OutputJSON is the JSON output for a query +//type OutputJSON struct { +// Meta []Metadata `json:"meta"` +// Data [][]interface{} `json:"data"` +// Rows int `json:"rows"` +// RowsBeforeLimitAtLeast int `json:"rows_before_limit_at_least"` +// Statistics Statistics `json:"statistics"` +//} +// +//// rowsToJSON converts the rows to JSON 
string +//func rowsToJSON(rows *sql.Rows, elapsedTime time.Duration) (string, error) { +// defer rows.Close() +// +// // Get column names +// columns, err := rows.Columns() +// if err != nil { +// return "", err +// } +// +// // Create a slice to store maps of column names and their corresponding values +// var results OutputJSON +// results.Meta = make([]Metadata, len(columns)) +// results.Data = make([][]interface{}, 0) +// +// for i, column := range columns { +// results.Meta[i].Name = column +// } +// +// for rows.Next() { +// // Create a slice to hold pointers to the values of the columns +// values := make([]interface{}, len(columns)) +// for i := range columns { +// values[i] = new(interface{}) +// } +// +// // Scan the values from the row into the pointers +// err := rows.Scan(values...) +// if err != nil { +// return "", err +// } +// +// // Create a slice to hold the row data +// rowData := make([]interface{}, len(columns)) +// for i, value := range values { +// // Convert the value to the appropriate Go type +// switch v := (*(value.(*interface{}))).(type) { +// case []byte: +// rowData[i] = string(v) +// default: +// rowData[i] = v +// } +// } +// results.Data = append(results.Data, rowData) +// } +// +// err = rows.Err() +// if err != nil { +// return "", err +// } +// +// results.Rows = len(results.Data) +// results.RowsBeforeLimitAtLeast = len(results.Data) +// +// // Populate the statistics object with number of rows, bytes, and elapsed time +// results.Statistics.Elapsed = elapsedTime.Seconds() +// results.Statistics.RowsRead = results.Rows +// // Note: bytes_read is an approximation, it's just the number of rows * number of columns +// // results.Statistics.BytesRead = results.Rows * len(columns) * 8 // Assuming each value takes 8 bytes +// jsonData, err := json.Marshal(results) +// if err != nil { +// return "", err +// } +// +// return string(jsonData), nil +//} +// +//// rowsToTSV converts the rows to TSV string +//func rowsToTSV(rows 
*sql.Rows, cols bool) (string, error) { +// var result []string +// columns, err := rows.Columns() +// if err != nil { +// return "", err +// } +// +// if cols { +// // Append column names as the first row +// result = append(result, strings.Join(columns, "\t")) +// } +// +// // Fetch rows and append their values as tab-delimited lines +// values := make([]interface{}, len(columns)) +// scanArgs := make([]interface{}, len(columns)) +// for i := range values { +// scanArgs[i] = &values[i] +// } +// for rows.Next() { +// err := rows.Scan(scanArgs...) +// if err != nil { +// return "", err +// } +// +// var lineParts []string +// for _, v := range values { +// lineParts = append(lineParts, fmt.Sprintf("%v", v)) +// } +// result = append(result, strings.Join(lineParts, "\t")) +// } +// +// if err := rows.Err(); err != nil { +// return "", err +// } +// +// return strings.Join(result, "\n"), nil +//} +// +//// rowsToCSV converts the rows to CSV string +//func rowsToCSV(rows *sql.Rows, cols bool) (string, error) { +// var result []string +// columns, err := rows.Columns() +// if err != nil { +// return "", err +// } +// +// if cols { +// // Append column names as the first row +// result = append(result, strings.Join(columns, ",")) +// } +// +// // Fetch rows and append their values as CSV rows +// values := make([]interface{}, len(columns)) +// scanArgs := make([]interface{}, len(columns)) +// for i := range values { +// scanArgs[i] = &values[i] +// } +// for rows.Next() { +// err := rows.Scan(scanArgs...) 
+// if err != nil { +// return "", err +// } +// +// var lineParts []string +// for _, v := range values { +// lineParts = append(lineParts, fmt.Sprintf("%v", v)) +// } +// result = append(result, strings.Join(lineParts, ",")) +// } +// +// if err := rows.Err(); err != nil { +// return "", err +// } +// +// return strings.Join(result, "\n"), nil +//} +// +//func main() { +// initFlags() +// default_format := *appFlags.Format +// default_params := *appFlags.Params +// default_path := *appFlags.DBPath +// +// if *appFlags.Stdin { +// scanner := bufio.NewScanner((os.Stdin)) +// query := "" +// for scanner.Scan() { +// query = query + "\n" + scanner.Text() +// } +// if err := scanner.Err(); err != nil { +// fmt.Fprintln(os.Stderr, "reading standard input:", err) +// } +// cleanquery, format := extractAndRemoveFormat(query) +// if len(format) > 0 { +// query = cleanquery +// default_format = format +// } +// result, err := quack(query, true, default_format, default_params, "") +// if err != nil { +// fmt.Println(err) +// os.Exit(1) +// } else { +// fmt.Println(result) +// } +// } else { +// http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { +// var bodyBytes []byte +// var query string +// var err error +// +// // handle query parameter +// if r.URL.Query().Get("query") != "" { +// // query = r.FormValue("query") +// query = r.URL.Query().Get("query") +// } else if r.Body != nil { +// bodyBytes, err = ioutil.ReadAll(r.Body) +// if err != nil { +// fmt.Printf("Body reading error: %v", err) +// return +// } +// defer r.Body.Close() +// query = string(bodyBytes) +// } +// +// switch r.Header.Get("Accept") { +// case "application/json": +// w.Header().Set("Content-Type", "application/json; charset=utf-8") +// case "application/xml": +// w.Header().Set("Content-Type", "application/xml; charset=utf-8") +// case "text/css": +// w.Header().Set("Content-Type", "text/css; charset=utf-8") +// default: +// w.Header().Set("Content-Type", "text/html; charset=utf-8") 
+// } +// +// // format handling +// if r.URL.Query().Get("default_format") != "" { +// default_format = r.URL.Query().Get("default_format") +// } +// // param handling +// if r.URL.Query().Get("default_params") != "" { +// default_params = r.URL.Query().Get("default_params") +// } +// // auth to hash based temp file storage +// username, password, ok := r.BasicAuth() +// hashdb := "" +// if ok && len(password) > 0 { +// hash := sha256.Sum256([]byte(username + password)) +// hashdb = fmt.Sprintf("%s/%x.db", default_path, hash) +// } +// +// // extract FORMAT from query and override the current `default_format` +// cleanquery, format := extractAndRemoveFormat(query) +// if len(format) > 0 { +// query = cleanquery +// default_format = format +// } +// +// if len(query) == 0 { +// _, _ = w.Write([]byte(staticPlay)) +// } else { +// result, err := quack(query, false, default_format, default_params, hashdb) +// if err != nil { +// _, _ = w.Write([]byte(err.Error())) +// } else { +// _, _ = w.Write([]byte(result)) +// } +// } +// }) +// +// fmt.Printf("QuackPipe API Running: %s:%s\n", *appFlags.Host, *appFlags.Port) +// if err := http.ListenAndServe(*appFlags.Host+":"+*appFlags.Port, nil); err != nil { +// panic(err) +// } +// } +//} diff --git a/play.html b/route/root/play.html similarity index 100% rename from play.html rename to route/root/play.html diff --git a/route/root/root_handler.go b/route/root/root_handler.go new file mode 100644 index 0000000..6546832 --- /dev/null +++ b/route/root/root_handler.go @@ -0,0 +1,76 @@ +package root + +import ( + _ "embed" + "fmt" + "github.com/gorilla/mux" + "io" + "net/http" + "quackpipe/controller/root" + "quackpipe/model" + "strings" +) + +//go:embed play.html +var staticPlay string + +type Handler struct { + FlagInformation *model.CommandLineFlags +} + +// RootHandler function for the root endpoint +func RootHandler(router *mux.Router, FlagInformation *model.CommandLineFlags) *Handler { + HandlerInfo := 
&Handler{FlagInformation: FlagInformation} + router.HandleFunc("/", HandlerInfo.Handlers).Methods("POST", "GET") + return HandlerInfo +} +// Handlers serves "/": it runs the query from the "query" URL parameter or, when that is empty, the request body, +// and writes the result or the error text; an empty query gets the embedded play.html page only. +func (u *Handler) Handlers(w http.ResponseWriter, r *http.Request) { + defaultFormat := *u.FlagInformation.Format + defaultParams := *u.FlagInformation.Params + defaultPath := *u.FlagInformation.DBPath + params := r.URL.Query() + + // handle query parameter + query := params.Get("query") + if query == "" && r.Body != nil { + defer r.Body.Close() + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, fmt.Sprintf("Body reading error: %v", err), http.StatusBadRequest) + return + } + query = string(bodyBytes) + } + + switch r.Header.Get("Accept") { + case "application/json": + w.Header().Set("Content-Type", "application/json; charset=utf-8") + case "application/xml": + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + case "text/css": + w.Header().Set("Content-Type", "text/css; charset=utf-8") + default: + w.Header().Set("Content-Type", "text/html; charset=utf-8") + } + // format handling + if v := params.Get("default_format"); v != "" { + defaultFormat = v + } + // param handling + if v := params.Get("default_params"); v != "" { + defaultParams = v + } + + result, err := root.QueryOperation(u.FlagInformation, query, r, defaultPath, defaultFormat, defaultParams) + switch { + case err == nil: + _, _ = w.Write([]byte(result)) + case strings.Contains(err.Error(), "query length is empty"): + // no query supplied: serve the playground page alone, without appending the error text + _, _ = w.Write([]byte(staticPlay)) + default: + _, _ = w.Write([]byte(err.Error())) + } +} diff --git a/route/route.go b/route/route.go new file mode 100644 index 0000000..4dc98de --- /dev/null +++ b/route/route.go @@ -0,0 +1,14 @@ +package route + +import ( + "github.com/gorilla/mux" + "quackpipe/model" + "quackpipe/route/root" +) + +func NewRouter(flagInformation *model.CommandLineFlags) *mux.Router { + router := 
mux.NewRouter() + // Register root module routes + root.RootHandler(router, flagInformation) + return router +} diff --git a/service/db/db.go b/service/db/db.go new file mode 100644 index 0000000..0fd0dac --- /dev/null +++ b/service/db/db.go @@ -0,0 +1,53 @@ +package db + +import ( + "context" + "database/sql" + "os" + "quackpipe/model" + "time" +) + +func Quack(appFlags model.CommandLineFlags, query string, stdin bool, params string, hashdb string) (*sql.Rows, time.Duration, error) { + var err error + alias := *appFlags.Alias + motherduck, md := os.LookupEnv("motherduck_token") + + if len(hashdb) > 0 { + params = hashdb + "?" + params + } + + db, err := sql.Open("duckdb", params) + if err != nil { + return nil, 0, err + } + defer db.Close() + + if !stdin { + check(db.ExecContext(context.Background(), "LOAD httpfs; LOAD json; LOAD parquet;")) + check(db.ExecContext(context.Background(), "SET autoinstall_known_extensions=1;")) + check(db.ExecContext(context.Background(), "SET autoload_known_extensions=1;")) + } + + if alias { + check(db.ExecContext(context.Background(), "LOAD chsql;")) + } + + if (md) && (motherduck != "") { + check(db.ExecContext(context.Background(), "LOAD motherduck; ATTACH 'md:';")) + } + startTime := time.Now() + rows, err := db.QueryContext(context.Background(), query) + if err != nil { + return nil, 0, err + } + elapsedTime := time.Since(startTime) + return rows, elapsedTime, nil +} + +func check(args ...interface{}) { + err := args[len(args)-1] + if err != nil { + panic(err) + } +} diff --git a/utils/helper.go b/utils/helper.go new file mode 100644 index 0000000..bb5f119 --- /dev/null +++ b/utils/helper.go @@ -0,0 +1,201 @@ +package utils + +import ( + "database/sql" + "encoding/json" + "fmt" + "quackpipe/model" + "regexp" + "strings" + "time" +) + +// ExtractAndRemoveFormat extracts the FORMAT clause from the query and returns the query without the FORMAT clause +func ExtractAndRemoveFormat(input string) (string, string) { + re := 
regexp.MustCompile(`(?i)\bFORMAT\s+(\w+)\b`) + match := re.FindStringSubmatch(input) + if len(match) != 2 { + return input, "" + } + format := match[1] + return re.ReplaceAllString(input, ""), format +} + +func ConversationOfRows(rows *sql.Rows, default_format string, duration time.Duration) (string, error) { + + switch default_format { + case "JSONCompact", "JSON": + result, err := rowsToJSON(rows, duration) + if err != nil { + return "", err + } + return result, nil + case "CSVWithNames": + result, err := rowsToCSV(rows, true) + if err != nil { + return "", err + } + return result, nil + case "TSVWithNames", "TabSeparatedWithNames": + + result, err := rowsToTSV(rows, true) + if err != nil { + return "", err + } + return result, nil + case "TSV", "TabSeparated": + result, err := rowsToTSV(rows, true) + if err != nil { + return "", err + } + return result, nil + + } + + return "", nil +} + +// rowsToJSON converts the rows to JSON string +func rowsToJSON(rows *sql.Rows, elapsedTime time.Duration) (string, error) { + defer rows.Close() + + // Get column names + columns, err := rows.Columns() + if err != nil { + return "", err + } + + // Create a slice to store maps of column names and their corresponding values + var results model.OutputJSON + results.Meta = make([]model.Metadata, len(columns)) + results.Data = make([][]interface{}, 0) + + for i, column := range columns { + results.Meta[i].Name = column + } + + for rows.Next() { + // Create a slice to hold pointers to the values of the columns + values := make([]interface{}, len(columns)) + for i := range columns { + values[i] = new(interface{}) + } + + // Scan the values from the row into the pointers + err := rows.Scan(values...) 
+ if err != nil { + return "", err + } + + // Create a slice to hold the row data + rowData := make([]interface{}, len(columns)) + for i, value := range values { + // Convert the value to the appropriate Go type + switch v := (*(value.(*interface{}))).(type) { + case []byte: + rowData[i] = string(v) + default: + rowData[i] = v + } + } + results.Data = append(results.Data, rowData) + } + + err = rows.Err() + if err != nil { + return "", err + } + + results.Rows = len(results.Data) + results.RowsBeforeLimitAtLeast = len(results.Data) + + // Populate the statistics object with number of rows, bytes, and elapsed time + results.Statistics.Elapsed = elapsedTime.Seconds() + results.Statistics.RowsRead = results.Rows + // Note: bytes_read is an approximation, it's just the number of rows * number of columns + // results.Statistics.BytesRead = results.Rows * len(columns) * 8 // Assuming each value takes 8 bytes + jsonData, err := json.Marshal(results) + if err != nil { + return "", err + } + + return string(jsonData), nil +} + +// rowsToTSV converts the rows to TSV string +func rowsToTSV(rows *sql.Rows, cols bool) (string, error) { + var result []string + columns, err := rows.Columns() + if err != nil { + return "", err + } + + if cols { + // Append column names as the first row + result = append(result, strings.Join(columns, "\t")) + } + + // Fetch rows and append their values as tab-delimited lines + values := make([]interface{}, len(columns)) + scanArgs := make([]interface{}, len(columns)) + for i := range values { + scanArgs[i] = &values[i] + } + for rows.Next() { + err := rows.Scan(scanArgs...) 
// rowsToCSV converts the rows to a CSV string.
//
// When cols is true the column names are emitted as the first record.
// Records are joined with "\n" and there is no trailing newline. Fields that
// contain a comma, a double quote, or a line break are wrapped in double
// quotes with embedded quotes doubled, so such values cannot shift columns
// or split records. []byte values are written as text; every other value is
// formatted with %v.
//
// rows is always closed before returning.
func rowsToCSV(rows *sql.Rows, cols bool) (string, error) {
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return "", err
	}

	// quote wraps a field in double quotes only when it contains a
	// character that would otherwise break the record structure.
	quote := func(field string) string {
		if !strings.ContainsAny(field, ",\"\r\n") {
			return field
		}
		return `"` + strings.ReplaceAll(field, `"`, `""`) + `"`
	}

	var records []string
	if cols {
		// Append column names as the first row
		header := make([]string, len(columns))
		for i, name := range columns {
			header[i] = quote(name)
		}
		records = append(records, strings.Join(header, ","))
	}

	// Scan every column into a generic value; scanArgs holds the pointers
	// that Scan writes through.
	values := make([]interface{}, len(columns))
	scanArgs := make([]interface{}, len(columns))
	for i := range values {
		scanArgs[i] = &values[i]
	}

	fields := make([]string, len(columns))
	for rows.Next() {
		if err := rows.Scan(scanArgs...); err != nil {
			return "", err
		}
		for i, v := range values {
			// Raw bytes are text here; %v would print them as a list of
			// numbers.
			if raw, ok := v.([]byte); ok {
				fields[i] = quote(string(raw))
				continue
			}
			fields[i] = quote(fmt.Sprintf("%v", v))
		}
		records = append(records, strings.Join(fields, ","))
	}

	if err := rows.Err(); err != nil {
		return "", err
	}

	return strings.Join(records, "\n"), nil
}
db.Quack(appFlags, query, true, defaultParams, "") + if err != nil { + return nil, 0, "", err + } + + return result, duration, defaultFormat, nil + +}