diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go
index 4a675e688..c1c58e6c3 100644
--- a/neonvm-daemon/cmd/main.go
+++ b/neonvm-daemon/cmd/main.go
@@ -1,19 +1,25 @@
 package main
 
 import (
+	"encoding/base64"
+	"encoding/json"
 	"flag"
 	"fmt"
 	"io"
 	"net/http"
 	"os"
+	"path/filepath"
 	"strconv"
 	"sync"
 	"time"
 
 	"go.uber.org/zap"
 
+	k8sutil "k8s.io/kubernetes/pkg/volume/util"
+
 	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
 	"github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling"
+	"github.com/neondatabase/autoscaling/pkg/util"
 )
 
 func main() {
@@ -33,9 +39,10 @@ func main() {
 	logger.Info("Starting neonvm-daemon", zap.String("addr", *addr))
 
 	srv := cpuServer{
-		cpuOperationsMutex: &sync.Mutex{},
-		cpuScaler:          cpuscaling.NewCPUScaler(),
-		logger:             logger.Named("cpu-srv"),
+		cpuOperationsMutex:  &sync.Mutex{},
+		cpuScaler:           cpuscaling.NewCPUScaler(),
+		fileOperationsMutex: &sync.Mutex{},
+		logger:              logger.Named("cpu-srv"),
 	}
 	srv.run(*addr)
 }
@@ -43,9 +50,10 @@ func main() {
 type cpuServer struct {
 	// Protects CPU operations from concurrent access to prevent multiple ensureOnlineCPUs calls from running concurrently
 	// and ensure that status response is always actual
-	cpuOperationsMutex *sync.Mutex
-	cpuScaler          *cpuscaling.CPUScaler
-	logger             *zap.Logger
+	cpuOperationsMutex  *sync.Mutex
+	cpuScaler           *cpuscaling.CPUScaler
+	fileOperationsMutex *sync.Mutex
+	logger              *zap.Logger
 }
 
 func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) {
@@ -94,6 +102,97 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 }
 
+func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request, path string) {
+	s.fileOperationsMutex.Lock()
+	defer s.fileOperationsMutex.Unlock()
+
+	if err := r.Context().Err(); err != nil {
+		w.WriteHeader(http.StatusRequestTimeout)
+		return
+	}
+
+	dir := filepath.Join(path, "..data")
+	checksum, err := util.ChecksumFlatDir(dir)
+	if err != nil {
+		s.logger.Error("could not checksum dir", zap.Error(err))
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+	if _, err := w.Write([]byte(checksum)); err != nil {
+		s.logger.Error("could not write response", zap.Error(err))
+	}
+}
+
+type File struct {
+	// base64 encoded file contents
+	Data string `json:"data"`
+}
+
+func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, path string) {
+	s.fileOperationsMutex.Lock()
+	defer s.fileOperationsMutex.Unlock()
+
+	if err := r.Context().Err(); err != nil {
+		w.WriteHeader(http.StatusRequestTimeout)
+		return
+	}
+
+	if r.Body == nil {
+		s.logger.Error("no body")
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	defer r.Body.Close()
+
+	body, err := io.ReadAll(r.Body)
+	if err != nil {
+		s.logger.Error("could not read body", zap.Error(err))
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	var files map[string]File
+	if err := json.Unmarshal(body, &files); err != nil {
+		s.logger.Error("could not parse body", zap.Error(err))
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	payload := make(map[string]k8sutil.FileProjection)
+	for k, v := range files {
+		data, err := base64.StdEncoding.DecodeString(v.Data)
+		if err != nil {
+			s.logger.Error("could not decode file contents", zap.Error(err))
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+		payload[k] = k8sutil.FileProjection{
+			Data: data,
+			// read-write by root
+			// read-only otherwise
+			Mode:   0o644,
+			FsUser: nil,
+		}
+	}
+
+	aw, err := k8sutil.NewAtomicWriter(path, "neonvm-daemon")
+	if err != nil {
+		s.logger.Error("could not create writer", zap.Error(err))
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	if err := aw.Write(payload, nil); err != nil {
+		s.logger.Error("could not create files", zap.Error(err))
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+}
+
 func (s *cpuServer) run(addr string) {
 	mux := http.NewServeMux()
 	mux.HandleFunc("/cpu", func(w http.ResponseWriter, r *http.Request) {
@@ -108,6 +207,19 @@ func (s *cpuServer) run(addr string) {
 			w.WriteHeader(http.StatusNotFound)
 		}
 	})
+	mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) {
+		path := fmt.Sprintf("/%s", r.PathValue("path"))
+		if r.Method == http.MethodGet {
+			s.handleGetFileChecksum(w, r, path)
+			return
+		} else if r.Method == http.MethodPut {
+			s.handleUploadFile(w, r, path)
+			return
+		} else {
+			// unknown method
+			w.WriteHeader(http.StatusNotFound)
+		}
+	})
 
 	timeout := 5 * time.Second
 	server := http.Server{
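Aside (not part of the patch): the new PUT /files/{path...} endpoint above expects a JSON object mapping file names to base64-encoded contents, matching the File type it decodes into. A minimal, illustrative Go sketch of building such a body -- the file name and contents below are made up:

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// Mirrors the File payload type that neonvm-daemon's handleUploadFile decodes.
type File struct {
	// base64 encoded file contents
	Data string `json:"data"`
}

func main() {
	// One entry per file to write (atomically) under the requested guest path.
	body, _ := json.Marshal(map[string]File{
		"foo": {Data: base64.StdEncoding.EncodeToString([]byte("hello world"))},
	})
	fmt.Println(string(body)) // {"foo":{"data":"aGVsbG8gd29ybGQ="}}
}
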
diff --git a/neonvm-runner/cmd/disks.go b/neonvm-runner/cmd/disks.go
index 46e21378a..770cffc2f 100644
--- a/neonvm-runner/cmd/disks.go
+++ b/neonvm-runner/cmd/disks.go
@@ -187,6 +187,10 @@ func createISO9660runtime(
 		if disk.MountPath != "" {
 			mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mkdir -p %s`, disk.MountPath))
 		}
+		if disk.Watch != nil && *disk.Watch {
+			// do nothing as we will mount it into the VM via neonvm-daemon later
+			continue
+		}
 		switch {
 		case disk.EmptyDisk != nil:
 			opts := ""
diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go
index 0b60e12da..ac56b3de3 100644
--- a/neonvm-runner/cmd/main.go
+++ b/neonvm-runner/cmd/main.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 
 	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
+	"github.com/neondatabase/autoscaling/pkg/util"
 	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
 )
 
@@ -504,6 +505,8 @@ func runQEMU(
 	go listenForHTTPRequests(ctx, logger, vmSpec.RunnerPort, callbacks, &wg, monitoring)
 	wg.Add(1)
 	go forwardLogs(ctx, logger, &wg)
+	wg.Add(1)
+	go monitorFiles(ctx, logger, &wg, vmSpec.Disks)
 
 	qemuBin := getQemuBinaryName(cfg.architecture)
 	var bin string
@@ -665,6 +668,87 @@ func forwardLogs(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
 	}
 }
 
+// monitorFiles watches a specific set of files and copies them into the guest VM via neonvm-daemon.
+func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, disks []vmv1.Disk) {
+	defer wg.Done()
+
+	secrets := make(map[string]string)
+	secretsOrd := []string{}
+	for _, disk := range disks {
+		if disk.Watch != nil && *disk.Watch {
+			// secrets/configmaps are mounted using the atomicwriter utility,
+			// which loads the directory into `..data`.
+			dataDir := fmt.Sprintf("/vm/mounts%s/..data", disk.MountPath)
+			secrets[dataDir] = disk.MountPath
+			secretsOrd = append(secretsOrd, dataDir)
+		}
+	}
+
+	if len(secretsOrd) == 0 {
+		return
+	}
+
+	// Faster loop for the initial upload.
+	// The VM might need the secrets in order for postgres to actually start up,
+	// so it's important we sync them as soon as the daemon is available.
+	for {
+		success := true
+		for _, hostpath := range secretsOrd {
+			guestpath := secrets[hostpath]
+			if err := sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
+				success = false
+				logger.Error("failed to upload file to vm guest", zap.Error(err))
+			}
+		}
+		if success {
+			break
+		}
+
+		select {
+		case <-time.After(1 * time.Second):
+			continue
+		case <-ctx.Done():
+			return
+		}
+	}
+
+	// For the entire duration the VM is alive, periodically check whether any of the watched disks
+	// still match what's inside the VM, and if not, send the update.
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			// for each secret we are tracking
+			for hostpath, guestpath := range secrets {
+				// get the checksum for the pod directory
+				hostsum, err := util.ChecksumFlatDir(hostpath)
+				if err != nil {
+					logger.Error("failed to get dir checksum from host", zap.Error(err), zap.String("dir", hostpath))
+					continue
+				}
+
+				// get the checksum for the VM directory
+				guestsum, err := getFileChecksumFromNeonvmDaemon(ctx, guestpath)
+				if err != nil {
+					logger.Error("failed to get dir checksum from guest", zap.Error(err), zap.String("dir", guestpath))
+					continue
+				}
+
+				// if not equal, update the files inside the VM.
+				if guestsum != hostsum {
+					if err = sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
+						logger.Error("failed to upload files to vm guest", zap.Error(err))
+					}
+				}
+			}
+		}
+	}
+}
+
 func terminateQemuOnSigterm(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
 	logger = logger.Named("terminate-qemu-on-sigterm")
 
@@ -753,3 +837,92 @@ func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error {
 
 	return nil
 }
+
+type File struct {
+	// base64 encoded file contents
+	Data string `json:"data"`
+}
+
+func sendFilesToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) error {
+	_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
+	if err != nil {
+		return fmt.Errorf("could not calculate VM IP address: %w", err)
+	}
+
+	files, err := util.ReadAllFiles(hostpath)
+	if err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("could not read files: %w", err)
+	}
+
+	dto := make(map[string]File)
+	for k, v := range files {
+		dto[k] = File{Data: base64.StdEncoding.EncodeToString(v)}
+	}
+	body, err := json.Marshal(dto)
+	if err != nil {
+		return fmt.Errorf("could not encode files: %w", err)
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, time.Second)
+	defer cancel()
+
+	// guestpath has a leading forward slash
+	url := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath)
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
+	if err != nil {
+		return fmt.Errorf("could not build request: %w", err)
+	}
+
+	client := &http.Client{
+		CheckRedirect: func(req *http.Request, via []*http.Request) error {
+			return http.ErrUseLastResponse
+		},
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		return fmt.Errorf("could not send request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
+	}
+
+	return nil
+}
+
+func getFileChecksumFromNeonvmDaemon(ctx context.Context, guestpath string) (string, error) {
+	_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
+	if err != nil {
+		return "", fmt.Errorf("could not calculate VM IP address: %w", err)
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, time.Second)
+	defer cancel()
+
+	// guestpath has a leading forward slash
+	url := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath)
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
+	if err != nil {
+		return "", fmt.Errorf("could not build request: %w", err)
+	}
+
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return "", fmt.Errorf("could not send request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return "", fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
+	}
+
+	checksum, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", fmt.Errorf("could not read response: %w", err)
+	}
+
+	return string(checksum), nil
+}
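Aside (not part of the patch): the comparison loop above relies on the runner (hashing the pod-side ..data directory) and the daemon (hashing the guest-side copy) producing identical checksums for identical contents. A hypothetical test sketch of that property against util.ChecksumFlatDir, which is added in pkg/util/checksum.go further down:

package util_test

import (
	"os"
	"path/filepath"
	"testing"

	"github.com/neondatabase/autoscaling/pkg/util"
)

func TestChecksumFlatDirIsDeterministic(t *testing.T) {
	// Two directories standing in for the pod-side and guest-side copies.
	hostDir, guestDir := t.TempDir(), t.TempDir()
	for _, dir := range []string{hostDir, guestDir} {
		if err := os.WriteFile(filepath.Join(dir, "foo"), []byte("hello world"), 0o644); err != nil {
			t.Fatal(err)
		}
	}

	hostSum, err := util.ChecksumFlatDir(hostDir)
	if err != nil {
		t.Fatal(err)
	}
	guestSum, err := util.ChecksumFlatDir(guestDir)
	if err != nil {
		t.Fatal(err)
	}

	if hostSum != guestSum {
		t.Fatalf("checksums differ: %q vs %q", hostSum, guestSum)
	}
}
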
diff --git a/neonvm/apis/neonvm/v1/virtualmachine_types.go b/neonvm/apis/neonvm/v1/virtualmachine_types.go
index 3619f77af..88c6c51f1 100644
--- a/neonvm/apis/neonvm/v1/virtualmachine_types.go
+++ b/neonvm/apis/neonvm/v1/virtualmachine_types.go
@@ -499,6 +499,12 @@ type Disk struct {
 	// Path within the virtual machine at which the disk should be mounted. Must
 	// not contain ':'.
 	MountPath string `json:"mountPath"`
+	// If true, the disk source is monitored for changes and updates are synced into the running VM; if false or unspecified, it is only read on VM startup.
+	// This only works if the disk source is a configmap, a secret, or a projected volume.
+	// Defaults to false.
+	// +optional
+	// +kubebuilder:default:=false
+	Watch *bool `json:"watch,omitempty"`
 	// DiskSource represents the location and type of the mounted disk.
 	DiskSource `json:",inline"`
 }
diff --git a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go
index 9f43de748..187312274 100644
--- a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go
+++ b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go
@@ -49,6 +49,11 @@ func (in *Disk) DeepCopyInto(out *Disk) {
 		*out = new(bool)
 		**out = **in
 	}
+	if in.Watch != nil {
+		in, out := &in.Watch, &out.Watch
+		*out = new(bool)
+		**out = **in
+	}
 	in.DiskSource.DeepCopyInto(&out.DiskSource)
 }
 
diff --git a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml
index c41fd15ee..148d14a8e 100644
--- a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml
+++ b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml
@@ -1189,6 +1189,13 @@ spec:
                           required:
                           - size
                           type: object
+                        watch:
+                          default: false
+                          description: |-
+                            If true, the disk source is monitored for changes and updates are synced into the running VM; if false or unspecified, it is only read on VM startup.
+                            This only works if the disk source is a configmap, a secret, or a projected volume.
+                            Defaults to false.
+                          type: boolean
                       required:
                       - mountPath
                       - name
diff --git a/pkg/util/checksum.go b/pkg/util/checksum.go
new file mode 100644
index 000000000..d558449ea
--- /dev/null
+++ b/pkg/util/checksum.go
@@ -0,0 +1,83 @@
+package util
+
+import (
+	"encoding/base64"
+	"encoding/binary"
+	"os"
+	"path/filepath"
+	"sort"
+
+	"golang.org/x/crypto/blake2b"
+)
+
+// ChecksumFlatDir calculates a checksum over all files in a directory, assuming the directory is flat (contains no subdirs).
+func ChecksumFlatDir(path string) (string, error) {
+	files, err := ReadAllFiles(path)
+	if err != nil {
+		return "", err
+	}
+
+	// sort the file names for a reproducible hash
+	var keys []string
+	for k := range files {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	// note: any changes to the hash need to be synchronised between neonvm-runner and neonvm-daemon.
+	// Since they are updated independently, this is not trivial.
+	// If in doubt, make a new function and don't touch this one.
+	hasher, err := blake2b.New256(nil)
+	if err != nil {
+		return "", err
+	}
+
+	for _, filename := range keys {
+		data := files[filename]
+
+		// File hash with the following encoding: "{name}\0{len(data)}{data}".
+		//
+		// This format prevents any possible (even if unrealistic) hash confusion problems.
+		// If we only hashed filename and data, then there's no difference between:
+		//   name = "file1"
+		//   data = []
+		// and
+		//   name = "file"
+		//   data = [b'1']
+		//
+		// We are trusting that filenames on linux cannot have a nul character.
+		hasher.Write([]byte(filename))
+		hasher.Write([]byte{0})
+		hasher.Write(binary.LittleEndian.AppendUint64([]byte{}, uint64(len(data))))
+		hasher.Write(data)
+	}
+
+	sum := hasher.Sum(nil)
+	sumBase64 := base64.RawStdEncoding.EncodeToString(sum)
+	return sumBase64, nil
+}
+
+// ReadAllFiles reads all files in a directory, assuming the directory is flat (contains no subdirs).
+func ReadAllFiles(path string) (map[string][]byte, error) {
+	entries, err := os.ReadDir(path)
+	if err != nil {
+		return nil, err
+	}
+
+	output := make(map[string][]byte)
+
+	for _, entry := range entries {
+		if entry.IsDir() {
+			continue
+		}
+
+		data, err := os.ReadFile(filepath.Join(path, entry.Name()))
+		if err != nil {
+			return nil, err
+		}
+
+		output[entry.Name()] = data
+	}
+
+	return output, nil
+}
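Aside (not part of the patch): to make the framing above concrete, for a single file named foo containing "hello world", the bytes fed to BLAKE2b-256 are the name, a NUL separator, the data length as a little-endian uint64, and then the data itself. An illustrative stand-alone sketch:

package main

import (
	"encoding/base64"
	"encoding/binary"
	"fmt"

	"golang.org/x/crypto/blake2b"
)

func main() {
	name, data := "foo", []byte("hello world")

	h, _ := blake2b.New256(nil)
	h.Write([]byte(name))                                             // "foo"
	h.Write([]byte{0})                                                // NUL separator
	h.Write(binary.LittleEndian.AppendUint64(nil, uint64(len(data)))) // 11 as little-endian uint64
	h.Write(data)                                                     // "hello world"

	// Same encoding as ChecksumFlatDir's final result.
	fmt.Println(base64.RawStdEncoding.EncodeToString(h.Sum(nil)))
}
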
diff --git a/tests/e2e/vm-secret-sync/00-assert.yaml b/tests/e2e/vm-secret-sync/00-assert.yaml
new file mode 100644
index 000000000..23d71c8ac
--- /dev/null
+++ b/tests/e2e/vm-secret-sync/00-assert.yaml
@@ -0,0 +1,23 @@
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+timeout: 70
+commands:
+  - script: |
+      set -eux
+      pod="$(kubectl get neonvm -n "$NAMESPACE" example -o jsonpath='{.status.podName}')"
+      kubectl exec -n "$NAMESPACE" $pod -- grep -q "hello world" /vm/mounts/var/sync/example/foo
+      kubectl exec -n "$NAMESPACE" $pod -- scp guest-vm:/var/sync/example/foo testfile
+      kubectl exec -n "$NAMESPACE" $pod -- grep -q "hello world" testfile
+---
+apiVersion: vm.neon.tech/v1
+kind: VirtualMachine
+metadata:
+  name: example
+status:
+  phase: Running
+  restartCount: 0
+  conditions:
+    - type: Available
+      status: "True"
+  cpus: 250m
+  memorySize: 1Gi
diff --git a/tests/e2e/vm-secret-sync/00-create-vm.yaml b/tests/e2e/vm-secret-sync/00-create-vm.yaml
new file mode 100644
index 000000000..f22fda10d
--- /dev/null
+++ b/tests/e2e/vm-secret-sync/00-create-vm.yaml
@@ -0,0 +1,38 @@
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+unitTest: false
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: example-secret
+data:
+  # "hello world"
+  foo: aGVsbG8gd29ybGQ=
+---
+apiVersion: vm.neon.tech/v1
+kind: VirtualMachine
+metadata:
+  name: example
+spec:
+  schedulerName: autoscale-scheduler
+  enableSSH: true
+  guest:
+    cpus:
+      min: 0.25
+      use: 0.25
+      max: 0.25
+    memorySlotSize: 1Gi
+    memorySlots:
+      min: 1
+      use: 1
+      max: 1
+    rootDisk:
+      image: vm-postgres:15-bullseye
+      size: 1Gi
+  disks:
+    - secret:
+        secretName: example-secret
+      mountPath: /var/sync/example
+      name: secret-foo
+      watch: true
diff --git a/tests/e2e/vm-secret-sync/01-assert.yaml b/tests/e2e/vm-secret-sync/01-assert.yaml
new file mode 100644
index 000000000..0789da534
--- /dev/null
+++ b/tests/e2e/vm-secret-sync/01-assert.yaml
@@ -0,0 +1,10 @@
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+timeout: 120
+commands:
+  - script: |
+      set -eux
+      pod="$(kubectl get neonvm -n "$NAMESPACE" example -o jsonpath='{.status.podName}')"
+      kubectl exec -n "$NAMESPACE" $pod -- grep -q "goodbye world" /vm/mounts/var/sync/example/foo
+      kubectl exec -n "$NAMESPACE" $pod -- scp guest-vm:/var/sync/example/foo testfile
+      kubectl exec -n "$NAMESPACE" $pod -- grep -q "goodbye world" testfile
diff --git a/tests/e2e/vm-secret-sync/01-update-secret.yaml b/tests/e2e/vm-secret-sync/01-update-secret.yaml
new file mode 100644
index 000000000..5fa8cd6ea
--- /dev/null
+++ b/tests/e2e/vm-secret-sync/01-update-secret.yaml
@@ -0,0 +1,11 @@
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+unitTest: false
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: example-secret
+data:
+  # "goodbye world"
+  foo: Z29vZGJ5ZSB3b3JsZA==