-
Notifications
You must be signed in to change notification settings - Fork 880
/
util.ts
192 lines (164 loc) · 7.06 KB
/
util.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
// Copyright 2016-2019, Pulumi Corporation. All rights reserved.
import * as k8s from "@pulumi/kubernetes";
import * as pulumi from "@pulumi/pulumi";
import { spawn } from "child_process";
import * as http from "http";
// Options controlling how the Prometheus service is forwarded to the local machine.
export interface PromPortForwardOpts {
    // Local port that `kubectl port-forward` binds on this machine.
    localPort: number;
    // Port on the Prometheus service to forward to. Defaults to 80 when omitted.
    targetPort?: number;
}
// Use `kubectl port-forward` to forward Prometheus service locally; because this command will run
// forever, this function returns a handle to the process that must be called to kill the process.
// Usually you want to pass this in to `checkHttpLatency`, which will properly clean this up:
//
//     util.checkHttpLatency(canary, containerName, {forwarderHandle: forwarderHandle [...]})
//
// This is useful for when we can't run Pulumi in-cluster, in which case simply calling to the
// appropriate KubeDNS URL is not sufficient.
export function forwardPrometheusService(
    service: pulumi.Input<k8s.core.v1.Service>,
    deployment: pulumi.Input<k8s.extensions.v1beta1.Deployment>,
    opts: PromPortForwardOpts,
): pulumi.Output<() => void> {
    // During preview there is no live cluster to forward to; return a no-op kill function.
    if (pulumi.runtime.isDryRun()) {
        return pulumi.output(() => undefined);
    }
    // Depend on the deployment's URN so forwarding only starts once the deployment exists.
    return pulumi
        .all([service, deployment])
        .apply(([s, d]) => pulumi.all([s.metadata, d.urn]))
        .apply(([meta]) => {
            return new Promise<() => void>((resolve, reject) => {
                const forwarderHandle = spawn("kubectl", [
                    "port-forward",
                    `service/${meta.name}`,
                    `${opts.localPort}:${opts.targetPort || 80}`,
                ]);
                // NOTE: we need to wrap `forwarderHandle.kill` in a closure because of
                // JavaScript's `this` semantics.
                forwarderHandle.stdout.on("data", () => resolve(() => forwarderHandle.kill()));
                forwarderHandle.stderr.on("data", data => {
                    // BUG FIX: previously rejected with no reason (callers saw an `undefined`
                    // rejection) and leaked the `kubectl` child process. Kill it and surface
                    // its stderr output in the error.
                    forwarderHandle.kill();
                    reject(new Error(`'kubectl port-forward' failed: ${data}`));
                });
                // Also reject if the process could not be spawned at all (e.g. kubectl missing),
                // in which case neither stdout nor stderr will ever emit data.
                forwarderHandle.on("error", err => reject(err));
            });
        });
}
// Options controlling the Prometheus latency check performed by `checkHttpLatency`.
export interface CheckLatencyOpts {
    // Duration of time to periodically poll latency metrics. In seconds.
    durationSeconds: number;
    // Period of time to wait between polling metrics. In seconds. Defaults to 1.
    periodSeconds?: number;
    // Quantile of latency responses to check. One of: {0.9 | 0.99}.
    quantile: 0.9 | 0.99;
    // Threshold on which we will fail the deployment. In microseconds.
    thresholdMicroseconds: number;
    // Location of the Prometheus endpoint to query.
    prometheusEndpoint: string;
    // Handle (e.g. from `forwardPrometheusService`) whose resolved function kills the
    // `kubectl port-forward` process; invoked once the metrics check completes.
    forwarderHandle: pulumi.Output<() => void>;
}
// Polls Prometheus on some customizable period and checks that the HTTP request latency for a
// specified quantile does not exceed some maximum threshold. Resolves with the last observed
// latency (in microseconds, as reported by Prometheus); rejects if the threshold is exceeded
// or no usable measurement arrives before the timeout.
export function checkHttpLatency(
    canary: k8s.apps.v1beta1.Deployment,
    containerName: string,
    opts: CheckLatencyOpts,
): pulumi.Output<string> {
    // During preview there is nothing to measure.
    if (pulumi.runtime.isDryRun()) {
        return pulumi.output(Promise.resolve("<computed value>"));
    }
    // Query: http_request_duration_microseconds{job="kubernetes-pods", app="<containerName>"}.
    // BUG FIX: `containerName` was previously interpolated raw into an otherwise
    // percent-encoded query string; encode it so names containing reserved characters
    // cannot break (or alter) the PromQL query.
    const url = `http://${
        opts.prometheusEndpoint
    }/api/v1/query?query=http_request_duration_microseconds%7Bjob%3D%22kubernetes-pods%22%2C%20app%3D%22${encodeURIComponent(
        containerName,
    )}%22%7D`;
    //
    // Turn an `http.get` into a `Promise<string>`.
    //
    const kill = opts.forwarderHandle || (() => undefined);
    // Depend on the canary's URN so polling only starts once the canary deployment exists.
    return pulumi.all([canary.urn, kill]).apply(([_, kill]) => {
        console.log("Checking HTTP metrics");
        // Poll Prometheus for metrics. If they exceed the specified threshold, we return error
        // immediately.
        return pollP8s(url, opts).then(
            latency => {
                kill();
                return latency;
            },
            err => {
                // BUG FIX: previously the forwarder was only killed on success, leaking the
                // `kubectl port-forward` process when the metrics check failed.
                kill();
                throw err;
            },
        );
    });
}
// Repeatedly queries Prometheus at `url` until `opts.durationSeconds` has elapsed. Resolves
// with the last observed latency (microseconds, as a string) for the requested quantile if it
// stays under `opts.thresholdMicroseconds`; rejects as soon as the threshold is exceeded, or
// if no usable measurement for that quantile arrives before the timeout.
function pollP8s(url: string, opts: CheckLatencyOpts): Promise<string> {
    let timedOut = false;
    const timer = setTimeout(() => {
        timedOut = true;
    }, opts.durationSeconds * 1000);
    function pollRecursive(): Promise<string> {
        return getHttpLatency(url).then(bodyText => {
            //
            // Validate that the HTTP latency in microseconds has not gone above the safe threshold.
            // If it has, reject, and if it hasn't, recursively poll until the timeout.
            //
            let microseconds = "";
            // Wait `periodSeconds` (default 1) and poll again.
            const kontinue = () => {
                return new Promise<string>(resolve =>
                    setTimeout(() => {
                        resolve(microseconds);
                    }, (opts.periodSeconds || 1) * 1000),
                ).then(pollRecursive);
            };
            const body = JSON.parse(bodyText);
            if (body.data.result.length === 0) {
                if (timedOut) {
                    throw new Error(`Failed metrics check: no HTTP latency measurements returned`);
                }
                // No data yet. Keep polling.
                return kontinue();
            }
            for (const result of body.data.result) {
                // BUG FIX: Prometheus returns label values as strings (e.g. "0.9"), so the
                // previous strict `===` comparison against the numeric `opts.quantile` could
                // never match. Parse the label before comparing.
                const quantile = parseFloat(result.metric["quantile"]);
                microseconds = result.value[1];
                // Check HTTP latency metrics. Recursively poll if the metrics have not met the
                // unacceptable latency threshold.
                if (quantile === opts.quantile) {
                    if (microseconds === "" || microseconds === "NaN") {
                        if (timedOut) {
                            throw new Error(
                                `Failed metrics check: querying HTTP latency got '${microseconds}'`,
                            );
                        }
                        // Ignore invalid data.
                        return kontinue();
                    }
                    if (parseFloat(microseconds) > opts.thresholdMicroseconds) {
                        // Threshold exceeded: fail immediately. Clear the duration timer so it
                        // does not keep the process alive after we reject.
                        clearTimeout(timer);
                        console.error(
                            `Failed metrics check: required < ${opts.thresholdMicroseconds.toString()} microseconds, got '${microseconds}'`,
                        );
                        throw new Error(
                            `Failed metrics check: required < ${opts.thresholdMicroseconds.toString()} microseconds, got '${microseconds}'`,
                        );
                    }
                    if (timedOut) {
                        return Promise.resolve(microseconds);
                    }
                    return kontinue();
                }
            }
            // Requested quantile not present in the results.
            if (timedOut) {
                // BUG FIX: this message previously hard-coded `20000` instead of the configured
                // threshold, and was thrown immediately rather than after the polling window.
                throw new Error(
                    `Failed metrics check: no measurement found for quantile ${opts.quantile}, last value '${microseconds}'`,
                );
            }
            // Metrics may still be warming up (matches the empty-result branch); keep polling.
            return kontinue();
        });
    }
    return pollRecursive();
}
// Performs a single HTTP GET against the given URL, resolving with the raw response body text.
// Rejects on a non-2xx (or missing) status code, or on a transport-level request error.
function getHttpLatency(url: string): Promise<string> {
    //
    // Turn Prometheus metrics check into a `Promise<string>`
    //
    return new Promise<string>((resolve, reject) => {
        const request = http.get(url, response => {
            if (
                response.statusCode === undefined ||
                response.statusCode < 200 ||
                response.statusCode > 299
            ) {
                // BUG FIX: previously execution fell through after `reject`, still attaching
                // body listeners and attempting a (no-op) resolve. Drain the response so the
                // socket is freed, and stop here.
                response.resume();
                reject(new Error("Failed to load page, status code: " + response.statusCode));
                return;
            }
            // Append to the body until the end.
            const body: string[] = [];
            response.on("data", chunk => body.push(chunk.toString()));
            response.on("end", () => resolve(body.join("")));
        });
        // Handle error case.
        request.on("error", err => reject(err));
    });
}