Skip to content

Commit

Permalink
[plugins] Add otel exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
csweichel committed May 12, 2022
1 parent 6b293df commit 85fda75
Show file tree
Hide file tree
Showing 11 changed files with 4,045 additions and 1 deletion.
1 change: 1 addition & 0 deletions plugins/BUILD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ packages:
- plugins/github-repo:app
- plugins/github-integration:app
- plugins/integration-example:app
- plugins/otel-exporter:app
- plugins/webhook:app
config:
commands:
Expand Down
13 changes: 13 additions & 0 deletions plugins/otel-exporter/BUILD.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
packages:
- name: app
type: go
deps:
- //:plugin-client-lib
srcs:
- "main.go"
- "go.mod"
- "go.sum"
env:
- CGO_ENABLED=0
config:
buildFlags: ["-o", "werft-plugin-otel-exporter"]
14 changes: 14 additions & 0 deletions plugins/otel-exporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
This plugin emits OpenTelemetry tracing data for werft builds.

## Configuration
```YAML
# which OTel exporter to use. Supported values are "stdout" and "http"
exporter: "http"
```
When using the `http` exporter, you can configure its behaviour using the standard OpenTelemetry `OTEL_*` environment variables, e.g.
```bash
export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.honeycomb.io/"
export OTEL_EXPORTER_OTLP_HEADERS="x-honeycomb-team=your-api-key,x-honeycomb-dataset=your-dataset"
export OTEL_SERVICE_NAME="your-service-name"
```
3,184 changes: 3,184 additions & 0 deletions plugins/otel-exporter/example-log.txt

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions plugins/otel-exporter/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
module github.com/csweichel/werft/plugins/otel-exporter

go 1.17

replace github.com/csweichel/werft => ../..

require (
github.com/csweichel/werft v0.0.0-00010101000000-000000000000
github.com/golang/mock v1.5.0
github.com/sirupsen/logrus v1.8.1
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.7.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
google.golang.org/protobuf v1.28.0
)

