-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathclient_exec.go
200 lines (168 loc) · 6.37 KB
/
client_exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package kube
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
executil "k8s.io/client-go/util/exec"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4"
"github.com/akash-network/provider/cluster"
"github.com/akash-network/provider/cluster/kube/builder"
ctypes "github.com/akash-network/provider/cluster/types/v1beta3"
)
// the type implementing the interface returned by the Exec command
type execResult struct {
exitCode int
}
func (er execResult) ExitCode() int {
return er.exitCode
}
// a type to allow a slice of kubernetes pods to be sorted
type sortablePods []corev1.Pod
func (sp sortablePods) Len() int {
return len(sp)
}
func (sp sortablePods) Less(i, j int) bool {
return strings.Compare(sp[i].Name, sp[j].Name) == -1
}
func (sp sortablePods) Swap(i, j int) {
sp[i], sp[j] = sp[j], sp[i]
}
func (c *client) Exec(ctx context.Context, leaseID mtypes.LeaseID, serviceName string, podIndex uint, cmd []string, stdin io.Reader,
stdout io.Writer,
stderr io.Writer, tty bool,
tsq remotecommand.TerminalSizeQueue) (ctypes.ExecResult, error) {
namespace := builder.LidNS(leaseID)
mani, err := c.ac.AkashV2beta2().Manifests(c.ns).Get(ctx, namespace, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("%w: failed getting manifest", err)
}
loop:
for idx := range mani.Spec.Group.Services {
if mani.Spec.Group.Services[idx].Name == serviceName {
break loop
}
if idx == len(mani.Spec.Group.Services)-1 {
return nil, cluster.ErrExecNoServiceWithName
}
}
// Check that the pod exists
pods, err := c.kc.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
TypeMeta: metav1.TypeMeta{},
LabelSelector: fmt.Sprintf("akash.network/manifest-service=%s", serviceName),
})
if err != nil {
return nil, fmt.Errorf("%w: failed getting pods in namespace %q", err, namespace)
}
// if no pods are found yet then the deployment hasn't been spun up kubernetes yet
if 0 == len(pods.Items) {
return nil, cluster.ErrExecServiceNotRunning
}
// check that the requested pod is within the range
if podIndex >= uint(len(pods.Items)) {
return nil, fmt.Errorf("%w: valid range is [0, %d]", cluster.ErrExecPodIndexOutOfRange, len(pods.Items)-1)
}
// sort the pods, since we have no idea what order kubernetes returns them in
podsEff := sortablePods(pods.Items)
sort.Sort(podsEff)
selectedPod := podsEff[podIndex]
// validate the pod is in a state where it can be connected to
switch selectedPod.Status.Phase {
case corev1.PodSucceeded:
return nil, fmt.Errorf("%w: the service has completed", cluster.ErrExecServiceNotRunning)
case corev1.PodFailed:
return nil, fmt.Errorf("%w: the service has failed", cluster.ErrExecServiceNotRunning)
default:
}
// Check the conditions, make sure the pod is marked as ready
isReady := false
for _, cond := range selectedPod.Status.Conditions {
if cond.Type == corev1.PodReady {
isReady = cond.Status == corev1.ConditionTrue
}
}
if !isReady {
return nil, fmt.Errorf("%w: the service is not ready", cluster.ErrExecServiceNotRunning)
}
podName := selectedPod.Name
containerName := serviceName // Container name is always the same as the service name
// Define the necessary runtime scheme & codec to send the request
groupVersion := schema.GroupVersion{Group: "api", Version: "v1"}
myScheme := runtime.NewScheme()
err = corev1.AddToScheme(myScheme)
if err != nil {
return nil, err
}
myParameterCodec := runtime.NewParameterCodec(myScheme)
myScheme.AddKnownTypes(groupVersion, &corev1.PodExecOptions{})
kubeConfig := restclient.CopyConfig(c.kubeContentConfig) // Make a local copy of the configuration
kubeConfig.GroupVersion = &groupVersion
codecFactory := serializer.NewCodecFactory(myScheme)
negotiatedSerializer := runtime.NegotiatedSerializer(codecFactory)
kubeConfig.NegotiatedSerializer = negotiatedSerializer
kubeRestClient, err := restclient.RESTClientFor(kubeConfig)
if err != nil {
return nil, fmt.Errorf("%w: failed getting REST client", err)
}
c.log.Info("Opening container shell", "namespace", namespace, "pod", podName, "container", containerName)
if tty {
// disable stderr if running as a TTY, results come back over stdout
stderr = nil
}
const subResource = "exec" // This value copied from kubectl and never changes
// Configure the request
req := kubeRestClient.Post().Resource("pods").Name(podName).Namespace(namespace).SubResource(subResource)
req.VersionedParams(&corev1.PodExecOptions{
Container: containerName,
Command: cmd,
Stdin: stdin != nil,
Stdout: stdout != nil,
Stderr: stderr != nil,
TTY: tty,
}, myParameterCodec)
// Make the request with SPDY
exec, err := remotecommand.NewSPDYExecutor(kubeConfig, "POST", req.URL())
if err != nil {
return nil, fmt.Errorf("%w: execution via SPDY failed", err)
}
// Run, passing in the streams and everything else. This runs until the remote end closes
// or the streams close
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: stdin, // any reader
Stdout: stdout, // any writer
Stderr: stderr, // any writer
Tty: tty,
TerminalSizeQueue: tsq,
})
if err == nil {
// No error means the process returned a 0 exit code
return execResult{exitCode: 0}, nil
}
// Check to see if the process ran & returned an exit code
// If this is true, don't return an error. Something ran in the
// container which is what this code was trying to do
terr := executil.CodeExitError{}
if errors.As(err, &terr) {
return execResult{exitCode: terr.Code}, nil
}
// Some errors are untyped, use string matching to give better answers
if strings.Contains(err.Error(), "error executing command in container") {
if strings.Contains(err.Error(), "no such file or directory") || strings.Contains(err.Error(), "executable file not found in $PATH") {
return nil, cluster.ErrExecCommandDoesNotExist
}
// Don't send the full text of unknown errors back to the user
// Log the error here so this can be tracked down somehow in the provider logs at least
c.log.Error("command execution failed", "err", err)
return nil, cluster.ErrExecCommandExecutionFailed
}
return nil, err
}