diff --git a/.github/workflows/gobuild.yml b/.github/workflows/gobuild.yml index a4fccf6..a0be613 100644 --- a/.github/workflows/gobuild.yml +++ b/.github/workflows/gobuild.yml @@ -9,10 +9,11 @@ jobs: steps: - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: - go-version: 1.20 - + go-version: '1.20' + check-latest: true + - run: go version - name: Build run: | go mod tidy diff --git a/Dockerfile b/Dockerfile index e36aade..53ea7f4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ FROM golang:1.20 AS builder WORKDIR / COPY . . -RUN CGO_ENABLED=1 go build -o quackpipe quackpipe.go +RUN CGO_ENABLED=1 go build -o quackpipe . RUN strip quackpipe RUN apt update && apt install -y libgrpc-dev diff --git a/controller/root/root.go b/controller/root/root.go new file mode 100644 index 0000000..6656837 --- /dev/null +++ b/controller/root/root.go @@ -0,0 +1,31 @@ +package root + +import ( + "crypto/sha256" + "fmt" + "net/http" + "quackpipe/model" + "quackpipe/service/db" + "quackpipe/utils" +) + +func QueryOperation(flagInformation *model.CommandLineFlags, query string, r *http.Request, defaultPath string, defaultFormat string, defaultParams string) (string, error) { + // auth to hash based temp file storage + username, password, ok := r.BasicAuth() + hashdb := "" + if ok && len(password) > 0 { + hash := sha256.Sum256([]byte(username + password)) + hashdb = fmt.Sprintf("%s/%x.db", defaultPath, hash) + } + rows, duration, err := db.Quack(*flagInformation, query, false, defaultParams, hashdb) + if err != nil { + return "", err + } else { + result, err := utils.ConversationOfRows(rows, defaultFormat, duration) + if err != nil { + return "", err + } + return result, nil + } + +} diff --git a/go.mod b/go.mod index daa6d60..38fe3be 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,10 @@ module quackpipe go 1.20 -require github.com/marcboeker/go-duckdb v1.7.0 +require ( + github.com/gorilla/mux v1.8.1 + github.com/marcboeker/go-duckdb v1.7.0 +) require ( github.com/apache/arrow/go/v14 v14.0.2 // indirect diff --git a/go.sum b/go.sum index 3a051f3..3b1fb07 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,15 @@ github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= @@ -17,14 +21,19 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -33,4 +42,6 @@ golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= +gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/handler/api_handler.go b/handler/api_handler.go new file mode 100644 index 0000000..9e614b2 --- /dev/null +++ b/handler/api_handler.go @@ -0,0 +1,77 @@ +package handlers + +import ( + _ "embed" + "fmt" + "io" + "net/http" + "quackpipe/controller/root" + "quackpipe/model" + "quackpipe/utils" +) + +//go:embed play.html +var staticPlay string + +type Handler struct { + FlagInformation *model.CommandLineFlags +} + +func (u *Handler) Handlers(w http.ResponseWriter, r *http.Request) { + var bodyBytes []byte + var query string + var err error + defaultFormat := *u.FlagInformation.Format + defaultParams := *u.FlagInformation.Params + defaultPath := *u.FlagInformation.DBPath + // handle query parameter + if r.URL.Query().Get("query") != "" { + query = r.URL.Query().Get("query") + } else if r.Body != nil { + bodyBytes, err = io.ReadAll(r.Body) + if err != nil { + fmt.Printf("Body reading error: %v", err) + return + } + defer r.Body.Close() + query = string(bodyBytes) + } + + switch r.Header.Get("Accept") { + case "application/json": + w.Header().Set("Content-Type", "application/json; charset=utf-8") + case "application/xml": + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + case "text/css": + w.Header().Set("Content-Type", "text/css; charset=utf-8") + default: + w.Header().Set("Content-Type", "text/html; charset=utf-8") + } + // format handling + if r.URL.Query().Get("default_format") != "" { + defaultFormat = r.URL.Query().Get("default_format") + } + // param handling + if r.URL.Query().Get("default_params") != "" { + defaultParams = r.URL.Query().Get("default_params") + } + + // extract FORMAT from query and override the current `default_format` + cleanQuery, format := utils.ExtractAndRemoveFormat(query) + if len(format) > 0 { + query = cleanQuery + defaultFormat = format + } + if len(query) == 0 { + _, _ = w.Write([]byte(staticPlay)) + + } else { + result, err := root.QueryOperation(u.FlagInformation, query, r, defaultPath, defaultFormat, defaultParams) + if err != nil { + _, _ = w.Write([]byte(err.Error())) + } else { + _, _ = w.Write([]byte(result)) + } + } + +} diff --git a/play.html b/handler/play.html similarity index 100% rename from play.html rename to handler/play.html diff --git a/main.go b/main.go new file mode 100644 index 0000000..9356efd --- /dev/null +++ b/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "os" + "quackpipe/model" + "quackpipe/router" + "quackpipe/utils" +) + +// initFlags initializes the command line flags +func initFlags() *model.CommandLineFlags { + + appFlags := &model.CommandLineFlags{} + appFlags.Host = flag.String("host", "0.0.0.0", "API host. Default 0.0.0.0") + appFlags.Port = flag.String("port", "8123", "API port. Default 8123") + appFlags.Format = flag.String("format", "JSONCompact", "API port. Default JSONCompact") + appFlags.Params = flag.String("params", "", "DuckDB optional parameters. Default to none.") + appFlags.DBPath = flag.String("dbpath", "/tmp/", "DuckDB DB storage path. Default to /tmp/") + appFlags.Stdin = flag.Bool("stdin", false, "STDIN query. Default false") + appFlags.Alias = flag.Bool("alias", true, "Built-in CH Aliases. Default true") + flag.Parse() + + return appFlags +} + +var appFlags *model.CommandLineFlags + +func main() { + appFlags = initFlags() + if *appFlags.Stdin { + rows, duration, format, err := utils.ReadFromScanner(*appFlags) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + results, err := utils.ConversationOfRows(rows, format, duration) + if err != nil { + fmt.Println(err) + os.Exit(1) + } else { + fmt.Println(results) + } + + } else { + r := router.NewRouter(appFlags) + fmt.Printf("QuackPipe API Running: %s:%s\n", *appFlags.Host, *appFlags.Port) + if err := http.ListenAndServe(*appFlags.Host+":"+*appFlags.Port, r); err != nil { + panic(err) + } + + } + +} diff --git a/model/flags.go b/model/flags.go new file mode 100644 index 0000000..c9f4adf --- /dev/null +++ b/model/flags.go @@ -0,0 +1,12 @@ +package model + +// params for Flags +type CommandLineFlags struct { + Host *string `json:"host"` + Port *string `json:"port"` + Stdin *bool `json:"stdin"` + Alias *bool `json:"alias"` + Format *string `json:"format"` + Params *string `json:"params"` + DBPath *string `json:"dbpath"` +} diff --git a/model/internal.go b/model/internal.go new file mode 100644 index 0000000..1d0dd1a --- /dev/null +++ b/model/internal.go @@ -0,0 +1,23 @@ +package model + +// Metadata is the metadata for a column +type Metadata struct { + Name string `json:"name"` + Type string `json:"type"` +} + +// Statistics is the statistics for a query +type Statistics struct { + Elapsed float64 `json:"elapsed"` + RowsRead int `json:"rows_read"` + BytesRead int `json:"bytes_read"` +} + +// OutputJSON is the JSON output for a query +type OutputJSON struct { + Meta []Metadata `json:"meta"` + Data [][]interface{} `json:"data"` + Rows int `json:"rows"` + RowsBeforeLimitAtLeast int `json:"rows_before_limit_at_least"` + Statistics Statistics `json:"statistics"` +} diff --git a/quackpipe.go b/quackpipe.go deleted file mode 100644 index b2c1a4f..0000000 --- a/quackpipe.go +++ /dev/null @@ -1,386 +0,0 @@ -package main - -import ( - "context" - "bufio" - "database/sql" - _ "embed" - "encoding/json" - "flag" - "fmt" - "io/ioutil" - "log" - "net/http" - "os" - "regexp" - "strings" - "time" - "crypto/sha256" - - _ "github.com/marcboeker/go-duckdb" // load duckdb driver -) - -//go:embed play.html -var staticPlay string - -// params for Flags -type CommandLineFlags struct { - Host *string `json:"host"` - Port *string `json:"port"` - Stdin *bool `json:"stdin"` - Alias *bool `json:"alias"` - Format *string `json:"format"` - Params *string `json:"params"` - DBPath *string `json:"dbpath"` -} - -var appFlags CommandLineFlags - -var db *sql.DB - -func check(args ...interface{}) { - err := args[len(args)-1] - if err != nil { - panic(err) - } -} - -func quack(query string, stdin bool, format string, params string, hashdb string) (string, error) { - var err error - alias := *appFlags.Alias - motherduck, md := os.LookupEnv("motherduck_token") - - if (len(hashdb) > 0) { - params = hashdb + "?" + params - } - - db, err = sql.Open("duckdb", params) - if err != nil { - log.Fatal(err) - } - defer db.Close() - - if !stdin { - check(db.ExecContext(context.Background(),"LOAD httpfs; LOAD json; LOAD parquet;")) - check(db.ExecContext(context.Background(),"SET autoinstall_known_extensions=1;")) - check(db.ExecContext(context.Background(),"SET autoload_known_extensions=1;")) - } - - if (alias) { - check(db.ExecContext(context.Background(), "LOAD chsql;")) - } - - if (md) && (motherduck != "") { - check(db.ExecContext(context.Background(), "LOAD motherduck; ATTACH 'md:';")) - } - - startTime := time.Now() - rows, err := db.QueryContext(context.Background(), query) - if err != nil { - return "", err - } - elapsedTime := time.Since(startTime) - - switch format { - case "JSONCompact", "JSON": - return rowsToJSON(rows, elapsedTime) - case "CSVWithNames": - return rowsToCSV(rows, true) - case "TSVWithNames", "TabSeparatedWithNames": - return rowsToTSV(rows, true) - case "TSV", "TabSeparated": - return rowsToTSV(rows, false) - default: - return rowsToTSV(rows, false) - } -} - -// initFlags initializes the command line flags -func initFlags() { - appFlags.Host = flag.String("host", "0.0.0.0", "API host. Default 0.0.0.0") - appFlags.Port = flag.String("port", "8123", "API port. Default 8123") - appFlags.Format = flag.String("format", "JSONCompact", "API port. Default JSONCompact") - appFlags.Params = flag.String("params", "", "DuckDB optional parameters. Default to none.") - appFlags.DBPath = flag.String("dbpath", "/tmp/", "DuckDB DB storage path. Default to /tmp/") - appFlags.Stdin = flag.Bool("stdin", false, "STDIN query. Default false") - appFlags.Alias = flag.Bool("alias", true, "Built-in CH Aliases. Default true") - flag.Parse() -} - -// extractAndRemoveFormat extracts the FORMAT clause from the query and returns the query without the FORMAT clause -func extractAndRemoveFormat(input string) (string, string) { - re := regexp.MustCompile(`(?i)\bFORMAT\s+(\w+)\b`) - match := re.FindStringSubmatch(input) - if len(match) != 2 { - return input, "" - } - format := match[1] - return re.ReplaceAllString(input, ""), format -} - -// Metadata is the metadata for a column -type Metadata struct { - Name string `json:"name"` - Type string `json:"type"` -} - -// Statistics is the statistics for a query -type Statistics struct { - Elapsed float64 `json:"elapsed"` - RowsRead int `json:"rows_read"` - BytesRead int `json:"bytes_read"` -} - -// OutputJSON is the JSON output for a query -type OutputJSON struct { - Meta []Metadata `json:"meta"` - Data [][]interface{} `json:"data"` - Rows int `json:"rows"` - RowsBeforeLimitAtLeast int `json:"rows_before_limit_at_least"` - Statistics Statistics `json:"statistics"` -} - -// rowsToJSON converts the rows to JSON string -func rowsToJSON(rows *sql.Rows, elapsedTime time.Duration) (string, error) { - defer rows.Close() - - // Get column names - columns, err := rows.Columns() - if err != nil { - return "", err - } - - // Create a slice to store maps of column names and their corresponding values - var results OutputJSON - results.Meta = make([]Metadata, len(columns)) - results.Data = make([][]interface{}, 0) - - for i, column := range columns { - results.Meta[i].Name = column - } - - for rows.Next() { - // Create a slice to hold pointers to the values of the columns - values := make([]interface{}, len(columns)) - for i := range columns { - values[i] = new(interface{}) - } - - // Scan the values from the row into the pointers - err := rows.Scan(values...) - if err != nil { - return "", err - } - - // Create a slice to hold the row data - rowData := make([]interface{}, len(columns)) - for i, value := range values { - // Convert the value to the appropriate Go type - switch v := (*(value.(*interface{}))).(type) { - case []byte: - rowData[i] = string(v) - default: - rowData[i] = v - } - } - results.Data = append(results.Data, rowData) - } - - err = rows.Err() - if err != nil { - return "", err - } - - results.Rows = len(results.Data) - results.RowsBeforeLimitAtLeast = len(results.Data) - - // Populate the statistics object with number of rows, bytes, and elapsed time - results.Statistics.Elapsed = elapsedTime.Seconds() - results.Statistics.RowsRead = results.Rows - // Note: bytes_read is an approximation, it's just the number of rows * number of columns - // results.Statistics.BytesRead = results.Rows * len(columns) * 8 // Assuming each value takes 8 bytes - jsonData, err := json.Marshal(results) - if err != nil { - return "", err - } - - return string(jsonData), nil -} - -// rowsToTSV converts the rows to TSV string -func rowsToTSV(rows *sql.Rows, cols bool) (string, error) { - var result []string - columns, err := rows.Columns() - if err != nil { - return "", err - } - - if cols { - // Append column names as the first row - result = append(result, strings.Join(columns, "\t")) - } - - // Fetch rows and append their values as tab-delimited lines - values := make([]interface{}, len(columns)) - scanArgs := make([]interface{}, len(columns)) - for i := range values { - scanArgs[i] = &values[i] - } - for rows.Next() { - err := rows.Scan(scanArgs...) - if err != nil { - return "", err - } - - var lineParts []string - for _, v := range values { - lineParts = append(lineParts, fmt.Sprintf("%v", v)) - } - result = append(result, strings.Join(lineParts, "\t")) - } - - if err := rows.Err(); err != nil { - return "", err - } - - return strings.Join(result, "\n"), nil -} - -// rowsToCSV converts the rows to CSV string -func rowsToCSV(rows *sql.Rows, cols bool) (string, error) { - var result []string - columns, err := rows.Columns() - if err != nil { - return "", err - } - - if cols { - // Append column names as the first row - result = append(result, strings.Join(columns, ",")) - } - - // Fetch rows and append their values as CSV rows - values := make([]interface{}, len(columns)) - scanArgs := make([]interface{}, len(columns)) - for i := range values { - scanArgs[i] = &values[i] - } - for rows.Next() { - err := rows.Scan(scanArgs...) - if err != nil { - return "", err - } - - var lineParts []string - for _, v := range values { - lineParts = append(lineParts, fmt.Sprintf("%v", v)) - } - result = append(result, strings.Join(lineParts, ",")) - } - - if err := rows.Err(); err != nil { - return "", err - } - - return strings.Join(result, "\n"), nil -} - -func main() { - initFlags() - default_format := *appFlags.Format - default_params := *appFlags.Params - default_path := *appFlags.DBPath - - if *appFlags.Stdin { - scanner := bufio.NewScanner((os.Stdin)) - query := "" - for scanner.Scan() { - query = query + "\n" + scanner.Text() - } - if err := scanner.Err(); err != nil { - fmt.Fprintln(os.Stderr, "reading standard input:", err) - } - cleanquery, format := extractAndRemoveFormat(query) - if len(format) > 0 { - query = cleanquery - default_format = format - } - result, err := quack(query, true, default_format, default_params, "") - if err != nil { - fmt.Println(err) - os.Exit(1) - } else { - fmt.Println(result) - } - } else { - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - var bodyBytes []byte - var query string - var err error - - // handle query parameter - if r.URL.Query().Get("query") != "" { - // query = r.FormValue("query") - query = r.URL.Query().Get("query") - } else if r.Body != nil { - bodyBytes, err = ioutil.ReadAll(r.Body) - if err != nil { - fmt.Printf("Body reading error: %v", err) - return - } - defer r.Body.Close() - query = string(bodyBytes) - } - - switch r.Header.Get("Accept") { - case "application/json": - w.Header().Set("Content-Type", "application/json; charset=utf-8") - case "application/xml": - w.Header().Set("Content-Type", "application/xml; charset=utf-8") - case "text/css": - w.Header().Set("Content-Type", "text/css; charset=utf-8") - default: - w.Header().Set("Content-Type", "text/html; charset=utf-8") - } - - // format handling - if r.URL.Query().Get("default_format") != "" { - default_format = r.URL.Query().Get("default_format") - } - // param handling - if r.URL.Query().Get("default_params") != "" { - default_params = r.URL.Query().Get("default_params") - } - // auth to hash based temp file storage - username, password, ok := r.BasicAuth() - hashdb := "" - if ok && len(password) > 0 { - hash := sha256.Sum256([]byte(username + password)) - hashdb = fmt.Sprintf("%s/%x.db", default_path, hash) - } - - // extract FORMAT from query and override the current `default_format` - cleanquery, format := extractAndRemoveFormat(query) - if len(format) > 0 { - query = cleanquery - default_format = format - } - - if len(query) == 0 { - _, _ = w.Write([]byte(staticPlay)) - } else { - result, err := quack(query, false, default_format, default_params, hashdb) - if err != nil { - _, _ = w.Write([]byte(err.Error())) - } else { - _, _ = w.Write([]byte(result)) - } - } - }) - - fmt.Printf("QuackPipe API Running: %s:%s\n", *appFlags.Host, *appFlags.Port) - if err := http.ListenAndServe(*appFlags.Host+":"+*appFlags.Port, nil); err != nil { - panic(err) - } - } -} diff --git a/router/apiRouter.go b/router/apiRouter.go new file mode 100644 index 0000000..6062f70 --- /dev/null +++ b/router/apiRouter.go @@ -0,0 +1,14 @@ +package router + +import ( + "github.com/gorilla/mux" + handlers "quackpipe/handler" + "quackpipe/model" +) + +// APIHandler function for the root endpoint +func APIHandler(router *mux.Router, FlagInformation *model.CommandLineFlags) handlers.Handler { + HandlerInfo := handlers.Handler{FlagInformation: FlagInformation} + router.HandleFunc("/", HandlerInfo.Handlers).Methods("POST", "GET") + return HandlerInfo +} diff --git a/router/route.go b/router/route.go new file mode 100644 index 0000000..94e88cf --- /dev/null +++ b/router/route.go @@ -0,0 +1,13 @@ +package router + +import ( + "github.com/gorilla/mux" + "quackpipe/model" +) + +func NewRouter(flagInformation *model.CommandLineFlags) *mux.Router { + router := mux.NewRouter() + // Register module routes + APIHandler(router, flagInformation) + return router +} diff --git a/service/db/db.go b/service/db/db.go new file mode 100644 index 0000000..c497b29 --- /dev/null +++ b/service/db/db.go @@ -0,0 +1,54 @@ +package db + +import ( + "context" + "database/sql" + _ "github.com/marcboeker/go-duckdb" // load duckdb driver + "os" + "quackpipe/model" + "time" +) + +func Quack(appFlags model.CommandLineFlags, query string, stdin bool, params string, hashdb string) (*sql.Rows, time.Duration, error) { + var err error + alias := *appFlags.Alias + motherduck, md := os.LookupEnv("motherduck_token") + + if len(hashdb) > 0 { + params = hashdb + "?" + params + } + + db, err := sql.Open("duckdb", params) + if err != nil { + return nil, 0, err + } + defer db.Close() + + if !stdin { + check(db.ExecContext(context.Background(), "LOAD httpfs; LOAD json; LOAD parquet;")) + check(db.ExecContext(context.Background(), "SET autoinstall_known_extensions=1;")) + check(db.ExecContext(context.Background(), "SET autoload_known_extensions=1;")) + } + + if alias { + check(db.ExecContext(context.Background(), "LOAD chsql;")) + } + + if (md) && (motherduck != "") { + check(db.ExecContext(context.Background(), "LOAD motherduck; ATTACH 'md:';")) + } + startTime := time.Now() + rows, err := db.QueryContext(context.Background(), query) + if err != nil { + return nil, 0, err + } + elapsedTime := time.Since(startTime) + return rows, elapsedTime, nil +} + +func check(args ...interface{}) { + err := args[len(args)-1] + if err != nil { + panic(err) + } +} diff --git a/utils/helper.go b/utils/helper.go new file mode 100644 index 0000000..bb5f119 --- /dev/null +++ b/utils/helper.go @@ -0,0 +1,201 @@ +package utils + +import ( + "database/sql" + "encoding/json" + "fmt" + "quackpipe/model" + "regexp" + "strings" + "time" +) + +// ExtractAndRemoveFormat extracts the FORMAT clause from the query and returns the query without the FORMAT clause +func ExtractAndRemoveFormat(input string) (string, string) { + re := regexp.MustCompile(`(?i)\bFORMAT\s+(\w+)\b`) + match := re.FindStringSubmatch(input) + if len(match) != 2 { + return input, "" + } + format := match[1] + return re.ReplaceAllString(input, ""), format +} + +func ConversationOfRows(rows *sql.Rows, default_format string, duration time.Duration) (string, error) { + + switch default_format { + case "JSONCompact", "JSON": + result, err := rowsToJSON(rows, duration) + if err != nil { + return "", err + } + return result, nil + case "CSVWithNames": + result, err := rowsToCSV(rows, true) + if err != nil { + return "", err + } + return result, nil + case "TSVWithNames", "TabSeparatedWithNames": + + result, err := rowsToTSV(rows, true) + if err != nil { + return "", err + } + return result, nil + case "TSV", "TabSeparated": + result, err := rowsToTSV(rows, true) + if err != nil { + return "", err + } + return result, nil + + } + + return "", nil +} + +// rowsToJSON converts the rows to JSON string +func rowsToJSON(rows *sql.Rows, elapsedTime time.Duration) (string, error) { + defer rows.Close() + + // Get column names + columns, err := rows.Columns() + if err != nil { + return "", err + } + + // Create a slice to store maps of column names and their corresponding values + var results model.OutputJSON + results.Meta = make([]model.Metadata, len(columns)) + results.Data = make([][]interface{}, 0) + + for i, column := range columns { + results.Meta[i].Name = column + } + + for rows.Next() { + // Create a slice to hold pointers to the values of the columns + values := make([]interface{}, len(columns)) + for i := range columns { + values[i] = new(interface{}) + } + + // Scan the values from the row into the pointers + err := rows.Scan(values...) + if err != nil { + return "", err + } + + // Create a slice to hold the row data + rowData := make([]interface{}, len(columns)) + for i, value := range values { + // Convert the value to the appropriate Go type + switch v := (*(value.(*interface{}))).(type) { + case []byte: + rowData[i] = string(v) + default: + rowData[i] = v + } + } + results.Data = append(results.Data, rowData) + } + + err = rows.Err() + if err != nil { + return "", err + } + + results.Rows = len(results.Data) + results.RowsBeforeLimitAtLeast = len(results.Data) + + // Populate the statistics object with number of rows, bytes, and elapsed time + results.Statistics.Elapsed = elapsedTime.Seconds() + results.Statistics.RowsRead = results.Rows + // Note: bytes_read is an approximation, it's just the number of rows * number of columns + // results.Statistics.BytesRead = results.Rows * len(columns) * 8 // Assuming each value takes 8 bytes + jsonData, err := json.Marshal(results) + if err != nil { + return "", err + } + + return string(jsonData), nil +} + +// rowsToTSV converts the rows to TSV string +func rowsToTSV(rows *sql.Rows, cols bool) (string, error) { + var result []string + columns, err := rows.Columns() + if err != nil { + return "", err + } + + if cols { + // Append column names as the first row + result = append(result, strings.Join(columns, "\t")) + } + + // Fetch rows and append their values as tab-delimited lines + values := make([]interface{}, len(columns)) + scanArgs := make([]interface{}, len(columns)) + for i := range values { + scanArgs[i] = &values[i] + } + for rows.Next() { + err := rows.Scan(scanArgs...) + if err != nil { + return "", err + } + + var lineParts []string + for _, v := range values { + lineParts = append(lineParts, fmt.Sprintf("%v", v)) + } + result = append(result, strings.Join(lineParts, "\t")) + } + + if err := rows.Err(); err != nil { + return "", err + } + + return strings.Join(result, "\n"), nil +} + +// rowsToCSV converts the rows to CSV string +func rowsToCSV(rows *sql.Rows, cols bool) (string, error) { + var result []string + columns, err := rows.Columns() + if err != nil { + return "", err + } + + if cols { + // Append column names as the first row + result = append(result, strings.Join(columns, ",")) + } + + // Fetch rows and append their values as CSV rows + values := make([]interface{}, len(columns)) + scanArgs := make([]interface{}, len(columns)) + for i := range values { + scanArgs[i] = &values[i] + } + for rows.Next() { + err := rows.Scan(scanArgs...) + if err != nil { + return "", err + } + + var lineParts []string + for _, v := range values { + lineParts = append(lineParts, fmt.Sprintf("%v", v)) + } + result = append(result, strings.Join(lineParts, ",")) + } + + if err := rows.Err(); err != nil { + return "", err + } + + return strings.Join(result, "\n"), nil +} diff --git a/utils/util.go b/utils/util.go new file mode 100644 index 0000000..5c0ffa9 --- /dev/null +++ b/utils/util.go @@ -0,0 +1,36 @@ +package utils + +import ( + "bufio" + "database/sql" + "os" + "quackpipe/model" + "quackpipe/service/db" + "time" +) + +func ReadFromScanner(appFlags model.CommandLineFlags) (*sql.Rows, time.Duration, string, error) { + defaultFormat := *appFlags.Format + defaultParams := *appFlags.Params + scanner := bufio.NewScanner((os.Stdin)) + query := "" + for scanner.Scan() { + query = query + "\n" + scanner.Text() + } + if err := scanner.Err(); err != nil { + return nil, 0, "", err + } + + cleanQuery, format := ExtractAndRemoveFormat(query) + if len(format) > 0 { + query = cleanQuery + defaultFormat = format + } + result, duration, err := db.Quack(appFlags, query, true, defaultParams, "") + if err != nil { + return nil, 0, "", err + } + + return result, duration, defaultFormat, nil + +}