require (
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect
go.opentelemetry.io/proto/otlp v0.16.0 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.46.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
464 changes: 464 additions & 0 deletions plugins/otel-exporter/go.sum

Large diffs are not rendered by default.

238 changes: 238 additions & 0 deletions plugins/otel-exporter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package main

import (
"context"
"errors"
"fmt"
"io"
"reflect"
"sync"
"time"

v1 "github.com/csweichel/werft/pkg/api/v1"
"github.com/csweichel/werft/pkg/filterexpr"
"github.com/csweichel/werft/pkg/plugin/client"
plugin "github.com/csweichel/werft/pkg/plugin/client"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

// "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"

// "go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// Config configures this plugin. It is the value handed to plugin.Serve in
// main and received again (as interface{}) by otelExporterPlugin.Run.
type Config struct {
// Filter holds the filter expressions that Run parses with filterexpr.Parse
// and sends along with the job subscription request.
Filter []string `yaml:"filter"`
// Exporter selects the span exporter; Run accepts OTelExporterStdout and
// OTelExporterHTTP and rejects any other value.
Exporter OTelExporter `yaml:"exporter"`
}

// OTelExporter names the span exporter backend chosen through the "exporter"
// configuration key.
type OTelExporter string

const (
// OTelExporterStdout pretty-prints spans using the stdouttrace exporter,
// attached through a simple (non-batching) span processor.
OTelExporterStdout OTelExporter = "stdout"
// OTelExporterHTTP sends spans through the OTLP HTTP client, attached
// through a batching span processor.
OTelExporterHTTP OTelExporter = "http"
)

func main() {
plugin.Serve(&Config{},
plugin.WithIntegrationPlugin(&otelExporterPlugin{}),
)
}

// otelExporterPlugin is the integration plugin registered in main. It carries
// no state of its own; all work happens in its Run method.
type otelExporterPlugin struct{}

// Run sets up the configured OpenTelemetry exporter, installs it as the global
// tracer provider, subscribes to job updates matching cfg.Filter and starts
// one watchJob goroutine per job that is not yet done or cleaning up.
//
// config must be a *Config. Run returns nil when the subscription ends with
// io.EOF, when it is cancelled, or when it yields a nil response; any other
// subscription error is returned wrapped. Before returning, Run waits for all
// job watchers to finish and then shuts the tracer provider down so that
// buffered spans get flushed.
func (*otelExporterPlugin) Run(ctx context.Context, config interface{}, srv *client.Services) error {
	// How long the tracer provider may take to flush and shut down.
	const shutdownTimeout = 10 * time.Second

	cfg, ok := config.(*Config)
	if !ok {
		return fmt.Errorf("config has wrong type %s", reflect.TypeOf(config))
	}

	var opts []sdktrace.TracerProviderOption
	switch cfg.Exporter {
	case OTelExporterStdout:
		out, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
		if err != nil {
			return fmt.Errorf("cannot create stdout exporter: %w", err)
		}
		opts = append(opts, sdktrace.WithSpanProcessor(sdktrace.NewSimpleSpanProcessor(out)))
	case OTelExporterHTTP:
		exporter, err := otlptrace.New(ctx, otlptracehttp.NewClient())
		if err != nil {
			return fmt.Errorf("cannot create OTLP HTTP exporter: %w", err)
		}
		opts = append(opts, sdktrace.WithBatcher(exporter))
	default:
		return fmt.Errorf("unsupported exporter: %s", cfg.Exporter)
	}

	tp := sdktrace.NewTracerProvider(opts...)
	defer func() {
		// ctx is typically already cancelled when Run returns, which would
		// cut the final flush short. Use a fresh, bounded context instead.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()
		if err := tp.Shutdown(shutdownCtx); err != nil {
			logrus.WithError(err).Warn("cannot shut down tracer provider")
		}
	}()
	otel.SetTracerProvider(tp)

	filter, err := filterexpr.Parse(cfg.Filter)
	if err != nil {
		return fmt.Errorf("cannot parse filter: %w", err)
	}

	sub, err := srv.Subscribe(ctx, &v1.SubscribeRequest{
		Filter: []*v1.FilterExpression{{Terms: filter}},
	})
	if err != nil {
		return fmt.Errorf("cannot subscribe: %w", err)
	}

	var wg sync.WaitGroup
	// jobs is only touched from this goroutine (the loop below and the
	// deferred cleanup), so it needs no locking.
	jobs := make(map[string]context.CancelFunc)
	defer func() {
		// Job contexts are detached from ctx. When the plugin itself is being
		// stopped, cancel the watchers so that waiting for them cannot hang.
		if ctx.Err() != nil {
			for _, cancel := range jobs {
				cancel()
			}
		}
		wg.Wait()
	}()

	for {
		resp, err := sub.Recv()
		if err != nil {
			// Inspect the error before the response: a failed Recv usually
			// comes with a nil response, which must not mask the error.
			if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) || ctx.Err() != nil {
				return nil
			}
			return fmt.Errorf("subscription error: %w", err)
		}
		if resp == nil {
			return nil
		}

		job := resp.Result
		if job == nil {
			continue
		}
		if _, exists := jobs[job.Name]; exists {
			continue
		}
		if job.Phase == v1.JobPhase_PHASE_DONE || job.Phase == v1.JobPhase_PHASE_CLEANUP {
			continue
		}

		jctx, cancel := context.WithCancel(context.Background())
		// The entry stays in the map after the watcher finishes so that later
		// updates for the same job do not spawn a second watcher.
		jobs[job.Name] = cancel

		wg.Add(1)
		go func() {
			// Release the job context (and the stream bound to it) as soon as
			// the watcher is done.
			defer cancel()
			watchJob(jctx, &wg, srv, job)
		}()
	}
}

// watchJob exports telemetry for a single job. It opens a span named after the
// job, listens to the job's updates and raw logs, and translates log slice
// events into nested spans: one span per phase (child of the job span) and one
// span per slice (child of the current phase span). Job phase changes and
// result slices are recorded as events on the job span.
//
// watchJob returns when the listen stream ends or fails, when ctx is
// cancelled, or once the job reaches the done or cleanup phase. All spans that
// are still open at that point are ended so they get exported. wg.Done is
// called on return.
func watchJob(ctx context.Context, wg *sync.WaitGroup, srv *client.Services, job *v1.JobStatus) {
	defer wg.Done()

	jobName := job.Name
	log := logrus.WithField("job", jobName)

	tracer := otel.GetTracerProvider().Tracer("github.com/csweichel/werft/plugins/otel-exporter")

	log.Info("exporting telemetry for this job")

	jobAttributes := jobSpanAttributes(job)

	ctx, jobSpan := tracer.Start(ctx, jobName, trace.WithAttributes(
		attribute.String("werft.type", "job"),
	), trace.WithAttributes(jobAttributes...))
	defer jobSpan.End()

	sub, err := srv.Listen(ctx, &v1.ListenRequest{
		Name:    jobName,
		Updates: true,
		Logs:    v1.ListenRequestLogs_LOGS_RAW,
	})
	if err != nil {
		log.WithError(err).Error("failed to listen to job")
		return
	}
	defer log.Debug("done exporting telemetry for this job")

	var (
		jobPhase     v1.JobPhase
		phaseSpan    trace.Span
		phaseSpanCtx context.Context
		sliceSpans   = make(map[string]trace.Span)
	)
	defer func() {
		if phaseSpan != nil {
			phaseSpan.End()
		}
	}()
	// Registered after the phase span cleanup so that it runs first: slices
	// which never saw a terminating event are ended before their parent phase.
	// Without this, those spans would never be exported.
	defer func() {
		for name, s := range sliceSpans {
			s.End()
			delete(sliceSpans, name)
		}
	}()

	// newPhaseSpan ends the current phase span (if any) and starts the next
	// one as a child of the job span.
	newPhaseSpan := func(name string) {
		if phaseSpan != nil {
			phaseSpan.End()
		}
		phaseSpanCtx, phaseSpan = tracer.Start(ctx, name, trace.WithAttributes(
			attribute.String("werft.type", "phase"),
		), trace.WithAttributes(jobAttributes...))
	}
	// handleSlice maps a single log slice event onto span operations.
	handleSlice := func(slice *v1.LogSliceEvent) {
		if slice == nil {
			return
		}

		name := slice.Name

		switch slice.Type {
		case v1.LogSliceType_SLICE_START, v1.LogSliceType_SLICE_CONTENT:
			if phaseSpanCtx == nil {
				// phase hasn't started yet - create a default one
				newPhaseSpan(name)
			}

			if _, ok := sliceSpans[name]; !ok {
				_, s := tracer.Start(phaseSpanCtx, name, trace.WithAttributes(
					attribute.String("werft.type", "slice"),
				), trace.WithAttributes(jobAttributes...))
				sliceSpans[name] = s
			}

		case v1.LogSliceType_SLICE_DONE, v1.LogSliceType_SLICE_ABANDONED, v1.LogSliceType_SLICE_FAIL:
			if s, ok := sliceSpans[name]; ok {
				if slice.Type == v1.LogSliceType_SLICE_FAIL {
					s.SetStatus(codes.Error, slice.Payload)
				}

				s.End()
				delete(sliceSpans, name)
			}

		case v1.LogSliceType_SLICE_PHASE:
			newPhaseSpan(name)

		case v1.LogSliceType_SLICE_RESULT:
			jobSpan.AddEvent("result "+name, trace.WithAttributes(attribute.String("payload", slice.Payload)))
		}
	}

	for {
		update, err := sub.Recv()
		if err != nil {
			// End of stream and cancellation are the expected ways for the
			// stream to finish; anything else is worth surfacing in the log.
			if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) && ctx.Err() == nil {
				log.WithError(err).Warn("listening to job failed")
			}
			return
		}
		if update == nil {
			return
		}

		switch ctnt := update.Content.(type) {
		case *v1.ListenResponse_Slice:
			handleSlice(ctnt.Slice)
		case *v1.ListenResponse_Update:
			if ctnt.Update == nil {
				continue
			}
			if jobPhase != ctnt.Update.Phase {
				jobSpan.AddEvent(fmt.Sprintf("job-phase-%s", ctnt.Update.Phase))
				jobPhase = ctnt.Update.Phase
			}
			if jobPhase == v1.JobPhase_PHASE_DONE || jobPhase == v1.JobPhase_PHASE_CLEANUP {
				return
			}
		}
	}
}

