Skip to content

Commit

Permalink
refactor to track entire secret dir
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Feb 3, 2025
1 parent 4148a35 commit 557f465
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 92 deletions.
94 changes: 51 additions & 43 deletions neonvm-daemon/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io"
Expand All @@ -12,6 +14,7 @@ import (
"time"

"go.uber.org/zap"
k8sutil "k8s.io/kubernetes/pkg/volume/util"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling"
Expand Down Expand Up @@ -107,20 +110,21 @@ func (s *cpuServer) getFile(path string) (string, error) {
return path, nil
}

func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request) {
func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, path string) {
s.fileOperationsMutex.Lock()
defer s.fileOperationsMutex.Unlock()

path, err := s.getFile(r.PathValue("path"))
path, err := s.getFile(path)
if err != nil {
s.logger.Error("invalid file path", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

checksum, err := util.ChecksumFile(path)
dir := filepath.Join(path, "..data")
checksum, err := util.ChecksumFlatDir(dir)
if err != nil {
s.logger.Error("could not checksum file", zap.Error(err))
s.logger.Error("could not checksum dir", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -131,69 +135,75 @@ func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request
}
}

func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request) {
type File struct {
// base64 encoded file contents
Data string `json:"data"`
}

func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, path string) {
s.fileOperationsMutex.Lock()
defer s.fileOperationsMutex.Unlock()

path, err := s.getFile(r.PathValue("path"))
path, err := s.getFile(path)
if err != nil {
s.logger.Error("invalid file path", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
s.logger.Error("could not create directory", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
if r.Body == nil {
s.logger.Error("no body")
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()

file, err := os.CreateTemp("", "")
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error("could not create file", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer file.Close()
defer os.Remove(file.Name())

if _, err := io.Copy(file, r.Body); err != nil {
s.logger.Error("could not read request body", zap.Error(err))
var files map[string]File
if err := json.Unmarshal(body, &files); err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()

// rename is an atomic operation on unix.
// this ensures that the file is either not updated at all,
// or is updated entirely.
//
// this ensures that other processes reading the file will never
// have any inconsistencies. they will either read the old contents
// or the new contents. Any open files will still point to the old inode
// and thus still read the old contents.
if err := os.Rename(file.Name(), path); err != nil {
s.logger.Error("could not rename file", zap.Error(err))
payload := make(map[string]k8sutil.FileProjection)
for k, v := range files {
data, err := base64.StdEncoding.DecodeString(v.Data)
if err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
payload[k] = k8sutil.FileProjection{
Data: data,
// read-write by root
// read-only otherwise
Mode: 0o644,
FsUser: nil,
}
}

if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
s.logger.Error("could not create directory", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

func (s *cpuServer) handleDeleteFile(w http.ResponseWriter, r *http.Request) {
s.fileOperationsMutex.Lock()
defer s.fileOperationsMutex.Unlock()

path, err := s.getFile(r.PathValue("path"))
aw, err := k8sutil.NewAtomicWriter(path, "neonvm-daemon")
if err != nil {
s.logger.Error("invalid file path", zap.Error(err))
s.logger.Error("could not create writer", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
s.logger.Error("could not delete file", zap.Error(err))
if err := aw.Write(payload, nil); err != nil {
s.logger.Error("could not create files", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -216,14 +226,12 @@ func (s *cpuServer) run(addr string) {
}
})
mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) {
path := r.PathValue("path")
if r.Method == http.MethodGet {
s.handleGetFileChecksum(w, r)
s.handleGetFileChecksum(w, path)
return
} else if r.Method == http.MethodPut {
s.handleUploadFile(w, r)
return
} else if r.Method == http.MethodDelete {
s.handleDeleteFile(w, r)
s.handleUploadFile(w, r, path)
return
} else {
// unknown method
Expand Down
93 changes: 53 additions & 40 deletions neonvm-runner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
corev1 "k8s.io/api/core/v1"
)

const (
Expand Down Expand Up @@ -674,85 +675,84 @@ func forwardLogs(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, disks []vmv1.Disk) {
defer wg.Done()

notify, err := fsnotify.NewBufferedWatcher(2)
notify, err := fsnotify.NewWatcher()
if err != nil {
logger.Error("failed to create inotify instance", zap.Error(err))
return
}
defer notify.Close()

synchronisedFiles := make(map[string]string)
secrets := make(map[string][]corev1.KeyToPath)
// watched := make(map[string]string)
watched2 := make(map[string]string)
for _, disk := range disks {
if disk.Secret != nil && secretNeedsSynchronisation(disk.MountPath) {
rel, _ := filepath.Rel("/var/sync", disk.MountPath)
root := fmt.Sprintf("/vm/mounts%s", disk.MountPath)
for _, item := range disk.Secret.Items {
hostpath := filepath.Join(root, item.Path)
guestpath := filepath.Join(rel, item.Path)
synchronisedFiles[hostpath] = guestpath

if err := notify.Add(filepath.Join(root, "..data")); err != nil {
logger.Error("failed to add file to inotify instance", zap.Error(err))
}
}
}
watched2[filepath.Join(root, "..data")] = rel

for k := range synchronisedFiles {
if err := notify.Add(k); err != nil {
logger.Error("failed to add file to inotify instance", zap.Error(err))
secrets[root] = disk.Secret.Items
}
}

// Wait a bit to reduce the chance we attempt dialing before
// QEMU is started
success := false
for !success {
success = true
for {
select {
case <-time.After(1 * time.Second):
case <-ctx.Done():
logger.Warn("QEMU shut down too soon to start forwarding logs")
}

for hostpath, guestpath := range synchronisedFiles {
if err := sendFileToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
success := true
for hostpath, guestpath := range watched2 {
if err := sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
success = false
logger.Error("failed to upload file to vm guest", zap.Error(err))
}
}
if success {
break
}
}

ticker := time.NewTicker(1 * time.Minute)
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case event := <-notify.Events:
guestpath, ok := synchronisedFiles[event.Name]
guestpath, ok := watched2[event.Name]
if !ok {
// not tracking this file
continue
}
if event.Op == fsnotify.Chmod {
// not interesting.
continue
}

// kubernetes secrets are mounted as symbolic links.
// When the link changes, there's no event. We only see the deletion
// of the file the link used to point to.
// This doesn't mean the file was actually deleted though.
if event.Op == fsnotify.Remove {
if err := notify.Add(event.Name); err != nil {
logger.Error("failed to add file to inotify instance", zap.Error(err))
}
if event.Op != fsnotify.Remove {
continue
}

if err := sendFileToNeonvmDaemon(ctx, event.Name, guestpath); err != nil {
logger.Info("mounted secrets changed", zap.String("secret_path", event.Name))
if err := notify.Add(event.Name); err != nil {
logger.Error("failed to add file to inotify instance", zap.Error(err))
}

if err := sendFilesToNeonvmDaemon(ctx, event.Name, guestpath); err != nil {
logger.Error("failed to upload file to vm guest", zap.Error(err))
}
case <-ticker.C:
for hostpath, guestpath := range synchronisedFiles {
hostsum, err := util.ChecksumFile(hostpath)
for hostpath, guestpath := range watched2 {
hostsum, err := util.ChecksumFlatDir(hostpath)
if err != nil {
logger.Error("failed to get file checksum from host", zap.Error(err))
}
Expand All @@ -763,7 +763,7 @@ func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, d
}

if guestsum != hostsum {
if err = sendFileToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
if err = sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
logger.Error("failed to upload file to vm guest", zap.Error(err))
}
}
Expand Down Expand Up @@ -862,34 +862,47 @@ func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error {
return nil
}

func sendFileToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) error {
type File struct {
// base64 encoded file contents
Data string `json:"data"`
}

func sendFilesToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) error {
_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
if err != nil {
return fmt.Errorf("could not calculate VM IP address: %w", err)
}

file, err := os.Open(hostpath)
files, err := util.ReadAllFiles(hostpath)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("could not open file: %w", err)
}

dto := make(map[string]File)
for k, v := range files {
dto[k] = File{Data: base64.StdEncoding.EncodeToString(v)}
}
body, err := json.Marshal(dto)
if err != nil {
return fmt.Errorf("could not encode files: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

url := fmt.Sprintf("http://%s:25183/files/%s", vmIP, guestpath)

var req *http.Request
if err != nil && os.IsNotExist(err) {
req, err = http.NewRequestWithContext(ctx, http.MethodDelete, url, http.NoBody)
} else {
req, err = http.NewRequestWithContext(ctx, http.MethodPut, url, file)
}

req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("could not build request: %w", err)
}

resp, err := http.DefaultClient.Do(req)
client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("could not send request: %w", err)
}
Expand Down
Loading

0 comments on commit 557f465

Please sign in to comment.