From aef3926e6c295d97decc78f4ef3a5861d3669a44 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 27 Jan 2025 13:05:37 +0000 Subject: [PATCH 01/20] feat: add atomic upload/delete api to neonvm-daemon --- neonvm-daemon/cmd/main.go | 92 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 89 insertions(+), 3 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index 4a675e688..b2806fb62 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "os" + "path/filepath" "strconv" "sync" "time" @@ -43,9 +44,10 @@ func main() { type cpuServer struct { // Protects CPU operations from concurrent access to prevent multiple ensureOnlineCPUs calls from running concurrently // and ensure that status response is always actual - cpuOperationsMutex *sync.Mutex - cpuScaler *cpuscaling.CPUScaler - logger *zap.Logger + cpuOperationsMutex *sync.Mutex + cpuScaler *cpuscaling.CPUScaler + fileOperationsMutex *sync.Mutex + logger *zap.Logger } func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) { @@ -94,6 +96,78 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request) { + s.fileOperationsMutex.Lock() + defer s.fileOperationsMutex.Unlock() + + path := r.PathValue("path") + if !filepath.IsLocal(path) { + s.logger.Error("path is not local") + w.WriteHeader(http.StatusBadRequest) + return + } + path = filepath.Clean(filepath.Join("/var/sync", path)) + + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + s.logger.Error("could not create directory", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + return + } + + file, err := os.CreateTemp("", "") + if err != nil { + s.logger.Error("could not create file", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + return + } + defer file.Close() + defer os.Remove(file.Name()) + 
+ if _, err := io.Copy(file, r.Body); err != nil { + s.logger.Error("could not read request body", zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) + return + } + defer r.Body.Close() + + // rename is an atomic operation on unix. + // this ensures that the file is either not updated at all, + // or is updated entirely. + // + // this ensures that other processes reading the file will never + // have any inconsistencies. they will either read the old contents + // or the new contents. Any open files will still point to the old inode + // and thus still read the old contents. + if err := os.Rename(file.Name(), path); err != nil { + s.logger.Error("could not rename file", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +func (s *cpuServer) handleDeleteFile(w http.ResponseWriter, r *http.Request) { + s.fileOperationsMutex.Lock() + defer s.fileOperationsMutex.Unlock() + + path := r.PathValue("path") + if !filepath.IsLocal(path) { + s.logger.Error("path is not local") + w.WriteHeader(http.StatusBadRequest) + return + } + path = filepath.Clean(filepath.Join("/var/sync", path)) + + if err := os.Remove(path); err != nil { + s.logger.Error("could not delete file", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + func (s *cpuServer) run(addr string) { mux := http.NewServeMux() mux.HandleFunc("/cpu", func(w http.ResponseWriter, r *http.Request) { @@ -108,6 +182,18 @@ func (s *cpuServer) run(addr string) { w.WriteHeader(http.StatusNotFound) } }) + mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPut { + s.handleUploadFile(w, r) + return + } else if r.Method == http.MethodDelete { + s.handleDeleteFile(w, r) + return + } else { + // unknown method + w.WriteHeader(http.StatusNotFound) + } + }) timeout := 5 * time.Second server := http.Server{ From 
8a57d570e097b7ad0f54fc07de8e9436b64072c0 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 27 Jan 2025 13:39:56 +0000 Subject: [PATCH 02/20] add checksum --- neonvm-daemon/cmd/main.go | 71 ++++++++++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 9 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index b2806fb62..b29c9f296 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -1,6 +1,8 @@ package main import ( + "encoding/base64" + "errors" "flag" "fmt" "io" @@ -12,6 +14,7 @@ import ( "time" "go.uber.org/zap" + "golang.org/x/crypto/blake2b" vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling" @@ -96,17 +99,65 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +func (s *cpuServer) getFile(path string) (string, error) { + if !filepath.IsLocal(path) { + return "", errors.New("non-local path") + } + path = filepath.Clean(filepath.Join("/var/sync", path)) + return path, nil +} + +func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request) { + s.fileOperationsMutex.Lock() + defer s.fileOperationsMutex.Unlock() + + path, err := s.getFile(r.PathValue("path")) + if err != nil { + s.logger.Error("invalid file path", zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) + return + } + + file, err := os.Open(path) + if err != nil { + s.logger.Error("could not open file", zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) + return + } + defer file.Close() + + hasher, err := blake2b.New256(nil) + if err != nil { + s.logger.Error("could not create hasher", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if _, err := io.Copy(hasher, file); err != nil { + s.logger.Error("could not read file", zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) + return + } + + sum := hasher.Sum(nil) + sumBase64 := 
base64.RawStdEncoding.EncodeToString(sum) + if _, err := w.Write([]byte(sumBase64)); err != nil { + s.logger.Error("could not write response", zap.Error(err)) + } + + w.WriteHeader(http.StatusOK) +} + func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request) { s.fileOperationsMutex.Lock() defer s.fileOperationsMutex.Unlock() - path := r.PathValue("path") - if !filepath.IsLocal(path) { - s.logger.Error("path is not local") + path, err := s.getFile(r.PathValue("path")) + if err != nil { + s.logger.Error("invalid file path", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } - path = filepath.Clean(filepath.Join("/var/sync", path)) if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { s.logger.Error("could not create directory", zap.Error(err)) @@ -151,13 +202,12 @@ func (s *cpuServer) handleDeleteFile(w http.ResponseWriter, r *http.Request) { s.fileOperationsMutex.Lock() defer s.fileOperationsMutex.Unlock() - path := r.PathValue("path") - if !filepath.IsLocal(path) { - s.logger.Error("path is not local") + path, err := s.getFile(r.PathValue("path")) + if err != nil { + s.logger.Error("invalid file path", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } - path = filepath.Clean(filepath.Join("/var/sync", path)) if err := os.Remove(path); err != nil { s.logger.Error("could not delete file", zap.Error(err)) @@ -183,7 +233,10 @@ func (s *cpuServer) run(addr string) { } }) mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) { - if r.Method == http.MethodPut { + if r.Method == http.MethodGet { + s.handleGetFileChecksum(w, r) + return + } else if r.Method == http.MethodPut { s.handleUploadFile(w, r) return } else if r.Method == http.MethodDelete { From d2444625c6bd990ad6a92550fdb76b8a7b8b6eda Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 27 Jan 2025 15:56:55 +0000 Subject: [PATCH 03/20] add file reconciler to neonvm-runner --- neonvm-daemon/cmd/main.go | 36 +++-------- 
neonvm-runner/cmd/main.go | 133 ++++++++++++++++++++++++++++++++++++++ pkg/util/checksum.go | 32 +++++++++ 3 files changed, 175 insertions(+), 26 deletions(-) create mode 100644 pkg/util/checksum.go diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index b29c9f296..a776a9f9d 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -1,7 +1,6 @@ package main import ( - "encoding/base64" "errors" "flag" "fmt" @@ -14,10 +13,10 @@ import ( "time" "go.uber.org/zap" - "golang.org/x/crypto/blake2b" vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling" + "github.com/neondatabase/autoscaling/pkg/util" ) func main() { @@ -37,9 +36,10 @@ func main() { logger.Info("Starting neonvm-daemon", zap.String("addr", *addr)) srv := cpuServer{ - cpuOperationsMutex: &sync.Mutex{}, - cpuScaler: cpuscaling.NewCPUScaler(), - logger: logger.Named("cpu-srv"), + cpuOperationsMutex: &sync.Mutex{}, + cpuScaler: cpuscaling.NewCPUScaler(), + fileOperationsMutex: &sync.Mutex{}, + logger: logger.Named("cpu-srv"), } srv.run(*addr) } @@ -103,7 +103,7 @@ func (s *cpuServer) getFile(path string) (string, error) { if !filepath.IsLocal(path) { return "", errors.New("non-local path") } - path = filepath.Clean(filepath.Join("/var/sync", path)) + path = filepath.Clean(filepath.Join("var", "sync", path)) return path, nil } @@ -118,30 +118,14 @@ func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request return } - file, err := os.Open(path) + checksum, err := util.ChecksumFile(path) if err != nil { - s.logger.Error("could not open file", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return - } - defer file.Close() - - hasher, err := blake2b.New256(nil) - if err != nil { - s.logger.Error("could not create hasher", zap.Error(err)) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if _, err := io.Copy(hasher, file); err != nil { - s.logger.Error("could not read 
file", zap.Error(err)) + s.logger.Error("could not checksum file", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } - sum := hasher.Sum(nil) - sumBase64 := base64.RawStdEncoding.EncodeToString(sum) - if _, err := w.Write([]byte(sumBase64)); err != nil { + if _, err := w.Write([]byte(checksum)); err != nil { s.logger.Error("could not write response", zap.Error(err)) } @@ -209,7 +193,7 @@ func (s *cpuServer) handleDeleteFile(w http.ResponseWriter, r *http.Request) { return } - if err := os.Remove(path); err != nil { + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { s.logger.Error("could not delete file", zap.Error(err)) w.WriteHeader(http.StatusInternalServerError) return diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 0b60e12da..3d816ba4a 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -23,6 +23,7 @@ import ( "time" "github.com/digitalocean/go-qemu/qmp" + "github.com/fsnotify/fsnotify" "github.com/jpillora/backoff" "github.com/samber/lo" "go.uber.org/zap" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" + "github.com/neondatabase/autoscaling/pkg/util" "github.com/neondatabase/autoscaling/pkg/util/taskgroup" ) @@ -47,6 +49,8 @@ const ( bufferedReaderSize = 4096 ) +var synchronisedFiles = map[string]string{} + func checkKVM() bool { info, err := os.Stat("/dev/kvm") if err != nil { @@ -504,6 +508,8 @@ func runQEMU( go listenForHTTPRequests(ctx, logger, vmSpec.RunnerPort, callbacks, &wg, monitoring) wg.Add(1) go forwardLogs(ctx, logger, &wg) + wg.Add(1) + go monitorFiles(ctx, logger, &wg) qemuBin := getQemuBinaryName(cfg.architecture) var bin string @@ -665,6 +671,59 @@ func forwardLogs(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { } } +// monitorFiles watches a specific set of files and copied them into the guest VM via neonvm-daemon. 
+func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { + defer wg.Done() + + notify, err := fsnotify.NewBufferedWatcher(2) + if err != nil { + logger.Error("failed to create inotify instance", zap.Error(err)) + return + } + + for k := range synchronisedFiles { + if err := notify.Add(k); err != nil { + logger.Error("failed to add file to inotify instance", zap.Error(err)) + } + } + + for { + select { + case <-ctx.Done(): + return + case event := <-notify.Events: + guestPath, ok := synchronisedFiles[event.Name] + if !ok { + // not tracking this file + continue + } + + if err := sendFileToNeonvmDaemon(event.Name, guestPath); err != nil { + logger.Error("failed to upload file to vm guest", zap.Error(err)) + } + case <-time.After(5 * time.Minute): + for hostpath, guestpath := range synchronisedFiles { + hostsum, err := util.ChecksumFile(hostpath) + if err != nil { + logger.Error("failed to get file checksum from host", zap.Error(err)) + } + + guestsum, err := getFileChecksumFromNeonvmDaemon(guestpath) + if err != nil { + logger.Error("failed to get file checksum from guest", zap.Error(err)) + } + + if guestsum != hostsum { + if err = sendFileToNeonvmDaemon(hostpath, guestsum); err != nil { + logger.Error("failed to upload file to vm guest", zap.Error(err)) + } + } + } + continue + } + } +} + func terminateQemuOnSigterm(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { logger = logger.Named("terminate-qemu-on-sigterm") @@ -753,3 +812,77 @@ func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error { return nil } + +func sendFileToNeonvmDaemon(hostpath, guestpath string) error { + _, vmIP, _, err := calcIPs(defaultNetworkCIDR) + if err != nil { + return fmt.Errorf("could not calculate VM IP address: %w", err) + } + + file, err := os.Open(hostpath) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("could not open file: %w", err) + } + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + url := 
fmt.Sprintf("http://%s:25183/files/%s", vmIP, guestpath) + + var req *http.Request + if err != nil && os.IsNotExist(err) { + req, err = http.NewRequestWithContext(ctx, http.MethodDelete, url, http.NoBody) + } else { + req, err = http.NewRequestWithContext(ctx, http.MethodPut, url, file) + } + + if err != nil { + return fmt.Errorf("could not build request: %w", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("could not send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode) + } + + return nil +} + +func getFileChecksumFromNeonvmDaemon(guestpath string) (string, error) { + _, vmIP, _, err := calcIPs(defaultNetworkCIDR) + if err != nil { + return "", fmt.Errorf("could not calculate VM IP address: %w", err) + } + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + url := fmt.Sprintf("http://%s:25183/files/%s", vmIP, guestpath) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) + if err != nil { + return "", fmt.Errorf("could not build request: %w", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", fmt.Errorf("could not send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return "", fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode) + } + + checksum, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("could not read response: %w", err) + } + + return string(checksum), nil +} diff --git a/pkg/util/checksum.go b/pkg/util/checksum.go new file mode 100644 index 000000000..a34fd67b5 --- /dev/null +++ b/pkg/util/checksum.go @@ -0,0 +1,32 @@ +package util + +import ( + "encoding/base64" + "io" + "os" + + "golang.org/x/crypto/blake2b" +) + +func ChecksumFile(path string) (string, error) { + file, err := os.Open(path) + if err != nil && os.IsNotExist(err) { + 
return "", nil + } else if err != nil { + return "", err + } + defer file.Close() + + hasher, err := blake2b.New256(nil) + if err != nil { + return "", err + } + + if _, err := io.Copy(hasher, file); err != nil { + return "", err + } + + sum := hasher.Sum(nil) + sumBase64 := base64.RawStdEncoding.EncodeToString(sum) + return sumBase64, nil +} From 74a867210273dd6d3c1303dc96d9e6486e80d713 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 27 Jan 2025 16:14:44 +0000 Subject: [PATCH 04/20] don't recreate the timer --- neonvm-runner/cmd/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 3d816ba4a..470ca7529 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -687,6 +687,8 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { } } + ticker := time.After(5 * time.Minute) + for { select { case <-ctx.Done(): @@ -701,7 +703,7 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { if err := sendFileToNeonvmDaemon(event.Name, guestPath); err != nil { logger.Error("failed to upload file to vm guest", zap.Error(err)) } - case <-time.After(5 * time.Minute): + case <-ticker: for hostpath, guestpath := range synchronisedFiles { hostsum, err := util.ChecksumFile(hostpath) if err != nil { From 163f109de712611c467ac7b694f7503ca5f4298a Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 27 Jan 2025 16:57:52 +0000 Subject: [PATCH 05/20] add e2e tests --- go.mod | 2 +- neonvm-runner/cmd/disks.go | 10 ++++- neonvm-runner/cmd/main.go | 20 ++++++++-- tests/e2e/vm-secret-sync/00-assert.yaml | 22 +++++++++++ tests/e2e/vm-secret-sync/00-create-vm.yaml | 39 +++++++++++++++++++ tests/e2e/vm-secret-sync/01-assert.yaml | 9 +++++ .../e2e/vm-secret-sync/01-update-secret.yaml | 17 ++++++++ 7 files changed, 113 insertions(+), 6 deletions(-) create mode 100644 tests/e2e/vm-secret-sync/00-assert.yaml create mode 100644 
tests/e2e/vm-secret-sync/00-create-vm.yaml create mode 100644 tests/e2e/vm-secret-sync/01-assert.yaml create mode 100644 tests/e2e/vm-secret-sync/01-update-secret.yaml diff --git a/go.mod b/go.mod index 325f02a43..d362cc11a 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( github.com/docker/cli v25.0.3+incompatible github.com/docker/docker v24.0.9+incompatible github.com/docker/libnetwork v0.8.0-dev.2.0.20210525090646-64b7a4574d14 + github.com/fsnotify/fsnotify v1.7.0 github.com/go-logr/logr v1.4.1 github.com/go-logr/zapr v1.3.0 github.com/jpillora/backoff v1.0.0 @@ -127,7 +128,6 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect diff --git a/neonvm-runner/cmd/disks.go b/neonvm-runner/cmd/disks.go index 46e21378a..9edbd21b6 100644 --- a/neonvm-runner/cmd/disks.go +++ b/neonvm-runner/cmd/disks.go @@ -202,7 +202,7 @@ func createISO9660runtime( mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mount %s $(/neonvm/bin/blkid -L %s) %s`, opts, disk.Name, disk.MountPath)) // Note: chmod must be after mount, otherwise it gets overwritten by mount. 
mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/chmod 0777 %s`, disk.MountPath)) - case disk.ConfigMap != nil || disk.Secret != nil: + case disk.ConfigMap != nil || (disk.Secret != nil && !secretNeedsSynchronisation(disk.MountPath)): mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mount -t iso9660 -o ro,mode=0644 $(/neonvm/bin/blkid -L %s) %s`, disk.Name, disk.MountPath)) case disk.Tmpfs != nil: mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/chmod 0777 %s`, disk.MountPath)) @@ -274,6 +274,14 @@ func createISO9660runtime( return nil } +func secretNeedsSynchronisation(path string) bool { + rel, err := filepath.Rel("/var/sync", path) + if err != nil { + return false + } + return filepath.IsLocal(rel) +} + func calcDirUsage(dirPath string) (int64, error) { stat, err := os.Lstat(dirPath) if err != nil { diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 470ca7529..7ad736c98 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -15,6 +15,7 @@ import ( "os" "os/exec" "os/signal" + "path/filepath" "runtime" "strings" "sync" @@ -49,8 +50,6 @@ const ( bufferedReaderSize = 4096 ) -var synchronisedFiles = map[string]string{} - func checkKVM() bool { info, err := os.Stat("/dev/kvm") if err != nil { @@ -509,7 +508,7 @@ func runQEMU( wg.Add(1) go forwardLogs(ctx, logger, &wg) wg.Add(1) - go monitorFiles(ctx, logger, &wg) + go monitorFiles(ctx, logger, &wg, vmSpec.Disks) qemuBin := getQemuBinaryName(cfg.architecture) var bin string @@ -672,7 +671,7 @@ func forwardLogs(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { } // monitorFiles watches a specific set of files and copied them into the guest VM via neonvm-daemon. 
-func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { +func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, disks []vmv1.Disk) { defer wg.Done() notify, err := fsnotify.NewBufferedWatcher(2) @@ -681,6 +680,19 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { return } + synchronisedFiles := make(map[string]string) + for _, disk := range disks { + if disk.Secret != nil && secretNeedsSynchronisation(disk.MountPath) { + rel, _ := filepath.Rel("/var/sync", disk.MountPath) + root := fmt.Sprintf("/vm/mounts%s", disk.MountPath) + for _, item := range disk.Secret.Items { + root := filepath.Join(root, item.Path) + rel := filepath.Join(rel, item.Path) + synchronisedFiles[root] = rel + } + } + } + for k := range synchronisedFiles { if err := notify.Add(k); err != nil { logger.Error("failed to add file to inotify instance", zap.Error(err)) diff --git a/tests/e2e/vm-secret-sync/00-assert.yaml b/tests/e2e/vm-secret-sync/00-assert.yaml new file mode 100644 index 000000000..55afba5f8 --- /dev/null +++ b/tests/e2e/vm-secret-sync/00-assert.yaml @@ -0,0 +1,22 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 90 +commands: + - script: | + set -eux + pod="$(kubectl get neonvm -n "$NAMESPACE" example -o jsonpath='{.status.podName}')" + kubectl exec -n "$NAMESPACE" $pod -- scp guest-vm:/var/sync/example/foo testfile + grep -q "hello world" testfile +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example +status: + phase: Running + restartCount: 0 + conditions: + - type: Available + status: "True" + cpus: 250m + memorySize: 1Gi diff --git a/tests/e2e/vm-secret-sync/00-create-vm.yaml b/tests/e2e/vm-secret-sync/00-create-vm.yaml new file mode 100644 index 000000000..e33356501 --- /dev/null +++ b/tests/e2e/vm-secret-sync/00-create-vm.yaml @@ -0,0 +1,39 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +unitTest: false +--- +apiVersion: v1 +kind: Secret +metadata: + 
name: example-secret +data: + foo: aGVsbG8gd29ybGQ= +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example +spec: + schedulerName: autoscale-scheduler + enableSSH: true + guest: + cpus: + min: 0.25 + use: 0.25 + max: 0.25 + memorySlotSize: 1Gi + memorySlots: + min: 1 + use: 1 + max: 1 + rootDisk: + image: vm-postgres:15-bullseye + size: 1Gi + disks: + - secret: + secretName: example-secret + items: + - key: foo + path: foo + mountPath: /var/sync/example + name: secret-foo diff --git a/tests/e2e/vm-secret-sync/01-assert.yaml b/tests/e2e/vm-secret-sync/01-assert.yaml new file mode 100644 index 000000000..2fc4d083c --- /dev/null +++ b/tests/e2e/vm-secret-sync/01-assert.yaml @@ -0,0 +1,9 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 90 +commands: + - script: | + set -eux + pod="$(kubectl get neonvm -n "$NAMESPACE" example -o jsonpath='{.status.podName}')" + kubectl exec -n "$NAMESPACE" $pod -- scp guest-vm:/var/sync/example/foo testfile + grep -q "goodbye world" testfile diff --git a/tests/e2e/vm-secret-sync/01-update-secret.yaml b/tests/e2e/vm-secret-sync/01-update-secret.yaml new file mode 100644 index 000000000..8aaad414b --- /dev/null +++ b/tests/e2e/vm-secret-sync/01-update-secret.yaml @@ -0,0 +1,17 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +unitTest: false +commands: + - script: | + set -eux + pod="$(kubectl get neonvm -n "$NAMESPACE" example -o jsonpath='{.status.podName}')" + new_size=$(( 1 * 1024 * 1024 )) # 1 Gi + mountpoint="/var/db/postgres/compute" + kubectl exec -n "$NAMESPACE" "$pod" -- ssh guest-vm /neonvm/bin/set-disk-quota "$new_size" "$mountpoint" +--- +apiVersion: v1 +kind: Secret +metadata: + name: example-secret +data: + foo: Z29vZGJ5ZSB3b3JsZA== From f5afb7b232d119a465e81d3082d31e7ec80ec06c Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 27 Jan 2025 17:25:29 +0000 Subject: [PATCH 06/20] sync early --- neonvm-runner/cmd/main.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) 
diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 7ad736c98..399099dee 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -693,12 +693,18 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d } } - for k := range synchronisedFiles { + for k, _ := range synchronisedFiles { if err := notify.Add(k); err != nil { logger.Error("failed to add file to inotify instance", zap.Error(err)) } } + for hostpath, guestpath := range synchronisedFiles { + if err := sendFileToNeonvmDaemon(hostpath, guestpath); err != nil { + logger.Error("failed to upload file to vm guest", zap.Error(err)) + } + } + ticker := time.After(5 * time.Minute) for { From edfcc356c19f42513edd40ec76dd36c25821527b Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 27 Jan 2025 18:18:37 +0000 Subject: [PATCH 07/20] wait before uploading --- neonvm-daemon/cmd/main.go | 3 +-- neonvm-runner/cmd/main.go | 10 +++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index a776a9f9d..6c558f7b3 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -125,11 +125,10 @@ func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request return } + w.WriteHeader(http.StatusOK) if _, err := w.Write([]byte(checksum)); err != nil { s.logger.Error("could not write response", zap.Error(err)) } - - w.WriteHeader(http.StatusOK) } func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request) { diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 399099dee..e3a7e1b03 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -693,12 +693,20 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d } } - for k, _ := range synchronisedFiles { + for k := range synchronisedFiles { if err := notify.Add(k); err != nil { logger.Error("failed to add file to inotify instance", 
zap.Error(err)) } } + // Wait a bit to reduce the chance we attempt dialing before + // QEMU is started + select { + case <-time.After(200 * time.Millisecond): + case <-ctx.Done(): + logger.Warn("QEMU shut down too soon to start forwarding logs") + } + for hostpath, guestpath := range synchronisedFiles { if err := sendFileToNeonvmDaemon(hostpath, guestpath); err != nil { logger.Error("failed to upload file to vm guest", zap.Error(err)) From cdb39eca5accce6183f113aa87cd88a8578904aa Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 27 Jan 2025 19:18:26 +0000 Subject: [PATCH 08/20] better logging --- neonvm-daemon/cmd/main.go | 3 +-- neonvm-runner/cmd/main.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index 6c558f7b3..d7dbcebf5 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -1,7 +1,6 @@ package main import ( - "errors" "flag" "fmt" "io" @@ -101,7 +100,7 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) { func (s *cpuServer) getFile(path string) (string, error) { if !filepath.IsLocal(path) { - return "", errors.New("non-local path") + return "", fmt.Errorf("\"%s\" is not a local path", path) } path = filepath.Clean(filepath.Join("var", "sync", path)) return path, nil diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index e3a7e1b03..c08959f19 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -702,7 +702,7 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d // Wait a bit to reduce the chance we attempt dialing before // QEMU is started select { - case <-time.After(200 * time.Millisecond): + case <-time.After(1 * time.Second): case <-ctx.Done(): logger.Warn("QEMU shut down too soon to start forwarding logs") } From 9a3b2adfb2dfaa09ea45c5d78ec6b42a2aeda42f Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 28 Jan 2025 12:47:49 +0000 Subject: [PATCH 
09/20] fixes from testing locally --- neonvm-runner/cmd/main.go | 49 ++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index c08959f19..67c856d96 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -679,6 +679,7 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d logger.Error("failed to create inotify instance", zap.Error(err)) return } + defer notify.Close() synchronisedFiles := make(map[string]string) for _, disk := range disks { @@ -686,9 +687,9 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d rel, _ := filepath.Rel("/var/sync", disk.MountPath) root := fmt.Sprintf("/vm/mounts%s", disk.MountPath) for _, item := range disk.Secret.Items { - root := filepath.Join(root, item.Path) - rel := filepath.Join(rel, item.Path) - synchronisedFiles[root] = rel + hostpath := filepath.Join(root, item.Path) + guestpath := filepath.Join(rel, item.Path) + synchronisedFiles[hostpath] = guestpath } } } @@ -701,48 +702,54 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d // Wait a bit to reduce the chance we attempt dialing before // QEMU is started - select { - case <-time.After(1 * time.Second): - case <-ctx.Done(): - logger.Warn("QEMU shut down too soon to start forwarding logs") - } + success := false + for !success { + success = true + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): + logger.Warn("QEMU shut down too soon to start forwarding logs") + } - for hostpath, guestpath := range synchronisedFiles { - if err := sendFileToNeonvmDaemon(hostpath, guestpath); err != nil { - logger.Error("failed to upload file to vm guest", zap.Error(err)) + for hostpath, guestpath := range synchronisedFiles { + if err := sendFileToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { + success = false + logger.Error("failed to upload file to vm 
guest", zap.Error(err)) + } } } - ticker := time.After(5 * time.Minute) + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() for { select { case <-ctx.Done(): return case event := <-notify.Events: - guestPath, ok := synchronisedFiles[event.Name] + guestpath, ok := synchronisedFiles[event.Name] if !ok { // not tracking this file continue } - if err := sendFileToNeonvmDaemon(event.Name, guestPath); err != nil { + if err := sendFileToNeonvmDaemon(ctx, event.Name, guestpath); err != nil { logger.Error("failed to upload file to vm guest", zap.Error(err)) } - case <-ticker: + case <-ticker.C: for hostpath, guestpath := range synchronisedFiles { hostsum, err := util.ChecksumFile(hostpath) if err != nil { logger.Error("failed to get file checksum from host", zap.Error(err)) } - guestsum, err := getFileChecksumFromNeonvmDaemon(guestpath) + guestsum, err := getFileChecksumFromNeonvmDaemon(ctx, guestpath) if err != nil { logger.Error("failed to get file checksum from guest", zap.Error(err)) } if guestsum != hostsum { - if err = sendFileToNeonvmDaemon(hostpath, guestsum); err != nil { + if err = sendFileToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { logger.Error("failed to upload file to vm guest", zap.Error(err)) } } @@ -841,7 +848,7 @@ func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error { return nil } -func sendFileToNeonvmDaemon(hostpath, guestpath string) error { +func sendFileToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) error { _, vmIP, _, err := calcIPs(defaultNetworkCIDR) if err != nil { return fmt.Errorf("could not calculate VM IP address: %w", err) @@ -852,7 +859,7 @@ func sendFileToNeonvmDaemon(hostpath, guestpath string) error { return fmt.Errorf("could not open file: %w", err) } - ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() url := fmt.Sprintf("http://%s:25183/files/%s", vmIP, guestpath) @@ -881,13 +888,13 @@ func 
sendFileToNeonvmDaemon(hostpath, guestpath string) error { return nil } -func getFileChecksumFromNeonvmDaemon(guestpath string) (string, error) { +func getFileChecksumFromNeonvmDaemon(ctx context.Context, guestpath string) (string, error) { _, vmIP, _, err := calcIPs(defaultNetworkCIDR) if err != nil { return "", fmt.Errorf("could not calculate VM IP address: %w", err) } - ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() url := fmt.Sprintf("http://%s:25183/files/%s", vmIP, guestpath) From 4148a357d54fee62e4fb81a723de59b824425940 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 28 Jan 2025 14:36:36 +0000 Subject: [PATCH 10/20] fix tests --- neonvm-daemon/cmd/main.go | 3 ++- neonvm-runner/cmd/main.go | 14 ++++++++++++++ tests/e2e/vm-secret-sync/00-assert.yaml | 5 +++-- tests/e2e/vm-secret-sync/01-assert.yaml | 5 +++-- tests/e2e/vm-secret-sync/01-update-secret.yaml | 7 ------- 5 files changed, 22 insertions(+), 12 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index d7dbcebf5..54a0869f8 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -102,7 +102,8 @@ func (s *cpuServer) getFile(path string) (string, error) { if !filepath.IsLocal(path) { return "", fmt.Errorf("\"%s\" is not a local path", path) } - path = filepath.Clean(filepath.Join("var", "sync", path)) + //nolint:gocritic // filepathJoin lint wrongly complains about path separators + path = filepath.Clean(filepath.Join("/var/sync", path)) return path, nil } diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 67c856d96..d28740389 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -732,6 +732,20 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d // not tracking this file continue } + if event.Op == fsnotify.Chmod { + // not interesting. 
+ continue + } + + // kubernetes secrets are mounted as symbolic links. + // When the link changes, there's no event. We only see the deletion + // of the file the link used to point to. + // This doesn't mean the file was actually deleted though. + if event.Op == fsnotify.Remove { + if err := notify.Add(event.Name); err != nil { + logger.Error("failed to add file to inotify instance", zap.Error(err)) + } + } if err := sendFileToNeonvmDaemon(ctx, event.Name, guestpath); err != nil { logger.Error("failed to upload file to vm guest", zap.Error(err)) diff --git a/tests/e2e/vm-secret-sync/00-assert.yaml b/tests/e2e/vm-secret-sync/00-assert.yaml index 55afba5f8..23d71c8ac 100644 --- a/tests/e2e/vm-secret-sync/00-assert.yaml +++ b/tests/e2e/vm-secret-sync/00-assert.yaml @@ -1,12 +1,13 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert -timeout: 90 +timeout: 70 commands: - script: | set -eux pod="$(kubectl get neonvm -n "$NAMESPACE" example -o jsonpath='{.status.podName}')" + kubectl exec -n "$NAMESPACE" $pod -- grep -q "hello world" /vm/mounts/var/sync/example/foo kubectl exec -n "$NAMESPACE" $pod -- scp guest-vm:/var/sync/example/foo testfile - grep -q "hello world" testfile + kubectl exec -n "$NAMESPACE" $pod -- grep -q "hello world" testfile --- apiVersion: vm.neon.tech/v1 kind: VirtualMachine diff --git a/tests/e2e/vm-secret-sync/01-assert.yaml b/tests/e2e/vm-secret-sync/01-assert.yaml index 2fc4d083c..4bf904533 100644 --- a/tests/e2e/vm-secret-sync/01-assert.yaml +++ b/tests/e2e/vm-secret-sync/01-assert.yaml @@ -1,9 +1,10 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert -timeout: 90 +timeout: 70 commands: - script: | set -eux pod="$(kubectl get neonvm -n "$NAMESPACE" example -o jsonpath='{.status.podName}')" + kubectl exec -n "$NAMESPACE" $pod -- grep -q "goodbye world" /vm/mounts/var/sync/example/foo kubectl exec -n "$NAMESPACE" $pod -- scp guest-vm:/var/sync/example/foo testfile - grep -q "goodbye world" testfile + kubectl exec -n "$NAMESPACE" $pod -- grep -q 
"goodbye world" testfile diff --git a/tests/e2e/vm-secret-sync/01-update-secret.yaml b/tests/e2e/vm-secret-sync/01-update-secret.yaml index 8aaad414b..5b60678d7 100644 --- a/tests/e2e/vm-secret-sync/01-update-secret.yaml +++ b/tests/e2e/vm-secret-sync/01-update-secret.yaml @@ -1,13 +1,6 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep unitTest: false -commands: - - script: | - set -eux - pod="$(kubectl get neonvm -n "$NAMESPACE" example -o jsonpath='{.status.podName}')" - new_size=$(( 1 * 1024 * 1024 )) # 1 Gi - mountpoint="/var/db/postgres/compute" - kubectl exec -n "$NAMESPACE" "$pod" -- ssh guest-vm /neonvm/bin/set-disk-quota "$new_size" "$mountpoint" --- apiVersion: v1 kind: Secret From 557f465d9b629cfb41fcfe329f213451287a1229 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 3 Feb 2025 14:28:19 +0000 Subject: [PATCH 11/20] refactor to track entire secret dir --- neonvm-daemon/cmd/main.go | 94 +++++++++++++++++++++------------------ neonvm-runner/cmd/main.go | 93 +++++++++++++++++++++----------------- pkg/util/checksum.go | 58 ++++++++++++++++++++---- 3 files changed, 153 insertions(+), 92 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index 54a0869f8..0d813c720 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -1,6 +1,8 @@ package main import ( + "encoding/base64" + "encoding/json" "flag" "fmt" "io" @@ -12,6 +14,7 @@ import ( "time" "go.uber.org/zap" + k8sutil "k8s.io/kubernetes/pkg/volume/util" vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling" @@ -107,20 +110,21 @@ func (s *cpuServer) getFile(path string) (string, error) { return path, nil } -func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request) { +func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, path string) { s.fileOperationsMutex.Lock() defer s.fileOperationsMutex.Unlock() - path, err := s.getFile(r.PathValue("path")) + path, 
err := s.getFile(path) if err != nil { s.logger.Error("invalid file path", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } - checksum, err := util.ChecksumFile(path) + dir := filepath.Join(path, "..data") + checksum, err := util.ChecksumFlatDir(dir) if err != nil { - s.logger.Error("could not checksum file", zap.Error(err)) + s.logger.Error("could not checksum dir", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } @@ -131,69 +135,75 @@ func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request } } -func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request) { +type File struct { + // base64 encoded file contents + Data string `json:"data"` +} + +func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, path string) { s.fileOperationsMutex.Lock() defer s.fileOperationsMutex.Unlock() - path, err := s.getFile(r.PathValue("path")) + path, err := s.getFile(path) if err != nil { s.logger.Error("invalid file path", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } - if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { - s.logger.Error("could not create directory", zap.Error(err)) - w.WriteHeader(http.StatusInternalServerError) + if r.Body == nil { + s.logger.Error("no body") + w.WriteHeader(http.StatusBadRequest) return } + defer r.Body.Close() - file, err := os.CreateTemp("", "") + body, err := io.ReadAll(r.Body) if err != nil { - s.logger.Error("could not create file", zap.Error(err)) - w.WriteHeader(http.StatusInternalServerError) + s.logger.Error("could not ready body", zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) return } - defer file.Close() - defer os.Remove(file.Name()) - if _, err := io.Copy(file, r.Body); err != nil { - s.logger.Error("could not read request body", zap.Error(err)) + var files map[string]File + if err := json.Unmarshal(body, &files); err != nil { + s.logger.Error("could not ready body", zap.Error(err)) 
w.WriteHeader(http.StatusBadRequest) return } - defer r.Body.Close() - // rename is an atomic operation on unix. - // this ensures that the file is either not updated at all, - // or is updated entirely. - // - // this ensures that other processes reading the file will never - // have any inconsistencies. they will either read the old contents - // or the new contents. Any open files will still point to the old inode - // and thus still read the old contents. - if err := os.Rename(file.Name(), path); err != nil { - s.logger.Error("could not rename file", zap.Error(err)) + payload := make(map[string]k8sutil.FileProjection) + for k, v := range files { + data, err := base64.StdEncoding.DecodeString(v.Data) + if err != nil { + s.logger.Error("could not ready body", zap.Error(err)) + w.WriteHeader(http.StatusBadRequest) + return + } + payload[k] = k8sutil.FileProjection{ + Data: data, + // read-write by root + // read-only otherwise + Mode: 0o644, + FsUser: nil, + } + } + + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + s.logger.Error("could not create directory", zap.Error(err)) w.WriteHeader(http.StatusInternalServerError) return } - w.WriteHeader(http.StatusOK) -} - -func (s *cpuServer) handleDeleteFile(w http.ResponseWriter, r *http.Request) { - s.fileOperationsMutex.Lock() - defer s.fileOperationsMutex.Unlock() - - path, err := s.getFile(r.PathValue("path")) + aw, err := k8sutil.NewAtomicWriter(path, "neonvm-daemon") if err != nil { - s.logger.Error("invalid file path", zap.Error(err)) + s.logger.Error("could not create writer", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } - if err := os.Remove(path); err != nil && !os.IsNotExist(err) { - s.logger.Error("could not delete file", zap.Error(err)) + if err := aw.Write(payload, nil); err != nil { + s.logger.Error("could not create files", zap.Error(err)) w.WriteHeader(http.StatusInternalServerError) return } @@ -216,14 +226,12 @@ func (s *cpuServer) run(addr string) { } }) 
mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) { + path := r.PathValue("path") if r.Method == http.MethodGet { - s.handleGetFileChecksum(w, r) + s.handleGetFileChecksum(w, path) return } else if r.Method == http.MethodPut { - s.handleUploadFile(w, r) - return - } else if r.Method == http.MethodDelete { - s.handleDeleteFile(w, r) + s.handleUploadFile(w, r, path) return } else { // unknown method diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index d28740389..ceed54188 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -34,6 +34,7 @@ import ( vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/util" "github.com/neondatabase/autoscaling/pkg/util/taskgroup" + corev1 "k8s.io/api/core/v1" ) const ( @@ -674,52 +675,52 @@ func forwardLogs(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, disks []vmv1.Disk) { defer wg.Done() - notify, err := fsnotify.NewBufferedWatcher(2) + notify, err := fsnotify.NewWatcher() if err != nil { logger.Error("failed to create inotify instance", zap.Error(err)) return } defer notify.Close() - synchronisedFiles := make(map[string]string) + secrets := make(map[string][]corev1.KeyToPath) + // watched := make(map[string]string) + watched2 := make(map[string]string) for _, disk := range disks { if disk.Secret != nil && secretNeedsSynchronisation(disk.MountPath) { rel, _ := filepath.Rel("/var/sync", disk.MountPath) root := fmt.Sprintf("/vm/mounts%s", disk.MountPath) - for _, item := range disk.Secret.Items { - hostpath := filepath.Join(root, item.Path) - guestpath := filepath.Join(rel, item.Path) - synchronisedFiles[hostpath] = guestpath + + if err := notify.Add(filepath.Join(root, "..data")); err != nil { + logger.Error("failed to add file to inotify instance", zap.Error(err)) } - } - } + watched2[filepath.Join(root, "..data")] = 
rel - for k := range synchronisedFiles { - if err := notify.Add(k); err != nil { - logger.Error("failed to add file to inotify instance", zap.Error(err)) + secrets[root] = disk.Secret.Items } } // Wait a bit to reduce the chance we attempt dialing before // QEMU is started - success := false - for !success { - success = true + for { select { case <-time.After(1 * time.Second): case <-ctx.Done(): logger.Warn("QEMU shut down too soon to start forwarding logs") } - for hostpath, guestpath := range synchronisedFiles { - if err := sendFileToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { + success := true + for hostpath, guestpath := range watched2 { + if err := sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { success = false logger.Error("failed to upload file to vm guest", zap.Error(err)) } } + if success { + break + } } - ticker := time.NewTicker(1 * time.Minute) + ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { @@ -727,32 +728,31 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d case <-ctx.Done(): return case event := <-notify.Events: - guestpath, ok := synchronisedFiles[event.Name] + guestpath, ok := watched2[event.Name] if !ok { // not tracking this file continue } - if event.Op == fsnotify.Chmod { - // not interesting. - continue - } // kubernetes secrets are mounted as symbolic links. // When the link changes, there's no event. We only see the deletion // of the file the link used to point to. // This doesn't mean the file was actually deleted though. 
- if event.Op == fsnotify.Remove { - if err := notify.Add(event.Name); err != nil { - logger.Error("failed to add file to inotify instance", zap.Error(err)) - } + if event.Op != fsnotify.Remove { + continue } - if err := sendFileToNeonvmDaemon(ctx, event.Name, guestpath); err != nil { + logger.Info("mounted secrets changed", zap.String("secret_path", event.Name)) + if err := notify.Add(event.Name); err != nil { + logger.Error("failed to add file to inotify instance", zap.Error(err)) + } + + if err := sendFilesToNeonvmDaemon(ctx, event.Name, guestpath); err != nil { logger.Error("failed to upload file to vm guest", zap.Error(err)) } case <-ticker.C: - for hostpath, guestpath := range synchronisedFiles { - hostsum, err := util.ChecksumFile(hostpath) + for hostpath, guestpath := range watched2 { + hostsum, err := util.ChecksumFlatDir(hostpath) if err != nil { logger.Error("failed to get file checksum from host", zap.Error(err)) } @@ -763,7 +763,7 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d } if guestsum != hostsum { - if err = sendFileToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { + if err = sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { logger.Error("failed to upload file to vm guest", zap.Error(err)) } } @@ -862,34 +862,47 @@ func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error { return nil } -func sendFileToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) error { +type File struct { + // base64 encoded file contents + Data string `json:"data"` +} + +func sendFilesToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) error { _, vmIP, _, err := calcIPs(defaultNetworkCIDR) if err != nil { return fmt.Errorf("could not calculate VM IP address: %w", err) } - file, err := os.Open(hostpath) + files, err := util.ReadAllFiles(hostpath) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("could not open file: %w", err) } + dto := make(map[string]File) + for k, v := range files { + dto[k] 
= File{Data: base64.StdEncoding.EncodeToString(v)} + } + body, err := json.Marshal(dto) + if err != nil { + return fmt.Errorf("could not encode files: %w", err) + } + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() url := fmt.Sprintf("http://%s:25183/files/%s", vmIP, guestpath) - var req *http.Request - if err != nil && os.IsNotExist(err) { - req, err = http.NewRequestWithContext(ctx, http.MethodDelete, url, http.NoBody) - } else { - req, err = http.NewRequestWithContext(ctx, http.MethodPut, url, file) - } - + req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body)) if err != nil { return fmt.Errorf("could not build request: %w", err) } - resp, err := http.DefaultClient.Do(req) + client := &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + resp, err := client.Do(req) if err != nil { return fmt.Errorf("could not send request: %w", err) } diff --git a/pkg/util/checksum.go b/pkg/util/checksum.go index a34fd67b5..e6f9c4e9d 100644 --- a/pkg/util/checksum.go +++ b/pkg/util/checksum.go @@ -2,31 +2,71 @@ package util import ( "encoding/base64" - "io" + "encoding/binary" "os" + "path/filepath" + "sort" "golang.org/x/crypto/blake2b" ) -func ChecksumFile(path string) (string, error) { - file, err := os.Open(path) - if err != nil && os.IsNotExist(err) { - return "", nil - } else if err != nil { +// Calculate the checksum over all files in a directory, assuming the directory is flat (contains no subdirs). 
+func ChecksumFlatDir(path string) (string, error) { + files, err := ReadAllFiles(path) + if err != nil { return "", err } - defer file.Close() + + // sort the file names for a reproducible hash + var keys []string + for k := range files { + keys = append(keys, k) + } + sort.Strings(keys) hasher, err := blake2b.New256(nil) if err != nil { return "", err } - if _, err := io.Copy(hasher, file); err != nil { - return "", err + for _, filename := range keys { + data := files[filename] + var length []byte + + // hash as "{name}\0{len(data)}{data}" + // this prevents any possible hash confusion problems + hasher.Write([]byte(filename)) + hasher.Write([]byte{0}) + hasher.Write(binary.LittleEndian.AppendUint64(length, uint64(len(data)))) + hasher.Write(data) } sum := hasher.Sum(nil) sumBase64 := base64.RawStdEncoding.EncodeToString(sum) return sumBase64, nil } + +// Read all files in a directory, assuming the directory is flat (contains no subdirs). +func ReadAllFiles(path string) (map[string][]byte, error) { + entries, err := os.ReadDir(path) + if err != nil { + return nil, err + } + + output := make(map[string][]byte) + + for _, entry := range entries { + if entry.IsDir() { + continue + } + + data, err := os.ReadFile(filepath.Join(path, entry.Name())) + if err != nil { + return nil, err + } + + output[entry.Name()] = data + } + + return output, nil +} From da908b43f88afedd85bdee970ab244155abe808f Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 3 Feb 2025 15:36:10 +0000 Subject: [PATCH 12/20] increate timeout for test --- tests/e2e/vm-secret-sync/00-create-vm.yaml | 3 --- tests/e2e/vm-secret-sync/01-assert.yaml | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/e2e/vm-secret-sync/00-create-vm.yaml b/tests/e2e/vm-secret-sync/00-create-vm.yaml index e33356501..5a80fe357 100644 --- a/tests/e2e/vm-secret-sync/00-create-vm.yaml +++ b/tests/e2e/vm-secret-sync/00-create-vm.yaml @@ -32,8 +32,5 @@ spec: disks: - secret: secretName: example-secret - 
items: - - key: foo - path: foo mountPath: /var/sync/example name: secret-foo diff --git a/tests/e2e/vm-secret-sync/01-assert.yaml b/tests/e2e/vm-secret-sync/01-assert.yaml index 4bf904533..0789da534 100644 --- a/tests/e2e/vm-secret-sync/01-assert.yaml +++ b/tests/e2e/vm-secret-sync/01-assert.yaml @@ -1,6 +1,6 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert -timeout: 70 +timeout: 120 commands: - script: | set -eux From 26f66b5d876c58fc434a97f8d2a6a25a823b8f21 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 3 Feb 2025 16:13:00 +0000 Subject: [PATCH 13/20] fix lints --- neonvm-daemon/cmd/main.go | 1 + neonvm-runner/cmd/main.go | 22 ++++++++++------------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index 0d813c720..550272980 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -14,6 +14,7 @@ import ( "time" "go.uber.org/zap" + k8sutil "k8s.io/kubernetes/pkg/volume/util" vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index ceed54188..27eba4b94 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -34,7 +34,6 @@ import ( vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/util" "github.com/neondatabase/autoscaling/pkg/util/taskgroup" - corev1 "k8s.io/api/core/v1" ) const ( @@ -682,20 +681,18 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d } defer notify.Close() - secrets := make(map[string][]corev1.KeyToPath) - // watched := make(map[string]string) - watched2 := make(map[string]string) + secrets := make(map[string]string) for _, disk := range disks { if disk.Secret != nil && secretNeedsSynchronisation(disk.MountPath) { rel, _ := filepath.Rel("/var/sync", disk.MountPath) - root := fmt.Sprintf("/vm/mounts%s", disk.MountPath) - if err := notify.Add(filepath.Join(root, 
"..data")); err != nil { + // secrets are mounted using the atomicwriter utility, which loads the secret directory + // into `..data`. + dataDir := fmt.Sprintf("/vm/mounts%s/..data", disk.MountPath) + if err := notify.Add(dataDir); err != nil { logger.Error("failed to add file to inotify instance", zap.Error(err)) } - watched2[filepath.Join(root, "..data")] = rel - - secrets[root] = disk.Secret.Items + secrets[dataDir] = rel } } @@ -709,7 +706,7 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d } success := true - for hostpath, guestpath := range watched2 { + for hostpath, guestpath := range secrets { if err := sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { success = false logger.Error("failed to upload file to vm guest", zap.Error(err)) @@ -728,9 +725,10 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d case <-ctx.Done(): return case event := <-notify.Events: - guestpath, ok := watched2[event.Name] + guestpath, ok := secrets[event.Name] if !ok { // not tracking this file + // this can occur due to recursive file tracking continue } @@ -751,7 +749,7 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d logger.Error("failed to upload file to vm guest", zap.Error(err)) } case <-ticker.C: - for hostpath, guestpath := range watched2 { + for hostpath, guestpath := range secrets { hostsum, err := util.ChecksumFlatDir(hostpath) if err != nil { logger.Error("failed to get file checksum from host", zap.Error(err)) From d885222c873b6e58855ca574ee228662d6b2f1d6 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 3 Feb 2025 17:20:44 +0000 Subject: [PATCH 14/20] slightly refactor --- neonvm-daemon/cmd/main.go | 13 ++++--------- neonvm-runner/cmd/disks.go | 18 +++++++++++++++--- neonvm-runner/cmd/main.go | 13 ++++++------- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index 
550272980..35c924fb6 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -102,20 +102,15 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -func (s *cpuServer) getFile(path string) (string, error) { - if !filepath.IsLocal(path) { - return "", fmt.Errorf("\"%s\" is not a local path", path) - } - //nolint:gocritic // filepathJoin lint wrongly complains about path separators - path = filepath.Clean(filepath.Join("/var/sync", path)) - return path, nil +func (s *cpuServer) getMountDir(path string) (string, error) { + return fmt.Sprintf("/%s", path), nil } func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, path string) { s.fileOperationsMutex.Lock() defer s.fileOperationsMutex.Unlock() - path, err := s.getFile(path) + path, err := s.getMountDir(path) if err != nil { s.logger.Error("invalid file path", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) @@ -145,7 +140,7 @@ func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, pat s.fileOperationsMutex.Lock() defer s.fileOperationsMutex.Unlock() - path, err := s.getFile(path) + path, err := s.getMountDir(path) if err != nil { s.logger.Error("invalid file path", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) diff --git a/neonvm-runner/cmd/disks.go b/neonvm-runner/cmd/disks.go index 9edbd21b6..b1a787717 100644 --- a/neonvm-runner/cmd/disks.go +++ b/neonvm-runner/cmd/disks.go @@ -184,6 +184,10 @@ func createISO9660runtime( if len(disks) != 0 { for _, disk := range disks { + if diskNeedsSynchronisation(disk) { + // do nothing as we will mount it into the VM via neonvm-daemon later + continue + } if disk.MountPath != "" { mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mkdir -p %s`, disk.MountPath)) } @@ -202,7 +206,7 @@ func createISO9660runtime( mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mount %s $(/neonvm/bin/blkid -L %s) %s`, opts, disk.Name, disk.MountPath)) // Note: chmod must be after 
mount, otherwise it gets overwritten by mount. mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/chmod 0777 %s`, disk.MountPath)) - case disk.ConfigMap != nil || (disk.Secret != nil && !secretNeedsSynchronisation(disk.MountPath)): + case disk.ConfigMap != nil || disk.Secret != nil: mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mount -t iso9660 -o ro,mode=0644 $(/neonvm/bin/blkid -L %s) %s`, disk.Name, disk.MountPath)) case disk.Tmpfs != nil: mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/chmod 0777 %s`, disk.MountPath)) @@ -274,11 +278,19 @@ func createISO9660runtime( return nil } -func secretNeedsSynchronisation(path string) bool { - rel, err := filepath.Rel("/var/sync", path) +func diskNeedsSynchronisation(disk vmv1.Disk) bool { + if disk.DiskSource.Secret == nil { + // only supporting secrets at the moment + return false + } + + // for now, `/var/sync` is our sentinal that this is synchronised. + // TODO: should we instead put a sync flag on the Disk object itself? + rel, err := filepath.Rel("/var/sync", disk.MountPath) if err != nil { return false } + return filepath.IsLocal(rel) } diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 27eba4b94..748acd9ac 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -15,7 +15,6 @@ import ( "os" "os/exec" "os/signal" - "path/filepath" "runtime" "strings" "sync" @@ -683,16 +682,14 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d secrets := make(map[string]string) for _, disk := range disks { - if disk.Secret != nil && secretNeedsSynchronisation(disk.MountPath) { - rel, _ := filepath.Rel("/var/sync", disk.MountPath) - + if diskNeedsSynchronisation(disk) { // secrets are mounted using the atomicwriter utility, which loads the secret directory // into `..data`. 
dataDir := fmt.Sprintf("/vm/mounts%s/..data", disk.MountPath) if err := notify.Add(dataDir); err != nil { logger.Error("failed to add file to inotify instance", zap.Error(err)) } - secrets[dataDir] = rel + secrets[dataDir] = disk.MountPath } } @@ -888,7 +885,8 @@ func sendFilesToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) er ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - url := fmt.Sprintf("http://%s:25183/files/%s", vmIP, guestpath) + // guestpath has a leading forward slash + url := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath) req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body)) if err != nil { @@ -922,7 +920,8 @@ func getFileChecksumFromNeonvmDaemon(ctx context.Context, guestpath string) (str ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - url := fmt.Sprintf("http://%s:25183/files/%s", vmIP, guestpath) + // guestpath has a leading forward slash + url := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) if err != nil { From c272e6983d4b5742463ffc61e813a5d280d77301 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 3 Feb 2025 18:22:13 +0000 Subject: [PATCH 15/20] fix where creating the mount dir takes place --- neonvm-daemon/cmd/main.go | 26 +------------------------- neonvm-runner/cmd/disks.go | 6 +++--- 2 files changed, 4 insertions(+), 28 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index 35c924fb6..76fd4e7b7 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -102,21 +102,10 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -func (s *cpuServer) getMountDir(path string) (string, error) { - return fmt.Sprintf("/%s", path), nil -} - func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, path string) { s.fileOperationsMutex.Lock() defer 
s.fileOperationsMutex.Unlock() - path, err := s.getMountDir(path) - if err != nil { - s.logger.Error("invalid file path", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return - } - dir := filepath.Join(path, "..data") checksum, err := util.ChecksumFlatDir(dir) if err != nil { @@ -140,13 +129,6 @@ func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, pat s.fileOperationsMutex.Lock() defer s.fileOperationsMutex.Unlock() - path, err := s.getMountDir(path) - if err != nil { - s.logger.Error("invalid file path", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return - } - if r.Body == nil { s.logger.Error("no body") w.WriteHeader(http.StatusBadRequest) @@ -185,12 +167,6 @@ func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, pat } } - if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { - s.logger.Error("could not create directory", zap.Error(err)) - w.WriteHeader(http.StatusInternalServerError) - return - } - aw, err := k8sutil.NewAtomicWriter(path, "neonvm-daemon") if err != nil { s.logger.Error("could not create writer", zap.Error(err)) @@ -222,7 +198,7 @@ func (s *cpuServer) run(addr string) { } }) mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) { - path := r.PathValue("path") + path := fmt.Sprintf("/%s", r.PathValue("path")) if r.Method == http.MethodGet { s.handleGetFileChecksum(w, path) return diff --git a/neonvm-runner/cmd/disks.go b/neonvm-runner/cmd/disks.go index b1a787717..7f266a592 100644 --- a/neonvm-runner/cmd/disks.go +++ b/neonvm-runner/cmd/disks.go @@ -184,13 +184,13 @@ func createISO9660runtime( if len(disks) != 0 { for _, disk := range disks { + if disk.MountPath != "" { + mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mkdir -p %s`, disk.MountPath)) + } if diskNeedsSynchronisation(disk) { // do nothing as we will mount it into the VM via neonvm-daemon later continue } - if disk.MountPath != "" { - mounts = append(mounts, 
fmt.Sprintf(`/neonvm/bin/mkdir -p %s`, disk.MountPath)) - } switch { case disk.EmptyDisk != nil: opts := "" From 546f86924450caf4ed0f043867e125f021f274a8 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 4 Feb 2025 12:04:33 +0000 Subject: [PATCH 16/20] remove inotify --- go.mod | 2 +- neonvm-runner/cmd/main.go | 67 +++++++++++---------------------------- 2 files changed, 20 insertions(+), 49 deletions(-) diff --git a/go.mod b/go.mod index d362cc11a..325f02a43 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,6 @@ require ( github.com/docker/cli v25.0.3+incompatible github.com/docker/docker v24.0.9+incompatible github.com/docker/libnetwork v0.8.0-dev.2.0.20210525090646-64b7a4574d14 - github.com/fsnotify/fsnotify v1.7.0 github.com/go-logr/logr v1.4.1 github.com/go-logr/zapr v1.3.0 github.com/jpillora/backoff v1.0.0 @@ -128,6 +127,7 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 748acd9ac..1f711f958 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -23,7 +23,6 @@ import ( "time" "github.com/digitalocean/go-qemu/qmp" - "github.com/fsnotify/fsnotify" "github.com/jpillora/backoff" "github.com/samber/lo" "go.uber.org/zap" @@ -673,35 +672,20 @@ func forwardLogs(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) { func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, disks []vmv1.Disk) { defer wg.Done() - notify, err := fsnotify.NewWatcher() - if err != nil { - logger.Error("failed to create inotify instance", zap.Error(err)) - return - } - defer notify.Close() - secrets := make(map[string]string) for _, disk := 
range disks { if diskNeedsSynchronisation(disk) { // secrets are mounted using the atomicwriter utility, which loads the secret directory // into `..data`. dataDir := fmt.Sprintf("/vm/mounts%s/..data", disk.MountPath) - if err := notify.Add(dataDir); err != nil { - logger.Error("failed to add file to inotify instance", zap.Error(err)) - } secrets[dataDir] = disk.MountPath } } - // Wait a bit to reduce the chance we attempt dialing before - // QEMU is started + // Faster loop for the initial upload. + // The VM might need the secrets in order for postgres to actually start up, + // so it's important we sync them as soon as the daemon is available. for { - select { - case <-time.After(1 * time.Second): - case <-ctx.Done(): - logger.Warn("QEMU shut down too soon to start forwarding logs") - } - success := true for hostpath, guestpath := range secrets { if err := sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { @@ -712,58 +696,45 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d if success { break } + + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): + logger.Warn("QEMU shut down too soon to start forwarding logs") + } } - ticker := time.NewTicker(5 * time.Minute) + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return - case event := <-notify.Events: - guestpath, ok := secrets[event.Name] - if !ok { - // not tracking this file - // this can occur due to recursive file tracking - continue - } - - // kubernetes secrets are mounted as symbolic links. - // When the link changes, there's no event. We only see the deletion - // of the file the link used to point to. - // This doesn't mean the file was actually deleted though. 
- if event.Op != fsnotify.Remove { - continue - } - - logger.Info("mounted secrets changed", zap.String("secret_path", event.Name)) - if err := notify.Add(event.Name); err != nil { - logger.Error("failed to add file to inotify instance", zap.Error(err)) - } - - if err := sendFilesToNeonvmDaemon(ctx, event.Name, guestpath); err != nil { - logger.Error("failed to upload file to vm guest", zap.Error(err)) - } case <-ticker.C: + // for each secret we are tracking for hostpath, guestpath := range secrets { + // get the checksum for the pod directory hostsum, err := util.ChecksumFlatDir(hostpath) if err != nil { - logger.Error("failed to get file checksum from host", zap.Error(err)) + logger.Error("failed to get dir checksum from host", zap.Error(err), zap.String("dir", hostpath)) + continue } + // get the checksum for the VM directory guestsum, err := getFileChecksumFromNeonvmDaemon(ctx, guestpath) if err != nil { - logger.Error("failed to get file checksum from guest", zap.Error(err)) + logger.Error("failed to get dir checksum from guest", zap.Error(err), zap.String("dir", guestpath)) + continue } + // if not equal, update the files inside the VM. 
if guestsum != hostsum { if err = sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { - logger.Error("failed to upload file to vm guest", zap.Error(err)) + logger.Error("failed to upload files to vm guest", zap.Error(err)) } } } - continue } } } From d6f277fba3bad61bcb13cce534952a3e57320c8c Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 10 Feb 2025 15:16:38 +0000 Subject: [PATCH 17/20] Add watch value to vm disk spec --- neonvm-runner/cmd/disks.go | 18 +----------------- neonvm-runner/cmd/main.go | 15 +++++++++++---- neonvm/apis/neonvm/v1/virtualmachine_types.go | 6 ++++++ neonvm/apis/neonvm/v1/zz_generated.deepcopy.go | 5 +++++ .../bases/vm.neon.tech_virtualmachines.yaml | 7 +++++++ 5 files changed, 30 insertions(+), 21 deletions(-) diff --git a/neonvm-runner/cmd/disks.go b/neonvm-runner/cmd/disks.go index 7f266a592..770cffc2f 100644 --- a/neonvm-runner/cmd/disks.go +++ b/neonvm-runner/cmd/disks.go @@ -187,7 +187,7 @@ func createISO9660runtime( if disk.MountPath != "" { mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mkdir -p %s`, disk.MountPath)) } - if diskNeedsSynchronisation(disk) { + if disk.Watch != nil && *disk.Watch { // do nothing as we will mount it into the VM via neonvm-daemon later continue } @@ -278,22 +278,6 @@ func createISO9660runtime( return nil } -func diskNeedsSynchronisation(disk vmv1.Disk) bool { - if disk.DiskSource.Secret == nil { - // only supporting secrets at the moment - return false - } - - // for now, `/var/sync` is our sentinal that this is synchronised. - // TODO: should we instead put a sync flag on the Disk object itself? 
- rel, err := filepath.Rel("/var/sync", disk.MountPath) - if err != nil { - return false - } - - return filepath.IsLocal(rel) -} - func calcDirUsage(dirPath string) (int64, error) { stat, err := os.Lstat(dirPath) if err != nil { diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 1f711f958..5b985a5f7 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -673,21 +673,28 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d defer wg.Done() secrets := make(map[string]string) + secretsOrd := []string{} for _, disk := range disks { - if diskNeedsSynchronisation(disk) { - // secrets are mounted using the atomicwriter utility, which loads the secret directory - // into `..data`. + if disk.Watch != nil && *disk.Watch { + // secrets/configmaps are mounted using the atomicwriter utility, + // which loads the directory into `..data`. dataDir := fmt.Sprintf("/vm/mounts%s/..data", disk.MountPath) secrets[dataDir] = disk.MountPath + secretsOrd = append(secretsOrd, dataDir) } } + if len(secretsOrd) == 0 { + return + } + // Faster loop for the initial upload. // The VM might need the secrets in order for postgres to actually start up, // so it's important we sync them as soon as the daemon is available. for { success := true - for hostpath, guestpath := range secrets { + for _, hostpath := range secretsOrd { + guestpath := secrets[hostpath] if err := sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil { success = false logger.Error("failed to upload file to vm guest", zap.Error(err)) diff --git a/neonvm/apis/neonvm/v1/virtualmachine_types.go b/neonvm/apis/neonvm/v1/virtualmachine_types.go index 3619f77af..88c6c51f1 100644 --- a/neonvm/apis/neonvm/v1/virtualmachine_types.go +++ b/neonvm/apis/neonvm/v1/virtualmachine_types.go @@ -499,6 +499,12 @@ type Disk struct { // Path within the virtual machine at which the disk should be mounted. Must // not contain ':'. 
MountPath string `json:"mountPath"` + // The disk source is monitored for changes if true, otherwise it is only read on VM startup (false or unspecified). + // This only works if the disk source is a configmap, a secret, or a projected volume. + // Defaults to false. + // +optional + // +kubebuilder:default:=false + Watch *bool `json:"watch,omitempty"` // DiskSource represents the location and type of the mounted disk. DiskSource `json:",inline"` } diff --git a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go index 9f43de748..187312274 100644 --- a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go +++ b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go @@ -49,6 +49,11 @@ func (in *Disk) DeepCopyInto(out *Disk) { *out = new(bool) **out = **in } + if in.Watch != nil { + in, out := &in.Watch, &out.Watch + *out = new(bool) + **out = **in + } in.DiskSource.DeepCopyInto(&out.DiskSource) } diff --git a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml index c41fd15ee..148d14a8e 100644 --- a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml +++ b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml @@ -1189,6 +1189,13 @@ spec: required: - size type: object + watch: + default: false + description: |- + The disk source is monitored for changes if true, otherwise it is only read on VM startup (false or unspecified). + This only works if the disk source is a configmap, a secret, or a projected volume. + Defaults to false. 
+ type: boolean required: - mountPath - name From ea9c0c1af620a6961dc70f043af18ee424cf111d Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 10 Feb 2025 15:16:53 +0000 Subject: [PATCH 18/20] add comments for test --- tests/e2e/vm-secret-sync/00-create-vm.yaml | 2 ++ tests/e2e/vm-secret-sync/01-update-secret.yaml | 1 + 2 files changed, 3 insertions(+) diff --git a/tests/e2e/vm-secret-sync/00-create-vm.yaml b/tests/e2e/vm-secret-sync/00-create-vm.yaml index 5a80fe357..f22fda10d 100644 --- a/tests/e2e/vm-secret-sync/00-create-vm.yaml +++ b/tests/e2e/vm-secret-sync/00-create-vm.yaml @@ -7,6 +7,7 @@ kind: Secret metadata: name: example-secret data: + # "hello world" foo: aGVsbG8gd29ybGQ= --- apiVersion: vm.neon.tech/v1 @@ -34,3 +35,4 @@ spec: secretName: example-secret mountPath: /var/sync/example name: secret-foo + watch: true diff --git a/tests/e2e/vm-secret-sync/01-update-secret.yaml b/tests/e2e/vm-secret-sync/01-update-secret.yaml index 5b60678d7..5fa8cd6ea 100644 --- a/tests/e2e/vm-secret-sync/01-update-secret.yaml +++ b/tests/e2e/vm-secret-sync/01-update-secret.yaml @@ -7,4 +7,5 @@ kind: Secret metadata: name: example-secret data: + # "goodbye world" foo: Z29vZGJ5ZSB3b3JsZA== From ef61026e9b0dc00beb6117ec324da7beedaaf62c Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 10 Feb 2025 15:32:48 +0000 Subject: [PATCH 19/20] add comments to checksum --- pkg/util/checksum.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/util/checksum.go b/pkg/util/checksum.go index e6f9c4e9d..d558449ea 100644 --- a/pkg/util/checksum.go +++ b/pkg/util/checksum.go @@ -24,6 +24,9 @@ func ChecksumFlatDir(path string) (string, error) { } sort.Strings(keys) + // note: any changes to the hash need to be synchronised between neonvm-runner and neonvm-daemon. + // Since they are updated independently, this is not trivial. + // If in doubt, make a new function and don't touch this one. 
hasher, err := blake2b.New256(nil) if err != nil { return "", err @@ -31,13 +34,21 @@ func ChecksumFlatDir(path string) (string, error) { for _, filename := range keys { data := files[filename] - var length []byte - // hash as "{name}\0{len(data)}{data}" - // this prevents any possible hash confusion problems + // File hash with the following encoding: "{name}\0{len(data)}{data}". + // + // This format prevents any possible (even if unrealistic) hash confusion problems. + // If we only hashed filename and data, then there's no difference between: + // name = "file1" + // data = [] + // and + // name = "file" + // data = [b'1'] + // + // We are trusting that filenames on linux cannot have a nul character. hasher.Write([]byte(filename)) hasher.Write([]byte{0}) - hasher.Write(binary.LittleEndian.AppendUint64(length, uint64(len(data)))) + hasher.Write(binary.LittleEndian.AppendUint64([]byte{}, uint64(len(data)))) hasher.Write(data) } From fa15484f1f152b3f7f63e47ee1eadc0db8afd4a6 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 10 Feb 2025 15:49:39 +0000 Subject: [PATCH 20/20] address further comments --- neonvm-daemon/cmd/main.go | 14 ++++++++++++-- neonvm-runner/cmd/main.go | 5 ++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index 76fd4e7b7..c1c58e6c3 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -102,10 +102,15 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, path string) { +func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request, path string) { s.fileOperationsMutex.Lock() defer s.fileOperationsMutex.Unlock() + if err := r.Context().Err(); err != nil { + w.WriteHeader(http.StatusRequestTimeout) + return + } + dir := filepath.Join(path, "..data") checksum, err := util.ChecksumFlatDir(dir) if err != nil { 
@@ -129,6 +134,11 @@ func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, pat s.fileOperationsMutex.Lock() defer s.fileOperationsMutex.Unlock() + if err := r.Context().Err(); err != nil { + w.WriteHeader(http.StatusRequestTimeout) + return + } + if r.Body == nil { s.logger.Error("no body") w.WriteHeader(http.StatusBadRequest) @@ -200,7 +210,7 @@ func (s *cpuServer) run(addr string) { mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) { path := fmt.Sprintf("/%s", r.PathValue("path")) if r.Method == http.MethodGet { - s.handleGetFileChecksum(w, path) + s.handleGetFileChecksum(w, r, path) return } else if r.Method == http.MethodPut { s.handleUploadFile(w, r, path) diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 5b985a5f7..ac56b3de3 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -706,11 +706,14 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d select { case <-time.After(1 * time.Second): + continue case <-ctx.Done(): - logger.Warn("QEMU shut down too soon to start forwarding logs") + return } } + // For the entire duration the VM is alive, periodically check whether any of the watched disks + // still match what's inside the VM, and if not, send the update. ticker := time.NewTicker(30 * time.Second) defer ticker.Stop()