Skip to content

Commit

Permalink
Update fluxpipe.go
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani authored Apr 5, 2022
1 parent 8c1d74b commit 11c1c01
Showing 1 changed file with 21 additions and 52 deletions.
73 changes: 21 additions & 52 deletions fluxpipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"bufio"
"os"
"context"
// "io"
// "log"
"fmt"
"strings"
"bytes"
Expand All @@ -21,7 +19,7 @@ import (
"github.com/influxdata/flux/runtime"
)

var APPNAME = "flux-pipe"
var APPNAME = "fluxpipe"

func runQuery(ctx context.Context, script string) (flux.Query, func(), error) {
program, err := lang.Compile(script, runtime.Default, time.Unix(0, 0))
Expand All @@ -38,71 +36,42 @@ func runQuery(ctx context.Context, script string) (flux.Query, func(), error) {
return q, deps.Finish, nil
}

var validScript = `
import "csv"
data = "
#datatype,string,long,long,string
#group,false,false,false,true
#default,_result,,,
,result,table,value,tag
,,0,10,a
,,0,10,a
,,1,20,b
,,1,20,b
,,2,30,c
,,2,30,c
,,3,40,d
,,3,40,d
"
csv.from(csv: data) |> filter(fn: (r) => r["value"] >= 20) |> yield(name: "res") `

func main() {


scanner := bufio.NewScanner((os.Stdin))
inputString := ""

for scanner.Scan() {
inputString = inputString + "\n" + scanner.Text()
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "reading standard input:", err)
}

// fmt.Println(inputString)

q, close, err := runQuery(context.Background(), inputString)
if err != nil {
fmt.Println("unexpected error while creating query: %s", err)
}
defer close()

results := flux.NewResultIteratorFromQuery(q)
defer results.Release()

buf := bytes.NewBuffer(nil)
encoder := csv.NewMultiResultEncoder(csv.DefaultEncoderConfig())

if _, err := encoder.Encode(buf, results); err != nil {
panic(err)
}

// This substitution is done because the testable example's Output
// section cannot contain carriage return while the csv encoder emits them
fmt.Println(strings.Replace(buf.String(), "\r\n", "\n", -1))

// release query resources
q.Done()
if err != nil {
fmt.Println("unexpected error while creating query: %s", err)
}
defer close()

if q.Err() != nil {
fmt.Println("unexpected error from query execution: %s", q.Err())
}
results := flux.NewResultIteratorFromQuery(q)
defer results.Release()

buf := bytes.NewBuffer(nil)
encoder := csv.NewMultiResultEncoder(csv.DefaultEncoderConfig())

if _, err := encoder.Encode(buf, results); err != nil {
panic(err)
}

// This substitution is done because the testable example's Output
// section cannot contain carriage return while the csv encoder emits them
fmt.Println(strings.Replace(buf.String(), "\r\n", "\n", -1))

// release query resources
q.Done()

if q.Err() != nil {
fmt.Println("unexpected error from query execution: %s", q.Err())
}
}


0 comments on commit 11c1c01

Please sign in to comment.