Skip to content

Commit

Permalink
Merge pull request #20 from metrico/dep_replace
Browse files Browse the repository at this point in the history
fix: to / wideTo
  • Loading branch information
lmangani authored Oct 3, 2023
2 parents e2f29f9 + d93fcfa commit ebbb859
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 327 deletions.
6 changes: 3 additions & 3 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
export PKG_CONFIG_PATH=$(pwd)

echo "Building fluxpipe-server ..."
go build -a -ldflags '-extldflags "-static -w -ldl"' -o fluxpipe-server fluxpipe-server.go
go build -a -ldflags '-extldflags "-static -w -ldl"' -o fluxpipe-server ./cmd/server

echo "Building fluxpipe-lambda ..."
go build -a -ldflags '-extldflags "-static -w -ldl"' -o fluxpipe-lambda fluxpipe-lambda.go
go build -a -ldflags '-extldflags "-static -w -ldl"' -o fluxpipe-lambda ./cmd/lambda

echo "Building fluxpipe-lib ..."
CGO_ENABLED=1 go build -buildmode=c-archive -o fluxpipelib.a fluxpipelib.go
CGO_ENABLED=1 go build -buildmode=c-archive -o fluxpipelib.a ./cmd/lib
# CGO_ENABLED=1 go build -buildmode=c-shared -o fluxpipelib.dylib fluxpipelib.go
28 changes: 28 additions & 0 deletions cmd/lambda/fluxpipe-lambda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"context"
"github.com/metrico/fluxpipe/service"

"github.com/aws/aws-lambda-go/lambda"
)

var APPNAME = "fluxpipe-lambda"

func exec(ctx context.Context, inputString string) (string, string) {
res, err := service.RunE(ctx, inputString)
return res, err.Error()
}

type FluxEvent struct {
Query string `json:"flux"`
}

func HandleRequest(ctx context.Context, flux FluxEvent) (string, error) {
buf, _ := exec(ctx, flux.Query)
return buf, nil
}

func main() {
lambda.Start(HandleRequest)
}
25 changes: 25 additions & 0 deletions cmd/lib/fluxpipelib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

import (
"C"
"context"
"fmt"
_ "github.com/InfluxCommunity/flux/fluxinit/static"
"github.com/metrico/fluxpipe/service"
)

// # CGO_ENABLED=1 go build -buildmode=c-shared -o fluxpipe.a fluxpipelib.go
// # CGO_ENABLED=1 go build -buildmode=c-archive -o fluxpipe.a fluxpipelib.go

var APPNAME = "fluxpipe-library"

//export Query
func Query(query string) string {
res, err := service.RunE(context.Background(), query)
if err != nil {
return fmt.Sprintf(`{"code":"invalid","message":"%v"}`, err.Error())
}
return res
}

func main() {}
114 changes: 11 additions & 103 deletions fluxpipe-server.go → cmd/server/fluxpipe-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,23 @@ package main

