Skip to content

Commit 518aba4

Browse files
committed
refactoring
1 parent b16185a commit 518aba4

File tree

2 files changed

+159
-99
lines changed

2 files changed

+159
-99
lines changed

pkg/plugin/mount.go

+129-99
Original file line numberDiff line numberDiff line change
@@ -13,87 +13,165 @@ import (
1313
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1414
"k8s.io/apimachinery/pkg/util/wait"
1515
"k8s.io/client-go/kubernetes"
16-
"k8s.io/client-go/tools/clientcmd"
16+
// Remove clientcmd import if it's not used here anymore
1717
)
1818

19-
// BuildKubeClient creates a Kubernetes client
20-
func BuildKubeClient() (*kubernetes.Clientset, error) {
21-
kubeconfig := os.Getenv("KUBECONFIG")
22-
if kubeconfig == "" {
23-
home := os.Getenv("HOME")
24-
kubeconfig = fmt.Sprintf("%s/.kube/config", home)
19+
func Mount(namespace, pvcName, localMountPoint string) error {
20+
if _, err := os.Stat(localMountPoint); os.IsNotExist(err) {
21+
return fmt.Errorf("local mount point %s does not exist", localMountPoint)
2522
}
2623

27-
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
24+
clientset, err := BuildKubeClient()
2825
if err != nil {
29-
return nil, fmt.Errorf("failed to build Kubernetes config: %v", err)
26+
return err
3027
}
3128

32-
clientset, err := kubernetes.NewForConfig(config)
29+
pvc, err := checkPVCUsage(clientset, namespace, pvcName)
3330
if err != nil {
34-
return nil, fmt.Errorf("failed to create Kubernetes client: %v", err)
31+
return err
3532
}
3633

37-
return clientset, nil
38-
}
39-
40-
// Mount mounts a PVC to a local directory
41-
func Mount(namespace, pvcName, localMountPoint string) error {
42-
if _, err := os.Stat(localMountPoint); os.IsNotExist(err) {
43-
return fmt.Errorf("local mount point %s does not exist", localMountPoint)
34+
canMount, podUsingPVC, err := checkPVAccessMode(clientset, pvc, namespace) // Corrected the number of parameters
35+
if err != nil {
36+
return err
4437
}
4538

46-
rand.Seed(time.Now().UnixNano())
47-
suffix := randSeq(5)
48-
podName := fmt.Sprintf("volume-exposer-%s", suffix)
49-
port := rand.Intn(64511) + 1024 // Generate a random port between 1024 and 65535
39+
if !canMount {
40+
fmt.Printf("RWO volume is currently mounted by another pod: %s; mounting in this mode is not implemented yet.\n", podUsingPVC)
41+
return fmt.Errorf("mount operation for RWO volume that is already in use by pod %s is not implemented yet", podUsingPVC)
42+
}
5043

51-
sshKeyPath := fmt.Sprintf("%s/.ssh/id_rsa.pub", os.Getenv("HOME"))
52-
sshKey, err := ioutil.ReadFile(sshKeyPath)
44+
sshKey, err := readSSHKey()
5345
if err != nil {
54-
return fmt.Errorf("failed to read SSH public key: %v", err)
46+
return err
5547
}
5648

57-
clientset, err := BuildKubeClient()
49+
podName, port, err := setupPod(clientset, namespace, pvcName, sshKey)
5850
if err != nil {
5951
return err
6052
}
6153

62-
ctx := context.TODO()
63-
pvcClient := clientset.CoreV1().PersistentVolumeClaims(namespace)
64-
pvc, err := pvcClient.Get(ctx, pvcName, metav1.GetOptions{})
54+
if err := waitForPodReady(clientset, namespace, podName); err != nil {
55+
return err
56+
}
57+
58+
if err := setupPortForwarding(namespace, podName, port); err != nil {
59+
return err
60+
}
61+
62+
return mountPVCOverSSH(namespace, podName, port, localMountPoint, pvcName)
63+
}
64+
65+
func checkPVAccessMode(clientset *kubernetes.Clientset, pvc *corev1.PersistentVolumeClaim, namespace string) (bool, string, error) {
66+
pvName := pvc.Spec.VolumeName
67+
pv, err := clientset.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
6568
if err != nil {
66-
return fmt.Errorf("failed to get PVC: %v", err)
69+
return true, "", fmt.Errorf("failed to get PV: %v", err)
6770
}
6871

69-
if pvc.Status.Phase == corev1.ClaimBound {
70-
pvName := pvc.Spec.VolumeName
71-
pvClient := clientset.CoreV1().PersistentVolumes()
72-
pv, err := pvClient.Get(ctx, pvName, metav1.GetOptions{})
72+
// Assuming pv is now being checked for its AccessModes.
73+
if contains(pv.Spec.AccessModes, corev1.ReadWriteOnce) {
74+
podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
7375
if err != nil {
74-
return fmt.Errorf("failed to get PV: %v", err)
76+
return true, "", fmt.Errorf("failed to list pods: %v", err)
7577
}
76-
77-
accessModes := pv.Spec.AccessModes
78-
for _, mode := range accessModes {
79-
if mode == corev1.ReadWriteOnce {
80-
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
81-
if err != nil {
82-
return fmt.Errorf("failed to list pods: %v", err)
78+
for _, pod := range podList.Items {
79+
for _, volume := range pod.Spec.Volumes {
80+
if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc.Name {
81+
return false, pod.Name, nil
8382
}
83+
}
84+
}
85+
}
86+
return true, "", nil
87+
}
8488

85-
for _, pod := range pods.Items {
86-
for _, volume := range pod.Spec.Volumes {
87-
if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvcName {
88-
return fmt.Errorf("PVC %s is already in use by pod %s and cannot be mounted because it has RWO access mode", pvcName, pod.Name)
89-
}
90-
}
91-
}
89+
func contains(modes []corev1.PersistentVolumeAccessMode, modeToFind corev1.PersistentVolumeAccessMode) bool {
90+
for _, mode := range modes {
91+
if mode == modeToFind {
92+
return true
93+
}
94+
}
95+
return false
96+
}
97+
98+
func readSSHKey() (string, error) {
99+
sshKeyPath := fmt.Sprintf("%s/.ssh/id_rsa.pub", os.Getenv("HOME"))
100+
sshKey, err := ioutil.ReadFile(sshKeyPath)
101+
if err != nil {
102+
return "", fmt.Errorf("failed to read SSH public key: %v", err)
103+
}
104+
return string(sshKey), nil
105+
}
106+
107+
func checkPVCUsage(clientset *kubernetes.Clientset, namespace, pvcName string) (*corev1.PersistentVolumeClaim, error) {
108+
pvc, err := clientset.CoreV1().PersistentVolumeClaims(namespace).Get(context.TODO(), pvcName, metav1.GetOptions{})
109+
if err != nil {
110+
return nil, fmt.Errorf("failed to get PVC: %v", err)
111+
}
112+
if pvc.Status.Phase != corev1.ClaimBound {
113+
return nil, fmt.Errorf("PVC %s is not bound", pvcName)
114+
}
115+
return pvc, nil
116+
}
117+
118+
func setupPod(clientset *kubernetes.Clientset, namespace, pvcName, sshKey string) (string, int, error) {
119+
podName, port := generatePodNameAndPort(pvcName)
120+
pod := createPodSpec(podName, port, pvcName, sshKey)
121+
if _, err := clientset.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
122+
return "", 0, fmt.Errorf("failed to create pod: %v", err)
123+
}
124+
fmt.Printf("Pod %s created successfully\n", podName)
125+
return podName, port, nil
126+
}
127+
128+
func waitForPodReady(clientset *kubernetes.Clientset, namespace, podName string) error {
129+
return wait.PollImmediate(time.Second, 5*time.Minute, func() (bool, error) {
130+
pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
131+
if err != nil {
132+
return false, err
133+
}
134+
for _, cond := range pod.Status.Conditions {
135+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
136+
return true, nil
92137
}
93138
}
139+
return false, nil
140+
})
141+
}
142+
143+
func setupPortForwarding(namespace, podName string, port int) error {
144+
cmd := exec.Command("kubectl", "port-forward", fmt.Sprintf("pod/%s", podName), fmt.Sprintf("%d:22", port), "-n", namespace)
145+
cmd.Stdout = os.Stdout
146+
cmd.Stderr = os.Stderr
147+
if err := cmd.Start(); err != nil {
148+
return fmt.Errorf("failed to start port-forward: %v", err)
149+
}
150+
time.Sleep(5 * time.Second) // Wait a bit for the port forwarding to establish
151+
return nil
152+
}
153+
154+
func mountPVCOverSSH(namespace, podName string, port int, localMountPoint, pvcName string) error {
155+
sshfsCmd := exec.Command("sshfs", "-o", "StrictHostKeyChecking=no,UserKnownHostsFile=/dev/null", fmt.Sprintf("root@localhost:/volume"), localMountPoint, "-p", fmt.Sprintf("%d", port))
156+
sshfsCmd.Stdout = os.Stdout
157+
sshfsCmd.Stderr = os.Stderr
158+
if err := sshfsCmd.Run(); err != nil {
159+
return fmt.Errorf("failed to mount PVC using SSHFS: %v", err)
94160
}
161+
fmt.Printf("PVC %s mounted successfully to %s\n", pvcName, localMountPoint)
162+
return nil
163+
}
95164

96-
pod := &corev1.Pod{
165+
func generatePodNameAndPort(pvcName string) (string, int) {
166+
rand.Seed(time.Now().UnixNano())
167+
suffix := randSeq(5)
168+
podName := fmt.Sprintf("volume-exposer-%s", suffix)
169+
port := rand.Intn(64511) + 1024 // Generate a random port between 1024 and 65535
170+
return podName, port
171+
}
172+
173+
func createPodSpec(podName string, port int, pvcName, sshKey string) *corev1.Pod {
174+
return &corev1.Pod{
97175
ObjectMeta: metav1.ObjectMeta{
98176
Name: podName,
99177
Labels: map[string]string{
@@ -121,7 +199,7 @@ func Mount(namespace, pvcName, localMountPoint string) error {
121199
Env: []corev1.EnvVar{
122200
{
123201
Name: "SSH_KEY",
124-
Value: string(sshKey),
202+
Value: sshKey,
125203
},
126204
},
127205
},
@@ -138,54 +216,6 @@ func Mount(namespace, pvcName, localMountPoint string) error {
138216
},
139217
},
140218
}
141-
142-
podClient := clientset.CoreV1().Pods(namespace)
143-
144-
createdPod, err := podClient.Create(ctx, pod, metav1.CreateOptions{})
145-
if err != nil {
146-
return fmt.Errorf("failed to create pod: %v", err)
147-
}
148-
149-
fmt.Printf("Pod %s created successfully\n", createdPod.Name)
150-
151-
err = wait.PollImmediate(time.Second, 5*time.Minute, func() (bool, error) {
152-
pod, err := podClient.Get(ctx, podName, metav1.GetOptions{})
153-
if err != nil {
154-
return false, err
155-
}
156-
157-
for _, cond := range pod.Status.Conditions {
158-
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
159-
return true, nil
160-
}
161-
}
162-
163-
return false, nil
164-
})
165-
if err != nil {
166-
return fmt.Errorf("failed to wait for pod to be ready: %v", err)
167-
}
168-
169-
portForwardCmd := exec.Command("kubectl", "port-forward", fmt.Sprintf("pod/%s", podName), fmt.Sprintf("%d:22", port), "-n", namespace)
170-
portForwardCmd.Stdout = os.Stdout
171-
portForwardCmd.Stderr = os.Stderr
172-
if err := portForwardCmd.Start(); err != nil {
173-
return fmt.Errorf("failed to start port-forward: %v", err)
174-
}
175-
176-
time.Sleep(5 * time.Second)
177-
178-
sshfsCmd := exec.Command("sshfs", "-o", "StrictHostKeyChecking=no,UserKnownHostsFile=/dev/null", fmt.Sprintf("root@localhost:/volume"), localMountPoint, "-p", fmt.Sprintf("%d", port))
179-
sshfsCmd.Stdout = os.Stdout
180-
sshfsCmd.Stderr = os.Stderr
181-
sshfsCmd.Stdin = os.Stdin
182-
if err := sshfsCmd.Run(); err != nil {
183-
return fmt.Errorf("failed to mount PVC using SSHFS: %v", err)
184-
}
185-
186-
fmt.Printf("PVC %s mounted successfully to %s\n", pvcName, localMountPoint)
187-
188-
return nil
189219
}
190220

191221
func randSeq(n int) string {

pkg/plugin/utils.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package plugin
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"k8s.io/client-go/kubernetes"
8+
"k8s.io/client-go/tools/clientcmd"
9+
)
10+
11+
// BuildKubeClient creates a Kubernetes client from a kubeconfig
12+
func BuildKubeClient() (*kubernetes.Clientset, error) {
13+
kubeconfig := os.Getenv("KUBECONFIG")
14+
if kubeconfig == "" {
15+
home := os.Getenv("HOME")
16+
kubeconfig = fmt.Sprintf("%s/.kube/config", home)
17+
}
18+
19+
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
20+
if err != nil {
21+
return nil, fmt.Errorf("failed to build Kubernetes config: %v", err)
22+
}
23+
24+
clientset, err := kubernetes.NewForConfig(config)
25+
if err != nil {
26+
return nil, fmt.Errorf("failed to create Kubernetes client: %v", err)
27+
}
28+
29+
return clientset, nil
30+
}

0 commit comments

Comments
 (0)