// jobSpanAttributes converts a job's metadata (annotations, owner, job spec
// name, trigger, creation time and repository) into span attributes. It
// returns nil when the job carries no metadata.
func jobSpanAttributes(job *v1.JobStatus) []attribute.KeyValue {
	md := job.Metadata
	if md == nil {
		return nil
	}

	// 4 fixed metadata attributes plus up to 5 repository attributes.
	attrs := make([]attribute.KeyValue, 0, len(md.Annotations)+9)
	for _, a := range md.Annotations {
		attrs = append(attrs, attribute.String(fmt.Sprintf("werft.annotation.%s", a.Key), a.Value))
	}
	attrs = append(attrs,
		attribute.String("werft.metadata.owner", md.Owner),
		attribute.String("werft.metadata.jobSpecName", md.JobSpecName),
		attribute.String("werft.metadata.trigger", md.Trigger.String()),
		attribute.String("werft.metadata.created", md.Created.AsTime().Format(time.RFC3339)),
	)
	if repo := md.Repository; repo != nil {
		attrs = append(attrs,
			attribute.String("werft.metadata.repo.host", repo.Host),
			attribute.String("werft.metadata.repo.owner", repo.Owner),
			attribute.String("werft.metadata.repo.ref", repo.Ref),
			attribute.String("werft.metadata.repo.repo", repo.Repo),
			attribute.String("werft.metadata.repo.revision", repo.Revision),
		)
	}
	return attrs
}
84 changes: 84 additions & 0 deletions plugins/otel-exporter/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"bytes"
"context"
"io"
"testing"
"time"

