Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

neonvm: continuous synchronisation of files in pod/vm #1222

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 108 additions & 6 deletions neonvm-daemon/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"io/fs"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"go.uber.org/zap"

	k8sutil "k8s.io/kubernetes/pkg/volume/util"

	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
	"github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling"
	"github.com/neondatabase/autoscaling/pkg/util"
)

func main() {
Expand All @@ -33,19 +39,21 @@ func main() {

logger.Info("Starting neonvm-daemon", zap.String("addr", *addr))
srv := cpuServer{
cpuOperationsMutex: &sync.Mutex{},
cpuScaler: cpuscaling.NewCPUScaler(),
logger: logger.Named("cpu-srv"),
cpuOperationsMutex: &sync.Mutex{},
cpuScaler: cpuscaling.NewCPUScaler(),
fileOperationsMutex: &sync.Mutex{},
logger: logger.Named("cpu-srv"),
}
srv.run(*addr)
}

type cpuServer struct {
// Protects CPU operations from concurrent access to prevent multiple ensureOnlineCPUs calls from running concurrently
// and ensure that status response is always actual
cpuOperationsMutex *sync.Mutex
cpuScaler *cpuscaling.CPUScaler
logger *zap.Logger
cpuOperationsMutex *sync.Mutex
cpuScaler *cpuscaling.CPUScaler
fileOperationsMutex *sync.Mutex
logger *zap.Logger
}

func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) {
Expand Down Expand Up @@ -94,6 +102,87 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// handleGetFileChecksum responds with a checksum of the directory mounted at
// path, so the runner can detect when the guest's copy has drifted from the
// host's. The checksum covers the `..data` subdirectory because that is where
// the atomicwriter keeps the current version of every file it writes.
func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, path string) {
	s.fileOperationsMutex.Lock()
	defer s.fileOperationsMutex.Unlock()

	dir := filepath.Join(path, "..data")
	checksum, err := util.ChecksumFlatDir(dir)
	if err != nil {
		s.logger.Error("could not checksum dir", zap.Error(err))
		// A missing directory means the client asked about a path nothing has
		// been written to yet; any other failure is on our side, so don't
		// blame the request with a 4xx.
		if errors.Is(err, fs.ErrNotExist) {
			w.WriteHeader(http.StatusNotFound)
		} else {
			w.WriteHeader(http.StatusInternalServerError)
		}
		return
	}

	w.WriteHeader(http.StatusOK)
	if _, err := w.Write([]byte(checksum)); err != nil {
		s.logger.Error("could not write response", zap.Error(err))
	}
}

// File is the JSON wire representation of a single file's contents, as
// accepted by the PUT /files/{path} endpoint.
type File struct {
	// base64 encoded file contents
	Data string `json:"data"`
}

