node_resources.go
package collector

import (
	"sync"

	"github.com/google-cloud-tools/kube-eagle/kubernetes"
	"github.com/google-cloud-tools/kube-eagle/options"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)
type nodeResourcesCollector struct {
	kubernetesClient *kubernetes.Client

	// Allocatable
	allocatableCPUCoresDesc    *prometheus.Desc
	allocatableMemoryBytesDesc *prometheus.Desc

	// Resource limits
	limitCPUCoresDesc    *prometheus.Desc
	limitMemoryBytesDesc *prometheus.Desc

	// Resource requests
	requestCPUCoresDesc    *prometheus.Desc
	requestMemoryBytesDesc *prometheus.Desc

	// Resource usage
	usageCPUCoresDesc    *prometheus.Desc
	usageMemoryBytesDesc *prometheus.Desc
	usagePodCount        *prometheus.Desc
}
func init() {
	registerCollector("node_resource", newNodeResourcesCollector)
}
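
// registerCollector is defined elsewhere in this package. A minimal sketch of
// what such a factory registry might look like (hypothetical; the actual
// implementation may differ):
//
//	var factories = make(map[string]func(opts *options.Options) (Collector, error))
//
//	func registerCollector(name string, factory func(opts *options.Options) (Collector, error)) {
//		factories[name] = factory
//	}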
func newNodeResourcesCollector(opts *options.Options) (Collector, error) {
	subsystem := "node_resource"
	labels := []string{"node"}

	return &nodeResourcesCollector{
		// Prometheus metrics
		// Allocatable
		allocatableCPUCoresDesc: prometheus.NewDesc(
			prometheus.BuildFQName(opts.Namespace, subsystem, "allocatable_cpu_cores"),
			"Allocatable CPU cores on a specific node in Kubernetes",
			labels,
			prometheus.Labels{},
		),
		allocatableMemoryBytesDesc: prometheus.NewDesc(
			prometheus.BuildFQName(opts.Namespace, subsystem, "allocatable_memory_bytes"),
			"Allocatable memory bytes on a specific node in Kubernetes",
			labels,
			prometheus.Labels{},
		),
		// Resource limits
		limitCPUCoresDesc: prometheus.NewDesc(
			prometheus.BuildFQName(opts.Namespace, subsystem, "limits_cpu_cores"),
			"Total CPU core limits of all specified pod resources on a node",
			labels,
			prometheus.Labels{},
		),
		limitMemoryBytesDesc: prometheus.NewDesc(
			prometheus.BuildFQName(opts.Namespace, subsystem, "limits_memory_bytes"),
			"Total memory byte limits of all specified pod resources on a node",
			labels,
			prometheus.Labels{},
		),
		// Resource requests
		requestCPUCoresDesc: prometheus.NewDesc(
			prometheus.BuildFQName(opts.Namespace, subsystem, "requests_cpu_cores"),
			"Total requested CPU cores of all specified pod resources on a node",
			labels,
			prometheus.Labels{},
		),
		requestMemoryBytesDesc: prometheus.NewDesc(
			prometheus.BuildFQName(opts.Namespace, subsystem, "requests_memory_bytes"),
			"Total requested memory bytes of all specified pod resources on a node",
			labels,
			prometheus.Labels{},
		),
		// Resource usage
		usageCPUCoresDesc: prometheus.NewDesc(
			prometheus.BuildFQName(opts.Namespace, subsystem, "usage_cpu_cores"),
			"Total number of used CPU cores on a node",
			labels,
			prometheus.Labels{},
		),
		usageMemoryBytesDesc: prometheus.NewDesc(
			prometheus.BuildFQName(opts.Namespace, subsystem, "usage_memory_bytes"),
			"Total number of memory bytes used on a node",
			labels,
			prometheus.Labels{},
		),
		usagePodCount: prometheus.NewDesc(
			prometheus.BuildFQName(opts.Namespace, subsystem, "usage_pod_count"),
			"Total number of running pods on a node",
			labels,
			prometheus.Labels{},
		),
	}, nil
}
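
// With the configured metric namespace (kube-eagle's default is assumed here
// to be "eagle"), the descriptors above yield gauge series of this shape:
//
//	eagle_node_resource_allocatable_cpu_cores{node="worker-1"} 4
//	eagle_node_resource_requests_memory_bytes{node="worker-1"} 2.147483648e+09
//	eagle_node_resource_usage_pod_count{node="worker-1"} 17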
func (c *nodeResourcesCollector) updateMetrics(ch chan<- prometheus.Metric) error {
	log.Debug("Collecting node metrics")

	// Fetch the pod list, node list and node metrics concurrently.
	// kubernetesClient here is the package-level client shared by all
	// collectors (initialized elsewhere in this package).
	var wg sync.WaitGroup

	var nodeList *corev1.NodeList
	var nodeListError error
	var podList *corev1.PodList
	var podListError error
	var nodeMetricsList *v1beta1.NodeMetricsList
	var nodeMetricsListError error

	// Get pod list
	wg.Add(1)
	go func() {
		defer wg.Done()
		podList, podListError = kubernetesClient.PodList()
	}()

	// Get node list
	wg.Add(1)
	go func() {
		defer wg.Done()
		nodeList, nodeListError = kubernetesClient.NodeList()
	}()

	// Get node resource usage metrics
	wg.Add(1)
	go func() {
		defer wg.Done()
		nodeMetricsList, nodeMetricsListError = kubernetesClient.NodeMetricses()
	}()

	wg.Wait()
	if podListError != nil {
		log.Warn("Failed to get podList from Kubernetes", podListError)
		return podListError
	}
	if nodeListError != nil {
		log.Warn("Failed to get nodeList from Kubernetes", nodeListError)
		return nodeListError
	}
	if nodeMetricsListError != nil {
		log.Warn("Failed to get nodeMetricsList from Kubernetes", nodeMetricsListError)
		return nodeMetricsListError
	}

	nodeMetricsByNodeName := getNodeMetricsByNodeName(nodeMetricsList)
	podMetricsByNodeName := getAggregatedPodMetricsByNodeName(podList)
	for _, n := range nodeList.Items {
		// Allocatable resources; MilliValue avoids rounding fractional
		// allocatable CPU (e.g. 3920m) down to whole cores
		allocatableCPUCores := float64(n.Status.Allocatable.Cpu().MilliValue()) / 1000
		allocatableMemoryBytes := float64(n.Status.Allocatable.Memory().MilliValue()) / 1000
		ch <- prometheus.MustNewConstMetric(c.allocatableCPUCoresDesc, prometheus.GaugeValue, allocatableCPUCores, n.Name)
		ch <- prometheus.MustNewConstMetric(c.allocatableMemoryBytesDesc, prometheus.GaugeValue, allocatableMemoryBytes, n.Name)

		// Resource usage
		usageMetrics := nodeMetricsByNodeName[n.Name]
		usageCPUCores := float64(usageMetrics.Usage.Cpu().MilliValue()) / 1000
		usageMemoryBytes := float64(usageMetrics.Usage.Memory().MilliValue()) / 1000
		ch <- prometheus.MustNewConstMetric(c.usageCPUCoresDesc, prometheus.GaugeValue, usageCPUCores, n.Name)
		ch <- prometheus.MustNewConstMetric(c.usageMemoryBytesDesc, prometheus.GaugeValue, usageMemoryBytes, n.Name)

		// Aggregated pod metrics (e.g. resource requests by node)
		podMetrics := podMetricsByNodeName[n.Name]
		ch <- prometheus.MustNewConstMetric(c.requestCPUCoresDesc, prometheus.GaugeValue, podMetrics.requestedCPUCores, n.Name)
		ch <- prometheus.MustNewConstMetric(c.requestMemoryBytesDesc, prometheus.GaugeValue, float64(podMetrics.requestedMemoryBytes), n.Name)
		ch <- prometheus.MustNewConstMetric(c.limitCPUCoresDesc, prometheus.GaugeValue, podMetrics.limitCPUCores, n.Name)
		ch <- prometheus.MustNewConstMetric(c.limitMemoryBytesDesc, prometheus.GaugeValue, float64(podMetrics.limitMemoryBytes), n.Name)
		ch <- prometheus.MustNewConstMetric(c.usagePodCount, prometheus.GaugeValue, float64(podMetrics.podCount), n.Name)
	}
	return nil
}
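
// The fan-out above uses a sync.WaitGroup plus one error variable per call.
// An equivalent sketch (an alternative, not what this package uses) with
// golang.org/x/sync/errgroup would collapse the error handling:
//
//	var g errgroup.Group
//	g.Go(func() error {
//		var err error
//		podList, err = kubernetesClient.PodList()
//		return err
//	})
//	// ...likewise for NodeList and NodeMetricses...
//	if err := g.Wait(); err != nil {
//		log.Warn("Failed to query the Kubernetes API", err)
//		return err
//	}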
// getNodeMetricsByNodeName returns a map of node metrics keyed by node name
func getNodeMetricsByNodeName(nodeMetricsList *v1beta1.NodeMetricsList) map[string]v1beta1.NodeMetrics {
	nodeMetricsByName := make(map[string]v1beta1.NodeMetrics)
	for _, metrics := range nodeMetricsList.Items {
		nodeMetricsByName[metrics.Name] = metrics
	}
	return nodeMetricsByName
}
type aggregatedPodMetrics struct {
	podCount             uint16
	containerCount       uint16
	requestedMemoryBytes int64
	requestedCPUCores    float64
	limitMemoryBytes     int64
	limitCPUCores        float64
}
// getAggregatedPodMetricsByNodeName returns a map of aggregated pod metrics keyed by node name.
func getAggregatedPodMetricsByNodeName(pods *corev1.PodList) map[string]aggregatedPodMetrics {
	podMetrics := make(map[string]aggregatedPodMetrics)

	// Iterate over all pod definitions to sum and group the pods' resource requests and limits by node name
	for _, podInfo := range pods.Items {
		nodeName := podInfo.Spec.NodeName

		// Skip pods that are no longer running (e.g. failed/succeeded jobs, evicted pods)
		podPhase := podInfo.Status.Phase
		if podPhase == corev1.PodFailed || podPhase == corev1.PodSucceeded {
			continue
		}

		// Increment the pod count once per pod, even though the aggregate
		// is written back inside the per-container loop below
		podCount := podMetrics[nodeName].podCount + 1
		for _, c := range podInfo.Spec.Containers {
			requestedCPUCores := float64(c.Resources.Requests.Cpu().MilliValue()) / 1000
			requestedMemoryBytes := c.Resources.Requests.Memory().MilliValue() / 1000
			limitCPUCores := float64(c.Resources.Limits.Cpu().MilliValue()) / 1000
			limitMemoryBytes := c.Resources.Limits.Memory().MilliValue() / 1000
			podMetrics[nodeName] = aggregatedPodMetrics{
				podCount:             podCount,
				containerCount:       podMetrics[nodeName].containerCount + 1,
				requestedCPUCores:    podMetrics[nodeName].requestedCPUCores + requestedCPUCores,
				requestedMemoryBytes: podMetrics[nodeName].requestedMemoryBytes + requestedMemoryBytes,
				limitCPUCores:        podMetrics[nodeName].limitCPUCores + limitCPUCores,
				limitMemoryBytes:     podMetrics[nodeName].limitMemoryBytes + limitMemoryBytes,
			}
		}
	}
	return podMetrics
}
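
// A hedged example of the aggregation above, written as a test sketch. The
// test file name and sample values are assumptions, and the sketch expects
// the testing and k8s.io/apimachinery/pkg/api/resource imports:
//
//	func TestGetAggregatedPodMetricsByNodeName(t *testing.T) {
//		pods := &corev1.PodList{Items: []corev1.Pod{{
//			Spec: corev1.PodSpec{
//				NodeName: "node-1",
//				Containers: []corev1.Container{{
//					Resources: corev1.ResourceRequirements{
//						Requests: corev1.ResourceList{
//							corev1.ResourceCPU:    resource.MustParse("250m"),
//							corev1.ResourceMemory: resource.MustParse("64Mi"),
//						},
//					},
//				}},
//			},
//			Status: corev1.PodStatus{Phase: corev1.PodRunning},
//		}}}
//		got := getAggregatedPodMetricsByNodeName(pods)
//		if got["node-1"].podCount != 1 || got["node-1"].requestedCPUCores != 0.25 {
//			t.Errorf("unexpected aggregation: %+v", got["node-1"])
//		}
//	}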