v1 "github.com/csweichel/werft/pkg/api/v1"
"github.com/csweichel/werft/pkg/api/v1/mock"
"github.com/csweichel/werft/pkg/logcutter"
"github.com/csweichel/werft/pkg/plugin/client"
"github.com/golang/mock/gomock"
"google.golang.org/protobuf/types/known/timestamppb"

_ "embed"
)

// TestOtelExporterPlugin runs the plugin with the stdout exporter against
// mocked werft services. The subscription mock announces a single preparing
// job and then reports io.EOF; the listen mock replays the embedded example
// log, cut into slice events by the default log cutter. The test passes when
// Run returns without an error.
func TestOtelExporterPlugin(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	const jobName = "test-job"
	metadata := &v1.JobMetadata{
		Owner:      "someone",
		Repository: &v1.Repository{},
		Trigger:    v1.JobTrigger_TRIGGER_MANUAL,
		Created:    timestamppb.New(time.UnixMilli(0)),
	}

	// Job statuses still to be announced, handed out front to back.
	pending := []*v1.JobStatus{
		{
			Name:     jobName,
			Metadata: metadata,
			Phase:    v1.JobPhase_PHASE_PREPARING,
		},
	}

	subscription := mock.NewMockWerftService_SubscribeClient(ctrl)
	subscription.EXPECT().Recv().DoAndReturn(func() (*v1.SubscribeResponse, error) {
		if len(pending) == 0 {
			time.Sleep(1 * time.Millisecond)
			return nil, io.EOF
		}
		next := pending[0]
		pending = pending[1:]
		return &v1.SubscribeResponse{Result: next}, nil
	}).AnyTimes()

	events, failures := logcutter.DefaultCutter.Slice(bytes.NewReader([]byte(logtext)))

	listener := mock.NewMockWerftService_ListenClient(ctrl)
	listener.EXPECT().Recv().DoAndReturn(func() (*v1.ListenResponse, error) {
		select {
		case evt := <-events:
			structural := evt != nil && evt.Type != v1.LogSliceType_SLICE_CONTENT
			if structural {
				time.Sleep(1 * time.Millisecond)
				t.Log(evt)
			}
			content := &v1.ListenResponse_Slice{Slice: evt}
			return &v1.ListenResponse{Content: content}, nil
		case err := <-failures:
			return nil, err
		}
	}).AnyTimes()

	werft := mock.NewMockWerftServiceClient(ctrl)
	werft.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Return(subscription, nil)
	werft.EXPECT().Listen(gomock.Any(), gomock.Any()).Return(listener, nil)

	cfg := &Config{Exporter: OTelExporterStdout}
	services := &client.Services{WerftServiceClient: werft}

	exporter := &otelExporterPlugin{}
	if err := exporter.Run(context.Background(), cfg, services); err != nil {
		t.Fatal(err)
	}
}

// logtext holds the contents of example-log.txt, embedded at build time.
// TestOtelExporterPlugin feeds it through the log cutter to produce the slice
// events served by the mocked listen stream.
//
//go:embed example-log.txt
var logtext string
Loading

0 comments on commit 85fda75

Please sign in to comment.