func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, path string) {
s.fileOperationsMutex.Lock()
defer s.fileOperationsMutex.Unlock()
conradludgate marked this conversation as resolved.
Show resolved Hide resolved

if r.Body == nil {
s.logger.Error("no body")
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()

body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

var files map[string]File
if err := json.Unmarshal(body, &files); err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

payload := make(map[string]k8sutil.FileProjection)
for k, v := range files {
data, err := base64.StdEncoding.DecodeString(v.Data)
if err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
payload[k] = k8sutil.FileProjection{
Data: data,
// read-write by root
// read-only otherwise
Mode: 0o644,
FsUser: nil,
}
}

aw, err := k8sutil.NewAtomicWriter(path, "neonvm-daemon")
if err != nil {
s.logger.Error("could not create writer", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

if err := aw.Write(payload, nil); err != nil {
s.logger.Error("could not create files", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

func (s *cpuServer) run(addr string) {
mux := http.NewServeMux()
mux.HandleFunc("/cpu", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -108,6 +197,19 @@ func (s *cpuServer) run(addr string) {
w.WriteHeader(http.StatusNotFound)
}
})
mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) {
path := fmt.Sprintf("/%s", r.PathValue("path"))
if r.Method == http.MethodGet {
s.handleGetFileChecksum(w, path)
return
} else if r.Method == http.MethodPut {
s.handleUploadFile(w, r, path)
return
} else {
// unknown method
w.WriteHeader(http.StatusNotFound)
}
})

timeout := 5 * time.Second
server := http.Server{
Expand Down
20 changes: 20 additions & 0 deletions neonvm-runner/cmd/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ func createISO9660runtime(
if disk.MountPath != "" {
mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mkdir -p %s`, disk.MountPath))
}
if diskNeedsSynchronisation(disk) {
conradludgate marked this conversation as resolved.
Show resolved Hide resolved
// do nothing as we will mount it into the VM via neonvm-daemon later
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Does this mean that programs starting inside the VM may initially not have access to those secrets? (if so: how difficult would it be to guarantee the initial versions of the secret once everything's mounted?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct. Currently the solution is the "fast" 1-second loop to ping neonvm-daemon.

I suppose there is a solution we can do where we could mount the initial values as a static disk with a symlink, but I'm not sure how complicated that would end up being

continue
}
switch {
case disk.EmptyDisk != nil:
opts := ""
Expand Down Expand Up @@ -274,6 +278,22 @@ func createISO9660runtime(
return nil
}

func diskNeedsSynchronisation(disk vmv1.Disk) bool {
if disk.DiskSource.Secret == nil {
// only supporting secrets at the moment
return false
}

// for now, `/var/sync` is our sentinal that this is synchronised.
// TODO: should we instead put a sync flag on the Disk object itself?
conradludgate marked this conversation as resolved.
Show resolved Hide resolved
rel, err := filepath.Rel("/var/sync", disk.MountPath)
if err != nil {
return false
}

return filepath.IsLocal(rel)
}

func calcDirUsage(dirPath string) (int64, error) {
stat, err := os.Lstat(dirPath)
if err != nil {
Expand Down
163 changes: 163 additions & 0 deletions neonvm-runner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

Expand Down Expand Up @@ -504,6 +505,8 @@ func runQEMU(
go listenForHTTPRequests(ctx, logger, vmSpec.RunnerPort, callbacks, &wg, monitoring)
wg.Add(1)
go forwardLogs(ctx, logger, &wg)
wg.Add(1)
go monitorFiles(ctx, logger, &wg, vmSpec.Disks)

qemuBin := getQemuBinaryName(cfg.architecture)
var bin string
Expand Down Expand Up @@ -665,6 +668,77 @@ func forwardLogs(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
}
}

// monitorFiles watches a specific set of files and copied them into the guest VM via neonvm-daemon.
func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, disks []vmv1.Disk) {
defer wg.Done()

secrets := make(map[string]string)
for _, disk := range disks {
if diskNeedsSynchronisation(disk) {
// secrets are mounted using the atomicwriter utility, which loads the secret directory
// into `..data`.
dataDir := fmt.Sprintf("/vm/mounts%s/..data", disk.MountPath)
secrets[dataDir] = disk.MountPath
}
}

// Faster loop for the initial upload.
// The VM might need the secrets in order for postgres to actually start up,
// so it's important we sync them as soon as the daemon is available.
for {
success := true
for hostpath, guestpath := range secrets {
conradludgate marked this conversation as resolved.
Show resolved Hide resolved
if err := sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
success = false
logger.Error("failed to upload file to vm guest", zap.Error(err))
}
}
if success {
break
}

select {
case <-time.After(1 * time.Second):
case <-ctx.Done():
logger.Warn("QEMU shut down too soon to start forwarding logs")
conradludgate marked this conversation as resolved.
Show resolved Hide resolved
}
}

ticker := time.NewTicker(30 * time.Second)
conradludgate marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// for each secret we are tracking
for hostpath, guestpath := range secrets {
// get the checksum for the pod directory
hostsum, err := util.ChecksumFlatDir(hostpath)
if err != nil {
logger.Error("failed to get dir checksum from host", zap.Error(err), zap.String("dir", hostpath))
continue
}

// get the checksum for the VM directory
guestsum, err := getFileChecksumFromNeonvmDaemon(ctx, guestpath)
if err != nil {
logger.Error("failed to get dir checksum from guest", zap.Error(err), zap.String("dir", guestpath))
continue
}

// if not equal, update the files inside the VM.
if guestsum != hostsum {
if err = sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
logger.Error("failed to upload files to vm guest", zap.Error(err))
}
}
}
}
}
}

func terminateQemuOnSigterm(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
logger = logger.Named("terminate-qemu-on-sigterm")

Expand Down Expand Up @@ -753,3 +827,92 @@ func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error {

return nil
}

// File mirrors the JSON payload type accepted by neonvm-daemon's
// PUT /files/{path} endpoint.
type File struct {
	// base64 encoded file contents
	Data string `json:"data"`
}

func sendFilesToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) error {
_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
if err != nil {
return fmt.Errorf("could not calculate VM IP address: %w", err)
}

files, err := util.ReadAllFiles(hostpath)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("could not open file: %w", err)
}

dto := make(map[string]File)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: What does dto mean here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data transfer object (it's very java-y). I couldn't think of a better name at the time

for k, v := range files {
dto[k] = File{Data: base64.StdEncoding.EncodeToString(v)}
}
body, err := json.Marshal(dto)
if err != nil {
return fmt.Errorf("could not encode files: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

// guestpath has a leading forward slash
url := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath)

req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("could not build request: %w", err)
}

client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("could not send request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
}

return nil
}

// getFileChecksumFromNeonvmDaemon asks neonvm-daemon inside the guest for the
// checksum of the directory mounted at guestpath, returning it as a string.
func getFileChecksumFromNeonvmDaemon(ctx context.Context, guestpath string) (string, error) {
	_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
	if err != nil {
		return "", fmt.Errorf("could not calculate VM IP address: %w", err)
	}

	reqCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	// guestpath has a leading forward slash
	endpoint := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath)

	req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return "", fmt.Errorf("could not build request: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("could not send request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return "", fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
	}

	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("could not read response: %w", err)
	}
	return string(raw), nil
}
Loading
Loading