-
Notifications
You must be signed in to change notification settings - Fork 46
/
Copy pathcollector.go
111 lines (96 loc) · 3.8 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package collector
import (
"fmt"
"github.com/google-cloud-tools/kube-eagle/kubernetes"
"github.com/google-cloud-tools/kube-eagle/options"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
type collectorFactoryFunc = func(opts *options.Options) (Collector, error)
var (
kubernetesClient *kubernetes.Client
scrapeDurationDesc *prometheus.Desc
scrapeSuccessDesc *prometheus.Desc
factoriesByCollectorName = make(map[string]collectorFactoryFunc)
)
// registerCollector adds a collector to the registry so that it's Update() method will be called every time
// the metrics endpoint is triggered
func registerCollector(collectorName string, collectorFactory collectorFactoryFunc) {
log.Debugf("Registering collector '%s'", collectorName)
factoriesByCollectorName[collectorName] = collectorFactory
}
// KubeEagleCollector implements the prometheus collector interface
type KubeEagleCollector struct {
CollectorByName map[string]Collector
}
// NewKubeEagleCollector creates a new KubeEagle collector which can be considered as manager of multiple collectors
func NewKubeEagleCollector(opts *options.Options) (*KubeEagleCollector, error) {
// Create registered collectors by executing it's collector factory function
collectorByName := make(map[string]Collector)
for collectorName, factory := range factoriesByCollectorName {
log.Debugf("Creating collector '%s'", collectorName)
collector, err := factory(opts)
if err != nil {
return nil, fmt.Errorf("failed to create collector '%s': '%s'", collectorName, err)
}
collectorByName[collectorName] = collector
}
var err error
kubernetesClient, err = kubernetes.NewClient(opts)
if err != nil {
return nil, fmt.Errorf("failed to initialize kubernetes client: '%v'", err)
}
scrapeDurationDesc = prometheus.NewDesc(
prometheus.BuildFQName(opts.Namespace, "scrape", "collector_duration_seconds"),
"Kube Eagle: Duration of a collector scrape.",
[]string{"collector"},
nil,
)
scrapeSuccessDesc = prometheus.NewDesc(
prometheus.BuildFQName(opts.Namespace, "scrape", "collector_success"),
"Kube Eagle: Whether a collector succeeded.",
[]string{"collector"},
nil,
)
return &KubeEagleCollector{CollectorByName: collectorByName}, nil
}
// Describe implements the prometheus.Collector interface
func (k KubeEagleCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- scrapeDurationDesc
ch <- scrapeSuccessDesc
}
// Collect implements the prometheus.Collector interface
func (k KubeEagleCollector) Collect(ch chan<- prometheus.Metric) {
wg := sync.WaitGroup{}
// Run all collectors concurrently and add meta information about that (such as request duration and error/success count)
for name, collector := range k.CollectorByName {
wg.Add(1)
go func(wg *sync.WaitGroup, collectorName string, c Collector) {
defer wg.Done()
begin := time.Now()
err := c.updateMetrics(ch)
duration := time.Since(begin)
var isSuccess float64
if err != nil {
log.Errorf("Collector '%s' failed after %fs: %s", collectorName, duration.Seconds(), err)
isSuccess = 0
} else {
log.Debugf("Collector '%s' succeeded after %fs.", collectorName, duration.Seconds())
isSuccess = 1
}
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), collectorName)
ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, isSuccess, collectorName)
}(&wg, name, collector)
}
wg.Wait()
}
// IsHealthy returns a bool which indicates whether the collector is working properly or not
func (k KubeEagleCollector) IsHealthy() bool {
return kubernetesClient.IsHealthy()
}
// Collector is an interface which has to be implemented for each collector which wants to expose metrics
type Collector interface {
updateMetrics(ch chan<- prometheus.Metric) error
}