Skip to content

Commit

Permalink
simplify query logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryk-dk committed Feb 12, 2024
1 parent ebb14d0 commit d582180
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 75 deletions.
76 changes: 1 addition & 75 deletions pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,10 @@ import (
"net/http"
"sync"

"github.com/VictoriaMetrics/metricsql"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"

"github.com/VictoriaMetrics/grafana-logs-datasource/pkg/utils"
)

const (
Expand Down Expand Up @@ -119,67 +115,7 @@ func (d *Datasource) query(ctx context.Context, _ backend.PluginContext, query b
return newResponseError(err, backend.Status(resp.StatusCode))
}

labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
labelsField.Name = "labels"

timeFd := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFd.Name = "Time"

lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
lineField.Name = "Line"

labels := data.Labels{}

dec := json.NewDecoder(resp.Body)

for dec.More() {
var r Response
err := dec.Decode(&r)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}

for fieldName, value := range r {
switch fieldName {
case messageField:
lineField.Append(value)
case timeField:
getTime, err := utils.GetTime(value)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
timeFd.Append(getTime)
case streamField:
expr, err := metricsql.Parse(value)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
if mExpr, ok := expr.(*metricsql.MetricExpr); ok {
for _, filters := range mExpr.LabelFilterss {
for _, filter := range filters {
labels[filter.Label] = filter.Value
}
}
}
default:
labels[fieldName] = value
}
}

d, err := labelsToRawJson(labels)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
labelsField.Append(d)
}

frame := data.NewFrame("", timeFd, lineField, labelsField)

rsp := backend.DataResponse{}
frame.Meta = &data.FrameMeta{}
rsp.Frames = append(rsp.Frames, frame)

return rsp
return parseStreamResponse(resp.Body)
}

// CheckHealth handles health checks sent from Grafana to the plugin.
Expand Down Expand Up @@ -221,13 +157,3 @@ func newResponseError(err error, httpStatus backend.Status) backend.DataResponse
log.DefaultLogger.Error(err.Error())
return backend.DataResponse{Status: httpStatus, Error: err}
}

func labelsToRawJson(labels data.Labels) (json.RawMessage, error) {
// data.Labels when converted to JSON keep the fields sorted
bytes, err := json.Marshal(labels)
if err != nil {
return nil, err
}

return bytes, nil
}
91 changes: 91 additions & 0 deletions pkg/plugin/response.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,102 @@
package plugin

import (
"encoding/json"
"io"

"github.com/VictoriaMetrics/metricsql"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"

"github.com/VictoriaMetrics/grafana-logs-datasource/pkg/utils"
)

const (
messageField = "_msg"
streamField = "_stream"
timeField = "_time"

// Grafana logs fields
gLabelsField = "labels"
gTimeField = "Time"
gLineField = "Line"
)

// Response contains fields from query response
// It represents victoria logs response
type Response map[string]string

func parseStreamResponse(reader io.Reader) backend.DataResponse {

labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
labelsField.Name = gLabelsField

timeFd := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFd.Name = gTimeField

lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
lineField.Name = gLineField

labels := data.Labels{}

dec := json.NewDecoder(reader)

for dec.More() {
var r Response
err := dec.Decode(&r)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}

for fieldName, value := range r {
switch fieldName {
case messageField:
lineField.Append(value)
case timeField:
getTime, err := utils.GetTime(value)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
timeFd.Append(getTime)
case streamField:
expr, err := metricsql.Parse(value)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
if mExpr, ok := expr.(*metricsql.MetricExpr); ok {
for _, filters := range mExpr.LabelFilterss {
for _, filter := range filters {
labels[filter.Label] = filter.Value
}
}
}
default:
labels[fieldName] = value
}
}

d, err := labelsToRawJson(labels)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
labelsField.Append(d)
}

frame := data.NewFrame("", timeFd, lineField, labelsField)

rsp := backend.DataResponse{}
frame.Meta = &data.FrameMeta{}
rsp.Frames = append(rsp.Frames, frame)

return rsp
}

func labelsToRawJson(labels data.Labels) (json.RawMessage, error) {
// data.Labels when converted to JSON keep the fields sorted
bytes, err := json.Marshal(labels)
if err != nil {
return nil, err
}

return bytes, nil
}

0 comments on commit d582180

Please sign in to comment.