forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopentelemetry.go
124 lines (101 loc) · 3.18 KB
/
opentelemetry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package opentelemetry
import (
"fmt"
"net"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"go.opentelemetry.io/collector/model/otlpgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
type OpenTelemetry struct {
ServiceAddress string `toml:"service_address"`
MetricsSchema string `toml:"metrics_schema"`
tls.ServerConfig
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`
grpcServer *grpc.Server
wg sync.WaitGroup
}
const sampleConfig = `
## Override the default (0.0.0.0:4317) destination OpenTelemetry gRPC service
## address:port
# service_address = "0.0.0.0:4317"
## Override the default (5s) new connection timeout
# timeout = "5s"
## Override the default (prometheus-v1) metrics schema.
## Supports: "prometheus-v1", "prometheus-v2"
## For more information about the alternatives, read the Prometheus input
## plugin notes.
# metrics_schema = "prometheus-v1"
## Optional TLS Config.
## For advanced options: https://github.com/influxdata/telegraf/blob/v1.18.3/docs/TLS.md
##
## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections.
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Add service certificate and key.
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
`
func (o *OpenTelemetry) SampleConfig() string {
return sampleConfig
}
func (o *OpenTelemetry) Description() string {
return "Receive OpenTelemetry traces, metrics, and logs over gRPC"
}
func (o *OpenTelemetry) Gather(_ telegraf.Accumulator) error {
return nil
}
func (o *OpenTelemetry) Start(accumulator telegraf.Accumulator) error {
var grpcOptions []grpc.ServerOption
if tlsConfig, err := o.ServerConfig.TLSConfig(); err != nil {
return err
} else if tlsConfig != nil {
grpcOptions = append(grpcOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
if o.Timeout > 0 {
grpcOptions = append(grpcOptions, grpc.ConnectionTimeout(time.Duration(o.Timeout)))
}
logger := &otelLogger{o.Log}
influxWriter := &writeToAccumulator{accumulator}
o.grpcServer = grpc.NewServer(grpcOptions...)
otlpgrpc.RegisterTracesServer(o.grpcServer, newTraceService(logger, influxWriter))
ms, err := newMetricsService(logger, influxWriter, o.MetricsSchema)
if err != nil {
return err
}
otlpgrpc.RegisterMetricsServer(o.grpcServer, ms)
otlpgrpc.RegisterLogsServer(o.grpcServer, newLogsService(logger, influxWriter))
listener, err := net.Listen("tcp", o.ServiceAddress)
if err != nil {
return err
}
o.wg.Add(1)
go func() {
if err := o.grpcServer.Serve(listener); err != nil {
accumulator.AddError(fmt.Errorf("failed to stop OpenTelemetry gRPC service: %w", err))
}
o.wg.Done()
}()
return nil
}
func (o *OpenTelemetry) Stop() {
if o.grpcServer != nil {
o.grpcServer.Stop()
}
o.wg.Wait()
}
func init() {
inputs.Add("opentelemetry", func() telegraf.Input {
return &OpenTelemetry{
ServiceAddress: "0.0.0.0:4317",
MetricsSchema: "prometheus-v1",
Timeout: config.Duration(5 * time.Second),
}
})
}