Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

neonvm: continuous synchronisation of files in pod/vm #1222

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 118 additions & 6 deletions neonvm-daemon/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package main

import (
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"sync"
"time"

"go.uber.org/zap"

k8sutil "k8s.io/kubernetes/pkg/volume/util"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling"
"github.com/neondatabase/autoscaling/pkg/util"
)

func main() {
Expand All @@ -33,19 +39,21 @@ func main() {

logger.Info("Starting neonvm-daemon", zap.String("addr", *addr))
srv := cpuServer{
cpuOperationsMutex: &sync.Mutex{},
cpuScaler: cpuscaling.NewCPUScaler(),
logger: logger.Named("cpu-srv"),
cpuOperationsMutex: &sync.Mutex{},
cpuScaler: cpuscaling.NewCPUScaler(),
fileOperationsMutex: &sync.Mutex{},
logger: logger.Named("cpu-srv"),
}
srv.run(*addr)
}

type cpuServer struct {
// Protects CPU operations from concurrent access to prevent multiple ensureOnlineCPUs calls from running concurrently
// and ensure that status response is always actual
cpuOperationsMutex *sync.Mutex
cpuScaler *cpuscaling.CPUScaler
logger *zap.Logger
cpuOperationsMutex *sync.Mutex
cpuScaler *cpuscaling.CPUScaler
fileOperationsMutex *sync.Mutex
logger *zap.Logger
}

func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) {
Expand Down Expand Up @@ -94,6 +102,97 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// handleGetFileChecksum writes the checksum of the "..data" directory located
// under path as the response body.
//
// File operations are serialized through fileOperationsMutex. Status codes:
//   - 408 if the request context is already done once the lock is held
//   - 400 if the directory could not be checksummed
//   - 200 with the checksum as the body otherwise
func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request, path string) {
	s.fileOperationsMutex.Lock()
	defer s.fileOperationsMutex.Unlock()

	// Re-check the request after taking the lock; there is no point doing the
	// work if the request context has already ended.
	if r.Context().Err() != nil {
		w.WriteHeader(http.StatusRequestTimeout)
		return
	}

	sum, sumErr := util.ChecksumFlatDir(filepath.Join(path, "..data"))
	if sumErr != nil {
		s.logger.Error("could not checksum dir", zap.Error(sumErr))
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	w.WriteHeader(http.StatusOK)
	_, writeErr := w.Write([]byte(sum))
	if writeErr != nil {
		s.logger.Error("could not write response", zap.Error(writeErr))
	}
}

// File is the JSON representation of a single file in an upload request body.
// handleUploadFile decodes the body into a map of file name to File.
type File struct {
// base64 encoded file contents (standard encoding, see base64.StdEncoding)
Data string `json:"data"`
}

func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, path string) {
s.fileOperationsMutex.Lock()
defer s.fileOperationsMutex.Unlock()
conradludgate marked this conversation as resolved.
Show resolved Hide resolved

if err := r.Context().Err(); err != nil {
w.WriteHeader(http.StatusRequestTimeout)
return
}

if r.Body == nil {
s.logger.Error("no body")
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()

body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

var files map[string]File
if err := json.Unmarshal(body, &files); err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

payload := make(map[string]k8sutil.FileProjection)
for k, v := range files {
data, err := base64.StdEncoding.DecodeString(v.Data)
if err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
payload[k] = k8sutil.FileProjection{
Data: data,
// read-write by root
// read-only otherwise
Mode: 0o644,
FsUser: nil,
}
}

aw, err := k8sutil.NewAtomicWriter(path, "neonvm-daemon")
if err != nil {
s.logger.Error("could not create writer", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

if err := aw.Write(payload, nil); err != nil {
s.logger.Error("could not create files", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

func (s *cpuServer) run(addr string) {
mux := http.NewServeMux()
mux.HandleFunc("/cpu", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -108,6 +207,19 @@ func (s *cpuServer) run(addr string) {
w.WriteHeader(http.StatusNotFound)
}
})
mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) {
path := fmt.Sprintf("/%s", r.PathValue("path"))
if r.Method == http.MethodGet {
s.handleGetFileChecksum(w, r, path)
return
} else if r.Method == http.MethodPut {
s.handleUploadFile(w, r, path)
return
} else {
// unknown method
w.WriteHeader(http.StatusNotFound)
}
})

timeout := 5 * time.Second
server := http.Server{
Expand Down
4 changes: 4 additions & 0 deletions neonvm-runner/cmd/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ func createISO9660runtime(
if disk.MountPath != "" {
mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mkdir -p %s`, disk.MountPath))
}
if disk.Watch != nil && *disk.Watch {
// do nothing as we will mount it into the VM via neonvm-daemon later
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Does this mean that programs starting inside the VM may initially not have access to those secrets? (if so: how difficult would it be to guarantee the initial versions of the secret once everything's mounted?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct. Currently the solution is the "fast" 1-second loop to ping neonvm-daemon.

I suppose there is a solution we can do where we could mount the initial values as a static disk with a symlink, but I'm not sure how complicated that would end up being

continue
}
switch {
case disk.EmptyDisk != nil:
opts := ""
Expand Down
173 changes: 173 additions & 0 deletions neonvm-runner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

Expand Down Expand Up @@ -504,6 +505,8 @@ func runQEMU(
go listenForHTTPRequests(ctx, logger, vmSpec.RunnerPort, callbacks, &wg, monitoring)
wg.Add(1)
go forwardLogs(ctx, logger, &wg)
wg.Add(1)
go monitorFiles(ctx, logger, &wg, vmSpec.Disks)

qemuBin := getQemuBinaryName(cfg.architecture)
var bin string
Expand Down Expand Up @@ -665,6 +668,87 @@ func forwardLogs(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
}
}

// monitorFiles watches a specific set of files and copied them into the guest VM via neonvm-daemon.
func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, disks []vmv1.Disk) {
defer wg.Done()

secrets := make(map[string]string)
secretsOrd := []string{}
for _, disk := range disks {
if disk.Watch != nil && *disk.Watch {
// secrets/configmaps are mounted using the atomicwriter utility,
// which loads the directory into `..data`.
dataDir := fmt.Sprintf("/vm/mounts%s/..data", disk.MountPath)
secrets[dataDir] = disk.MountPath
secretsOrd = append(secretsOrd, dataDir)
}
}

if len(secretsOrd) == 0 {
return
}

// Faster loop for the initial upload.
// The VM might need the secrets in order for postgres to actually start up,
// so it's important we sync them as soon as the daemon is available.
for {
success := true
for _, hostpath := range secretsOrd {
guestpath := secrets[hostpath]
if err := sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
success = false
logger.Error("failed to upload file to vm guest", zap.Error(err))
}
}
if success {
break
}

select {
case <-time.After(1 * time.Second):
continue
case <-ctx.Done():
return
}
}

// For the entire duration the VM is alive, periodically check whether any of the watched disks
// still match what's inside the VM, and if not, send the update.
ticker := time.NewTicker(30 * time.Second)
conradludgate marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// for each secret we are tracking
for hostpath, guestpath := range secrets {
// get the checksum for the pod directory
hostsum, err := util.ChecksumFlatDir(hostpath)
if err != nil {
logger.Error("failed to get dir checksum from host", zap.Error(err), zap.String("dir", hostpath))
continue
}

// get the checksum for the VM directory
guestsum, err := getFileChecksumFromNeonvmDaemon(ctx, guestpath)
if err != nil {
logger.Error("failed to get dir checksum from guest", zap.Error(err), zap.String("dir", guestpath))
continue
}

// if not equal, update the files inside the VM.
if guestsum != hostsum {
if err = sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
logger.Error("failed to upload files to vm guest", zap.Error(err))
}
}
}
}
}
}

func terminateQemuOnSigterm(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
logger = logger.Named("terminate-qemu-on-sigterm")

Expand Down Expand Up @@ -753,3 +837,92 @@ func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error {

return nil
}

// File is the JSON representation of a single file sent to neonvm-daemon.
// sendFilesToNeonvmDaemon marshals a map of key to File as the request body.
type File struct {
// base64 encoded file contents (standard encoding, see base64.StdEncoding)
Data string `json:"data"`
}

// sendFilesToNeonvmDaemon reads the files under hostpath and uploads them to
// neonvm-daemon with an HTTP PUT to /files<guestpath> on port 25183 of the VM.
//
// The request body is a JSON object mapping each key returned by
// util.ReadAllFiles (presumably the file name; confirm against that helper) to
// a File holding the base64-encoded contents. The request is bounded by a
// one-second timeout derived from ctx.
//
// It returns an error if the VM IP cannot be calculated, the files cannot be
// read (other than a not-exist error), the payload cannot be encoded, the
// request fails, or the daemon answers with a status other than 200.
func sendFilesToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) error {
_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
if err != nil {
return fmt.Errorf("could not calculate VM IP address: %w", err)
}

files, err := util.ReadAllFiles(hostpath)
// A not-exist error is deliberately not returned: execution continues with
// whatever files holds and the (possibly empty) set is still uploaded.
// NOTE(review): confirm that uploading an empty set is the intended outcome.
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("could not open file: %w", err)
}

// dto ("data transfer object") is the JSON payload: key -> base64 contents.
dto := make(map[string]File)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: What does dto mean here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data transfer object (it's very java-y). I couldn't think of a better name at the time

for k, v := range files {
dto[k] = File{Data: base64.StdEncoding.EncodeToString(v)}
}
body, err := json.Marshal(dto)
if err != nil {
return fmt.Errorf("could not encode files: %w", err)
}

// Bound the whole HTTP exchange to one second.
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

// guestpath has a leading forward slash
url := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath)

req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("could not build request: %w", err)
}

// Redirects are not followed: returning http.ErrUseLastResponse makes the
// client hand back the redirect response itself, which then fails the
// status check below.
client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("could not send request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
}

return nil
}

// getFileChecksumFromNeonvmDaemon asks neonvm-daemon for the checksum of the
// directory mounted at guestpath inside the VM, using an HTTP GET to
// /files<guestpath> on port 25183 of the VM.
//
// The request is bounded by a one-second timeout derived from ctx. On a 200
// response the body is returned verbatim as a string; any other status, or any
// failure to build, send or read the exchange, is returned as an error.
func getFileChecksumFromNeonvmDaemon(ctx context.Context, guestpath string) (string, error) {
	_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
	if err != nil {
		return "", fmt.Errorf("could not calculate VM IP address: %w", err)
	}

	reqCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	// guestpath has a leading forward slash, so it is appended to "/files" directly.
	endpoint := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath)

	request, err := http.NewRequestWithContext(reqCtx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return "", fmt.Errorf("could not build request: %w", err)
	}

	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return "", fmt.Errorf("could not send request: %w", err)
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		return "", fmt.Errorf("neonvm-daemon responded with status %d", response.StatusCode)
	}

	raw, err := io.ReadAll(response.Body)
	if err != nil {
		return "", fmt.Errorf("could not read response: %w", err)
	}

	return string(raw), nil
}
6 changes: 6 additions & 0 deletions neonvm/apis/neonvm/v1/virtualmachine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,12 @@ type Disk struct {
// Path within the virtual machine at which the disk should be mounted. Must
// not contain ':'.
MountPath string `json:"mountPath"`
// The disk source is monitored for changes if true, otherwise it is only read on VM startup (false or unspecified).
// This only works if the disk source is a configmap, a secret, or a projected volume.
// Defaults to false.
// +optional
// +kubebuilder:default:=false
Watch *bool `json:"watch,omitempty"`
// DiskSource represents the location and type of the mounted disk.
DiskSource `json:",inline"`
}
Expand Down
Loading
Loading