diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index 9ac1f63..d49a518 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -7,14 +7,10 @@ import ( "net/http" "sync" - "github.com/VictoriaMetrics/metricsql" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/backend/log" - "github.com/grafana/grafana-plugin-sdk-go/data" - - "github.com/VictoriaMetrics/grafana-logs-datasource/pkg/utils" ) const ( @@ -119,67 +115,7 @@ func (d *Datasource) query(ctx context.Context, _ backend.PluginContext, query b return newResponseError(err, backend.Status(resp.StatusCode)) } - labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0) - labelsField.Name = "labels" - - timeFd := data.NewFieldFromFieldType(data.FieldTypeTime, 0) - timeFd.Name = "Time" - - lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0) - lineField.Name = "Line" - - labels := data.Labels{} - - dec := json.NewDecoder(resp.Body) - - for dec.More() { - var r Response - err := dec.Decode(&r) - if err != nil { - return newResponseError(err, backend.StatusInternal) - } - - for fieldName, value := range r { - switch fieldName { - case messageField: - lineField.Append(value) - case timeField: - getTime, err := utils.GetTime(value) - if err != nil { - return newResponseError(err, backend.StatusInternal) - } - timeFd.Append(getTime) - case streamField: - expr, err := metricsql.Parse(value) - if err != nil { - return newResponseError(err, backend.StatusInternal) - } - if mExpr, ok := expr.(*metricsql.MetricExpr); ok { - for _, filters := range mExpr.LabelFilterss { - for _, filter := range filters { - labels[filter.Label] = filter.Value - } - } - } - default: - labels[fieldName] = value - } - } - - d, err := labelsToRawJson(labels) - if err != nil { - return newResponseError(err, backend.StatusInternal) - } - labelsField.Append(d) - } - - frame := data.NewFrame("", timeFd, lineField, labelsField) - - rsp := backend.DataResponse{} - frame.Meta = &data.FrameMeta{} - rsp.Frames = append(rsp.Frames, frame) - - return rsp + return parseStreamResponse(resp.Body) } // CheckHealth handles health checks sent from Grafana to the plugin. @@ -221,13 +157,3 @@ func newResponseError(err error, httpStatus backend.Status) backend.DataResponse log.DefaultLogger.Error(err.Error()) return backend.DataResponse{Status: httpStatus, Error: err} } - -func labelsToRawJson(labels data.Labels) (json.RawMessage, error) { - // data.Labels when converted to JSON keep the fields sorted - bytes, err := json.Marshal(labels) - if err != nil { - return nil, err - } - - return bytes, nil -} diff --git a/pkg/plugin/response.go b/pkg/plugin/response.go index cd07d7e..83f3d13 100644 --- a/pkg/plugin/response.go +++ b/pkg/plugin/response.go @@ -1,11 +1,102 @@ package plugin +import ( + "encoding/json" + "io" + + "github.com/VictoriaMetrics/metricsql" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + + "github.com/VictoriaMetrics/grafana-logs-datasource/pkg/utils" +) + const ( messageField = "_msg" streamField = "_stream" timeField = "_time" + + // Grafana logs fields + gLabelsField = "labels" + gTimeField = "Time" + gLineField = "Line" ) // Response contains fields from query response // It represents victoria logs response type Response map[string]string + +func parseStreamResponse(reader io.Reader) backend.DataResponse { + + labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0) + labelsField.Name = gLabelsField + + timeFd := data.NewFieldFromFieldType(data.FieldTypeTime, 0) + timeFd.Name = gTimeField + + lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0) + lineField.Name = gLineField + + labels := data.Labels{} + + dec := json.NewDecoder(reader) + + for dec.More() { + var r Response + err := dec.Decode(&r) + if err != nil { + return newResponseError(err, backend.StatusInternal) + } + + for fieldName, value := range r { + switch fieldName { + case messageField: + lineField.Append(value) + case timeField: + getTime, err := utils.GetTime(value) + if err != nil { + return newResponseError(err, backend.StatusInternal) + } + timeFd.Append(getTime) + case streamField: + expr, err := metricsql.Parse(value) + if err != nil { + return newResponseError(err, backend.StatusInternal) + } + if mExpr, ok := expr.(*metricsql.MetricExpr); ok { + for _, filters := range mExpr.LabelFilterss { + for _, filter := range filters { + labels[filter.Label] = filter.Value + } + } + } + default: + labels[fieldName] = value + } + } + + d, err := labelsToRawJson(labels) + if err != nil { + return newResponseError(err, backend.StatusInternal) + } + labelsField.Append(d) + } + + frame := data.NewFrame("", timeFd, lineField, labelsField) + + rsp := backend.DataResponse{} + frame.Meta = &data.FrameMeta{} + rsp.Frames = append(rsp.Frames, frame) + + return rsp +} + +func labelsToRawJson(labels data.Labels) (json.RawMessage, error) { + // data.Labels when converted to JSON keep the fields sorted + bytes, err := json.Marshal(labels) + if err != nil { + return nil, err + } + + return bytes, nil +}