import (
"bufio"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"os"
"strings"
"time"

"github.com/InfluxCommunity/flux"
"github.com/InfluxCommunity/flux/csv"
_ "github.com/InfluxCommunity/flux/fluxinit/static"
"github.com/InfluxCommunity/flux/lang"
"github.com/InfluxCommunity/flux/memory"
"github.com/InfluxCommunity/flux/runtime"


_fluxhttp "github.com/InfluxCommunity/flux/dependencies/http"
"github.com/InfluxCommunity/flux/dependencies/secret"
"github.com/InfluxCommunity/flux/dependencies/url"

_ "embed"
"github.com/metrico/fluxpipe/service"
"github.com/metrico/fluxpipe/static"
"io/ioutil"
"net/http"
"os"
"strings"

"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)

var APPNAME = "fluxpipe"

//go:embed static/play.html
var PLAY []byte

//go:embed static/favicon.ico
var FAVICON []byte

func runQuery(ctx context.Context, script string) (flux.Query, error) {

program, err := lang.Compile(ctx, script, runtime.Default, time.Now())
if err != nil {
return nil, err
}

q, err := program.Start(ctx, memory.DefaultAllocator)
if err != nil {
return nil, err
}
return q, nil
}

func postQuery(c echo.Context) error {

c.Response().Header().Set(echo.HeaderContentType, "text/csv; charset=utf-8")
Expand Down Expand Up @@ -82,7 +48,7 @@ func postQuery(c echo.Context) error {
} else {
q := json_map["query"]
query := fmt.Sprintf("%v", q)
res, err := exec(query)
res, err := service.RunE(c.Request().Context(), query)
if err != nil {
c.Response().Header().Set(echo.HeaderContentType, "application/json; charset=utf-8")
c.Response().Header().Set("x-platform-error-code", "invalid")
Expand All @@ -93,8 +59,7 @@ func postQuery(c echo.Context) error {
}

} else {

res, err := exec(string(s))
res, err := service.RunE(c.Request().Context(), string(s))
if err != nil {
c.Response().Header().Set(echo.HeaderContentType, "application/json; charset=utf-8")
c.Response().Header().Set("x-platform-error-code", "invalid")
Expand All @@ -105,71 +70,14 @@ func postQuery(c echo.Context) error {
}
}

// NewCustomDependencies produces a Custom set of dependencies including EnvironmentSecretService.
func NewCustomDependencies() flux.Deps {
validator := url.PassValidator{}
return flux.Deps{
Deps: flux.WrappedDeps{
HTTPClient: _fluxhttp.NewLimitedDefaultClient(validator),
// Default to having no filesystem, no secrets, and no url validation (always pass).
FilesystemService: nil,
SecretService: secret.EnvironmentSecretService{},
URLValidator: validator,
},
}
}

func exec(inputString string) (string, error) {

// CustomDeps produces a Custom set of dependencies including EnvironmentSecretService.
customValidator := url.PassValidator{}
customDeps := flux.Deps{
Deps: flux.WrappedDeps{
HTTPClient: _fluxhttp.NewLimitedDefaultClient(customValidator),
FilesystemService: nil,
SecretService: secret.EnvironmentSecretService{},
URLValidator: customValidator,
},
}

// ctx := flux.NewDefaultDependencies().Inject(context.Background())
ctx := customDeps.Inject(context.Background())

q, err := runQuery(ctx, inputString)
if err != nil {
fmt.Println("unexpected error while creating query: %s", err)
return "", err
}

results := flux.NewResultIteratorFromQuery(q)
defer results.Release()

buf := bytes.NewBuffer(nil)
encoder := csv.NewMultiResultEncoder(csv.DefaultEncoderConfig())

if _, err := encoder.Encode(buf, results); err != nil {
return "", err
}

q.Done()

if q.Err() != nil {
fmt.Println("unexpected error from query execution: %s", q.Err())
return "", q.Err()

} else {
return buf.String(), nil
}
}

func main() {

port := flag.String("port", "8086", "API port")
stdin := flag.Bool("stdin", false, "STDIN mode")
cors := flag.Bool("cors", true, "API cors mode")
flag.Parse()

scanner := bufio.NewScanner((os.Stdin))
scanner := bufio.NewScanner(os.Stdin)
inputString := ""

if *stdin == true {
Expand All @@ -181,12 +89,12 @@ func main() {
fmt.Fprintln(os.Stderr, "reading standard input:", err)
}

buf, err := exec(inputString)
buf, err := service.RunE(context.Background(), inputString)
if err != nil {
fmt.Fprintln(os.Stderr, "we have some error: ", err)
return
}

fmt.Println(strings.Replace(buf, "\r\n", "\n", -1))

} else {
Expand All @@ -204,10 +112,10 @@ func main() {
}

e.GET("/", func(c echo.Context) error {
return c.Blob(http.StatusOK, "text/html", PLAY)
return c.Blob(http.StatusOK, "text/html", static.PLAY)
})
e.GET("/favicon.ico", func(c echo.Context) error {
return c.Blob(http.StatusOK, "image/x-icon", FAVICON)
return c.Blob(http.StatusOK, "image/x-icon", static.FAVICON)
})

e.GET("/hello", func(c echo.Context) error {
Expand Down
108 changes: 0 additions & 108 deletions fluxpipe-lambda.go

This file was deleted.

Loading

0 comments on commit ebbb859

Please sign in to comment.