diff --git a/pkg/backuphelpers/backupvars.go b/pkg/backuphelpers/backupvars.go index 25205282e..b65304e1b 100644 --- a/pkg/backuphelpers/backupvars.go +++ b/pkg/backuphelpers/backupvars.go @@ -10,6 +10,11 @@ import ( prune "github.com/openshift/cluster-etcd-operator/pkg/cmd/prune-backups" ) +const ( + defaultNumberBackups = 3 + defaultBackupSchedule = "0 0 * * *" +) + type Enqueueable interface { Enqueue() } @@ -71,6 +76,16 @@ func (b *BackupConfig) ArgList() []string { if b.spec.Schedule != "" { args = append(args, fmt.Sprintf("--%s=%s", "schedule", b.spec.Schedule)) + } else { + // fix OCPBUGS-43687 + args = append(args, fmt.Sprintf("--%s=%s", "schedule", defaultBackupSchedule)) + } + + // fix OCPBUGS-43676 + if b.spec.RetentionPolicy.RetentionType == "" { + args = append(args, fmt.Sprintf("--%s=%s", "type", backupv1alpha1.RetentionTypeNumber)) + args = append(args, fmt.Sprintf("--%s=%d", "maxNumberOfBackups", defaultNumberBackups)) + return args } if b.spec.RetentionPolicy.RetentionType == prune.RetentionTypeNumber { @@ -101,6 +116,16 @@ func (b *BackupConfig) ArgString() string { if b.spec.Schedule != "" { args = append(args, fmt.Sprintf("- --%s=%s", "schedule", b.spec.Schedule)) + } else { + // fix OCPBUGS-43687 + args = append(args, fmt.Sprintf("- --%s=%s", "schedule", defaultBackupSchedule)) + } + + // fix OCPBUGS-43676 + if b.spec.RetentionPolicy.RetentionType == "" { + args = append(args, fmt.Sprintf("--%s=%s", "type", backupv1alpha1.RetentionTypeNumber)) + args = append(args, fmt.Sprintf("--%s=%d", "maxNumberOfBackups", defaultNumberBackups)) + return strings.Join(args, "\n ") } if b.spec.RetentionPolicy.RetentionType == prune.RetentionTypeNumber { diff --git a/pkg/backuphelpers/backupvars_test.go b/pkg/backuphelpers/backupvars_test.go index 2acc104a3..dd23fbfcc 100644 --- a/pkg/backuphelpers/backupvars_test.go +++ b/pkg/backuphelpers/backupvars_test.go @@ -23,17 +23,17 @@ func TestBackupConfig_ToArgs(t *testing.T) { { "backup spec with timezone and schedule", createEtcdBackupSpec(timezone, schedule), - " args:\n - --enabled=true\n - --timezone=GMT\n - --schedule=0 */2 * * *", + " args:\n - --enabled=true\n - --timezone=GMT\n - --schedule=0 */2 * * *\n --type=RetentionNumber\n --maxNumberOfBackups=3", }, { "backup spec with timezone and empty schedule", createEtcdBackupSpec(timezone, ""), - " args:\n - --enabled=true\n - --timezone=GMT", + " args:\n - --enabled=true\n - --timezone=GMT\n - --schedule=0 0 * * *\n --type=RetentionNumber\n --maxNumberOfBackups=3", }, { "backup spec with empty timezone and schedule", createEtcdBackupSpec("", schedule), - " args:\n - --enabled=true\n - --schedule=0 */2 * * *", + " args:\n - --enabled=true\n - --schedule=0 */2 * * *\n --type=RetentionNumber\n --maxNumberOfBackups=3", }, { "backup spec with timezone and schedule and retention number", @@ -78,6 +78,8 @@ func TestBackupConfig_ToArgList(t *testing.T) { "--enabled=true", "--timezone=GMT", "--schedule=0 */2 * * *", + "--type=RetentionNumber", + "--maxNumberOfBackups=3", }, }, { @@ -86,6 +88,9 @@ func TestBackupConfig_ToArgList(t *testing.T) { []string{ "--enabled=true", "--timezone=GMT", + "--schedule=0 0 * * *", + "--type=RetentionNumber", + "--maxNumberOfBackups=3", }, }, { @@ -94,6 +99,8 @@ func TestBackupConfig_ToArgList(t *testing.T) { []string{ "--enabled=true", "--schedule=0 */2 * * *", + "--type=RetentionNumber", + "--maxNumberOfBackups=3", }, }, { diff --git a/pkg/cmd/backuprestore/backupserver.go b/pkg/cmd/backuprestore/backupserver.go index 75acb9c78..5a6e124bf 100644 --- 
a/pkg/cmd/backuprestore/backupserver.go +++ b/pkg/cmd/backuprestore/backupserver.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "path" + "strings" "syscall" "time" @@ -17,7 +18,13 @@ import ( "github.com/spf13/cobra" ) -const backupVolume = "/var/lib/etcd-auto-backup" +const ( + BackupVolume = "/var/lib/etcd-auto-backup" + etcdCtlKeyName = "ETCDCTL_KEY" + etcdCtlCertName = "ETCDCTL_CERT" + etcdCtlCACertName = "ETCDCTL_CACERT" + nodeNameEnvVar = "NODE_NAME" +) var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} @@ -29,7 +36,7 @@ type backupRunnerImpl struct{} func (b backupRunnerImpl) runBackup(backupOpts *backupOptions, pruneOpts *prune.PruneOpts) error { dateString := time.Now().Format("2006-01-02_150405") - backupOpts.backupDir = path.Join(backupVolume, dateString) + backupOpts.backupDir = path.Join(BackupVolume, dateString) err := backup(backupOpts) if err != nil { return err @@ -46,6 +53,7 @@ func (b backupRunnerImpl) runBackup(backupOpts *backupOptions, pruneOpts *prune. type backupServer struct { schedule string timeZone string + nodeName string enabled bool cronSchedule cron.Schedule backupOptions @@ -57,7 +65,7 @@ func NewBackupServer(ctx context.Context) *cobra.Command { backupOptions: backupOptions{errOut: os.Stderr}, PruneOpts: prune.PruneOpts{ RetentionType: "None", - BackupPath: backupVolume, + BackupPath: BackupVolume, }, } @@ -65,7 +73,6 @@ func NewBackupServer(ctx context.Context) *cobra.Command { Use: "backup-server", Short: "Backs up a snapshot of etcd database and static pod resources without config", Run: func(cmd *cobra.Command, args []string) { - if err := backupSrv.Validate(); err != nil { klog.Fatal(err) } @@ -95,13 +102,22 @@ func (b *backupServer) Validate() error { return nil } + if err := b.validateNameNode(); err != nil { + return err + } + + if err := b.constructEnvVars(); err != nil { + klog.Infof("error constructing envVars: [%v]", err) + return err + } + cronSchedule, err := cron.ParseStandard(b.schedule) if err != nil { return fmt.Errorf("error parsing backup schedule %v: %w", b.schedule, err) } b.cronSchedule = cronSchedule - b.backupOptions.backupDir = backupVolume + b.backupOptions.backupDir = BackupVolume err = b.backupOptions.Validate() if err != nil { return fmt.Errorf("error validating backup %v: %w", b.backupOptions, err) @@ -111,7 +127,6 @@ func (b *backupServer) Validate() error { if err != nil { return fmt.Errorf("error validating prune args %v: %w", b.PruneOpts, err) } - return nil } @@ -136,7 +151,6 @@ func (b *backupServer) Run(ctx context.Context) error { func (b *backupServer) scheduleBackup(ctx context.Context, bck backupRunner) error { ticker := time.NewTicker(time.Until(b.cronSchedule.Next(time.Now()))) defer ticker.Stop() - for { select { case <-ticker.C: @@ -151,3 +165,34 @@ func (b *backupServer) scheduleBackup(ctx context.Context, bck backupRunner) err } } } + +func (b *backupServer) validateNameNode() error { + nodeNameEnv := os.Getenv(nodeNameEnvVar) + if len(nodeNameEnv) == 0 { + return fmt.Errorf("[%v] environment variable is empty", nodeNameEnvVar) + } + b.nodeName = nodeNameEnv + return nil +} + +func (b *backupServer) constructEnvVars() error { + etcdCtlKeyVal := strings.Replace("/etc/kubernetes/static-pod-certs/secrets/etcd-all-certs/etcd-peer-NODE_NAME.key", nodeNameEnvVar, b.nodeName, -1) + err := os.Setenv(etcdCtlKeyName, etcdCtlKeyVal) + if err != nil { + return fmt.Errorf("error exporting [%v]: val is [%v]: %w", etcdCtlKeyName, etcdCtlKeyVal, err) + } + + etcdCtlCertVal := 
strings.Replace("/etc/kubernetes/static-pod-certs/secrets/etcd-all-certs/etcd-peer-NODE_NAME.crt", nodeNameEnvVar, b.nodeName, -1) + err = os.Setenv(etcdCtlCertName, etcdCtlCertVal) + if err != nil { + return fmt.Errorf("error writing [%v]: val is [%v]: %w", etcdCtlCertName, etcdCtlCertVal, err) + } + + etcdCtlCACertVal := "/etc/kubernetes/static-pod-certs/configmaps/etcd-all-bundles/server-ca-bundle.crt" + err = os.Setenv(etcdCtlCACertName, etcdCtlCACertVal) + if err != nil { + return fmt.Errorf("error writing [%v]: val is [%v]: %w", etcdCtlCACertName, etcdCtlCACertVal, err) + } + + return nil +} diff --git a/pkg/cmd/backuprestore/backupserver_test.go b/pkg/cmd/backuprestore/backupserver_test.go index 399f9966c..13039b17b 100644 --- a/pkg/cmd/backuprestore/backupserver_test.go +++ b/pkg/cmd/backuprestore/backupserver_test.go @@ -3,6 +3,8 @@ package backuprestore import ( "context" "errors" + "fmt" + "os" "testing" "time" @@ -12,17 +14,22 @@ import ( "github.com/stretchr/testify/require" ) -const validSchedule = "* * * * *" +const ( + validSchedule = "* * * * *" + localHost = "localhost" +) func TestBackupServer_Validate(t *testing.T) { testCases := []struct { name string backupServer backupServer + envExist bool expErr error }{ { "BackupServer is disabled", backupServer{enabled: false}, + false, nil, }, { @@ -31,6 +38,7 @@ func TestBackupServer_Validate(t *testing.T) { enabled: false, schedule: "invalid schedule", }, + false, nil, }, { @@ -42,6 +50,7 @@ func TestBackupServer_Validate(t *testing.T) { backupDir: "", }, }, + false, nil, }, { @@ -49,6 +58,7 @@ func TestBackupServer_Validate(t *testing.T) { backupServer{ enabled: true, }, + true, errors.New("error parsing backup schedule : empty spec string"), }, { @@ -57,6 +67,7 @@ func TestBackupServer_Validate(t *testing.T) { enabled: true, schedule: "invalid schedule", }, + true, errors.New("error parsing backup schedule invalid schedule"), }, { @@ -68,6 +79,7 @@ func TestBackupServer_Validate(t *testing.T) { RetentionType: prune.RetentionTypeNone, }, }, + true, nil, }, { @@ -79,6 +91,7 @@ func TestBackupServer_Validate(t *testing.T) { backupDir: "", }, }, + true, errors.New("error parsing backup schedule invalid schedule"), }, { @@ -93,18 +106,29 @@ func TestBackupServer_Validate(t *testing.T) { RetentionType: prune.RetentionTypeNone, }, }, + true, nil, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + if tc.envExist { + err := os.Setenv(nodeNameEnvVar, localHost) + require.NoError(t, err) + } + actErr := tc.backupServer.Validate() if tc.expErr != nil { require.Contains(t, actErr.Error(), tc.expErr.Error()) } else { require.Equal(t, tc.expErr, actErr) } + + t.Cleanup(func() { + err := os.Unsetenv(nodeNameEnvVar) + require.NoError(t, err) + }) }) } } @@ -175,6 +199,67 @@ func TestNewBackupServer_scheduleBackup(t *testing.T) { } } +func TestBackupServer_validateNameNode(t *testing.T) { + testCases := []struct { + name string + inputNodeName string + envExist bool + expErr error + }{ + { + name: "env var exist", + inputNodeName: localHost, + envExist: true, + expErr: nil, + }, + { + name: "env var not exist", + envExist: false, + expErr: fmt.Errorf("[NODE_NAME] environment variable is empty"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.envExist { + err := os.Setenv(nodeNameEnvVar, tc.inputNodeName) + require.NoError(t, err) + } + + b := &backupServer{} + err := b.validateNameNode() + require.Equal(t, tc.expErr, err) + require.Equal(t, b.nodeName, tc.inputNodeName) + + 
t.Cleanup(func() { + err := os.Unsetenv(nodeNameEnvVar) + require.NoError(t, err) + }) + }) + } +} + +func TestBackupServer_constructEnvVars(t *testing.T) { + b := &backupServer{ + nodeName: localHost, + } + + err := b.constructEnvVars() + require.NoError(t, err) + + expEtcdKey := "/etc/kubernetes/static-pod-certs/secrets/etcd-all-certs/etcd-peer-localhost.key" + act := os.Getenv(etcdCtlKeyName) + require.Equal(t, expEtcdKey, act) + + expEtcdCert := "/etc/kubernetes/static-pod-certs/secrets/etcd-all-certs/etcd-peer-localhost.crt" + act = os.Getenv(etcdCtlCertName) + require.Equal(t, expEtcdCert, act) + + expEtcdCACert := "/etc/kubernetes/static-pod-certs/configmaps/etcd-all-bundles/server-ca-bundle.crt" + act = os.Getenv(etcdCtlCACertName) + require.Equal(t, expEtcdCACert, act) +} + type backupRunnerMock struct { counter int slow bool diff --git a/pkg/operator/ceohelpers/unsupported_override_test.go b/pkg/operator/ceohelpers/unsupported_override_test.go index 52ff3335d..e6c9719af 100644 --- a/pkg/operator/ceohelpers/unsupported_override_test.go +++ b/pkg/operator/ceohelpers/unsupported_override_test.go @@ -2,9 +2,10 @@ package ceohelpers import ( "fmt" + "testing" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime" - "testing" operatorv1 "github.com/openshift/api/operator/v1" ) diff --git a/pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go b/pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go index c9c2a7cbe..0006c1e90 100644 --- a/pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go +++ b/pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go @@ -3,8 +3,20 @@ package periodicbackupcontroller import ( "context" "fmt" + "os" + "slices" + "strings" "time" + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/openshift/library-go/pkg/operator/resource/resourceapply" + "github.com/openshift/library-go/pkg/operator/resource/resourcemerge" + + "k8s.io/apimachinery/pkg/labels" + + "github.com/openshift/cluster-etcd-operator/pkg/cmd/backuprestore" + clientv1 "k8s.io/client-go/listers/core/v1" backupv1alpha1 "github.com/openshift/api/config/v1alpha1" @@ -19,21 +31,41 @@ import ( "github.com/openshift/library-go/pkg/operator/events" "github.com/openshift/library-go/pkg/operator/v1helpers" + appv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/kubernetes" batchv1client "k8s.io/client-go/kubernetes/typed/batch/v1" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ) const ( backupJobLabel = "backup-name" defaultBackupCRName = "default" etcdBackupServerContainerName = "etcd-backup-server" + backupServerDaemonSet = "backup-server-daemon-set" + etcdEndpointConfigMapName = "etcd-endpoints" + etcdClientPort = ":2379" + etcdDataDirVolName = "data-dir" + etcdDataDirVolPath = "/var/lib/etcd" + etcdConfigDirVolName = "config-dir" + etcdConfigDirVolPath = "/etc/kubernetes" + etcdAutoBackupDirVolName = "etcd-auto-backup-dir" + etcdAutoBackupDirVolPath = "/var/lib/etcd-auto-backup" + etcdCertDirVolName = "cert-dir" + etcdCertDirVolPath = "/etc/kubernetes/static-pod-certs" + nodeNameEnvVar = "NODE_NAME" + nodeNameFieldPath = "spec.nodeName" + masterNodeSelector = "node-role.kubernetes.io/master" + votingNodeSelector = "node-role.kubernetes.io/voting" + backupDSLabelKey = "app" + backupDSLabelValue = 
"etcd-auto-backup" ) type PeriodicBackupController struct { @@ -42,6 +74,7 @@ type PeriodicBackupController struct { backupsClient backupv1client.BackupsGetter kubeClient kubernetes.Interface operatorImagePullSpec string + backupVarGetter backuphelpers.BackupVar featureGateAccessor featuregates.FeatureGateAccess kubeInformers v1helpers.KubeInformersForNamespaces } @@ -54,6 +87,7 @@ func NewPeriodicBackupController( eventRecorder events.Recorder, operatorImagePullSpec string, accessor featuregates.FeatureGateAccess, + backupVarGetter backuphelpers.BackupVar, backupsInformer factory.Informer, kubeInformers v1helpers.KubeInformersForNamespaces) factory.Controller { @@ -63,22 +97,25 @@ func NewPeriodicBackupController( backupsClient: backupsClient, kubeClient: kubeClient, operatorImagePullSpec: operatorImagePullSpec, + backupVarGetter: backupVarGetter, featureGateAccessor: accessor, kubeInformers: kubeInformers, } + syncCtx := factory.NewSyncContext("PeriodicBackupController", eventRecorder.WithComponentSuffix("periodic-backup-controller")) syncer := health.NewDefaultCheckingSyncWrapper(c.sync) livenessChecker.Add("PeriodicBackupController", syncer) return factory.New(). + WithSyncContext(syncCtx). ResyncEvery(1*time.Minute). WithInformers(backupsInformer, kubeInformers.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Informer()). WithSync(syncer.Sync). - ToController("PeriodicBackupController", eventRecorder.WithComponentSuffix("periodic-backup-controller")) + ToController("PeriodicBackupController", syncCtx.Recorder()) } -func (c *PeriodicBackupController) sync(ctx context.Context, _ factory.SyncContext) error { +func (c *PeriodicBackupController) sync(ctx context.Context, syncContext factory.SyncContext) error { if enabled, err := backuphelpers.AutoBackupFeatureGateEnabled(c.featureGateAccessor); !enabled { if err != nil { klog.V(4).Infof("PeriodicBackupController error while checking feature flags: %v", err) @@ -92,7 +129,42 @@ func (c *PeriodicBackupController) sync(ctx context.Context, _ factory.SyncConte return fmt.Errorf("PeriodicBackupController could not list backup CRDs, error was: %w", err) } + defaultFound := false for _, item := range backups.Items { + if item.Name == defaultBackupCRName { + defaultFound = true + + currentEtcdBackupDS, err := c.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(ctx, backupServerDaemonSet, v1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("PeriodicBackupController could not retrieve ds/%s: %w", backupServerDaemonSet, err) + } + + endpoints, err := getEtcdEndpoints(ctx, c.kubeClient) + if err != nil { + return fmt.Errorf("PeriodicBackupController failed to list etcd-endpoints config-map: %w", err) + } + + if err = ensureVotingNodesLabeled(ctx, c.kubeClient); err != nil { + return fmt.Errorf("PeriodicBackupController could not label voting master nodes: %w", err) + } + + desiredEtcdBackupDS := createBackupServerDaemonSet(item, endpoints) + if etcdBackupServerDSDiffers(desiredEtcdBackupDS.Spec, currentEtcdBackupDS.Spec) { + _, opStatus, _, oErr := c.operatorClient.GetOperatorState() + if oErr != nil { + return fmt.Errorf("PeriodicBackupController could not retrieve operator's state: %w", err) + } + _, _, dErr := resourceapply.ApplyDaemonSet(ctx, c.kubeClient.AppsV1(), syncContext.Recorder(), desiredEtcdBackupDS, + resourcemerge.ExpectedDaemonSetGeneration(desiredEtcdBackupDS, opStatus.Generations), + ) + if dErr != nil { + return fmt.Errorf("PeriodicBackupController could not apply 
ds/%v: %w", backupServerDaemonSet, err) + } + klog.V(4).Infof("PeriodicBackupController applied DaemonSet [%v] successfully", backupServerDaemonSet) + } + continue + } + err := reconcileCronJob(ctx, cronJobsClient, item, c.operatorImagePullSpec) if err != nil { _, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{ @@ -109,6 +181,38 @@ func (c *PeriodicBackupController) sync(ctx context.Context, _ factory.SyncConte } } + if !defaultFound { + err = c.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Delete(ctx, backupServerDaemonSet, v1.DeleteOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("PeriodicBackupController could not delete ds/%s: %w", backupServerDaemonSet, err) + } + klog.V(4).Infof("PeriodicBackupController deleted DaemonSet [%v] successfully", backupServerDaemonSet) + } + err = ensureVotingNodesUnLabeled(ctx, c.kubeClient) + if err != nil { + return fmt.Errorf("PeriodicBackupController could not unlabel voting master nodes: %w", err) + } + } else { + terminationReasons, err := checkBackupServerPodsStatus(c.podLister) + if err != nil { + return err + } + + if len(terminationReasons) > 0 { + _, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{ + Type: "PeriodicBackupControllerDegraded", + Status: operatorv1.ConditionTrue, + Reason: "Error", + Message: fmt.Sprintf("found default backup errors: %s", strings.Join(terminationReasons, " ,")), + })) + if updateErr != nil { + klog.V(4).Infof("PeriodicBackupController error during [etcd-backup-server] UpdateStatus: %v", err) + } + return nil + } + } + _, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{ Type: "PeriodicBackupControllerDegraded", Status: operatorv1.ConditionFalse, @@ -272,3 +376,215 @@ func newCronJob() (*batchv1.CronJob, error) { return obj.(*batchv1.CronJob), nil } + +func createBackupServerDaemonSet(cr backupv1alpha1.Backup, endpoints []string) *appv1.DaemonSet { + ds := appv1.DaemonSet{ + ObjectMeta: v1.ObjectMeta{ + Name: backupServerDaemonSet, + Namespace: operatorclient.TargetNamespace, + Labels: map[string]string{ + backupDSLabelKey: backupDSLabelValue, + }, + }, + Spec: appv1.DaemonSetSpec{ + Selector: &v1.LabelSelector{ + MatchLabels: map[string]string{ + backupDSLabelKey: backupDSLabelValue, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + backupDSLabelKey: backupDSLabelValue, + }, + }, + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: masterNodeSelector, + Effect: corev1.TaintEffectNoSchedule, + }, + { + Key: votingNodeSelector, + Effect: corev1.TaintEffectNoSchedule, + }, + }, + NodeSelector: map[string]string{ + masterNodeSelector: "", + votingNodeSelector: "", + }, + Volumes: []corev1.Volume{ + {Name: etcdDataDirVolName, VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: etcdDataDirVolPath, + Type: ptr.To(corev1.HostPathUnset)}}}, + + {Name: etcdConfigDirVolName, VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: etcdConfigDirVolPath}}}, + + {Name: etcdAutoBackupDirVolName, VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: etcdAutoBackupDirVolPath}}}, + + {Name: etcdCertDirVolName, VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: 
"/etc/kubernetes/static-pod-resources/etcd-certs"}}}, + }, + Containers: []corev1.Container{ + { + Name: etcdBackupServerContainerName, + Image: os.Getenv("OPERATOR_IMAGE"), + Command: []string{"cluster-etcd-operator", "backup-server"}, + Env: []corev1.EnvVar{ + { + Name: nodeNameEnvVar, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: nodeNameFieldPath, + }, + }, + }, + }, + Args: constructBackupServerArgs(cr, strings.Join(endpoints, ",")), + VolumeMounts: []corev1.VolumeMount{ + {Name: etcdDataDirVolName, MountPath: etcdDataDirVolPath}, + {Name: etcdConfigDirVolName, MountPath: etcdConfigDirVolPath}, + {Name: etcdAutoBackupDirVolName, MountPath: etcdAutoBackupDirVolPath}, + {Name: etcdCertDirVolName, MountPath: etcdCertDirVolPath}, + }, + TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, + ImagePullPolicy: corev1.PullIfNotPresent, + SecurityContext: &corev1.SecurityContext{ + Privileged: ptr.To(true), + }, + }, + }, + }, + }, + }, + } + + return &ds +} + +func getEtcdEndpoints(ctx context.Context, client kubernetes.Interface) ([]string, error) { + etcdEndPointsCM, err := client.CoreV1().ConfigMaps(operatorclient.TargetNamespace).Get(ctx, etcdEndpointConfigMapName, v1.GetOptions{}) + if err != nil { + return nil, err + } + + var endpoints []string + for _, v := range etcdEndPointsCM.Data { + ep := v + etcdClientPort + endpoints = append(endpoints, ep) + } + + slices.Sort(endpoints) + return endpoints, nil +} + +func constructBackupServerArgs(cr backupv1alpha1.Backup, endpoints string) []string { + backupConfig := backuphelpers.NewDisabledBackupConfig() + backupConfig.SetBackupSpec(&cr.Spec.EtcdBackupSpec) + argList := backupConfig.ArgList() + + argList = append(argList, fmt.Sprintf("--endpoints=%s", endpoints)) + argList = append(argList, fmt.Sprintf("--backupPath=%s", backuprestore.BackupVolume)) + + return argList +} + +func checkBackupServerPodsStatus(podLister clientv1.PodLister) ([]string, error) { + backupPods, err := podLister.List(labels.Set{backupDSLabelKey: backupDSLabelValue}.AsSelector()) + if err != nil { + return nil, fmt.Errorf("PeriodicBackupController could not list etcd pods: %w", err) + } + + var terminationReasons []string + for _, p := range backupPods { + if p.Status.Phase == corev1.PodFailed { + for _, cStatus := range p.Status.ContainerStatuses { + if cStatus.Name == etcdBackupServerContainerName { + // TODO we can also try different cStatus.State.Terminated.ExitCode + terminationReasons = append(terminationReasons, fmt.Sprintf("container %s within pod %s has been terminated: %s", etcdBackupServerContainerName, p.Name, cStatus.State.Terminated.Message)) + } + } + } + } + + return terminationReasons, nil +} + +func etcdBackupServerDSDiffers(l, r appv1.DaemonSetSpec) bool { + lBytes, _ := l.Marshal() + rBytes, _ := r.Marshal() + + if len(lBytes) != len(rBytes) { + return true + } + + for i := 0; i < len(lBytes); i++ { + if lBytes[i] != rBytes[i] { + return true + } + } + + return false +} + +func ensureVotingNodesLabeled(ctx context.Context, client kubernetes.Interface) error { + etcdEndPointsCM, err := client.CoreV1().ConfigMaps(operatorclient.TargetNamespace).Get(ctx, etcdEndpointConfigMapName, v1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to retrieve etcd-endpoints configmap: %v", err) + } + + votingIPs := sets.NewString() + for _, v := range etcdEndPointsCM.Data { + votingIPs.Insert(v) + } + + masterNodes, err := client.CoreV1().Nodes().List(ctx, v1.ListOptions{ + LabelSelector: 
labels.Set{masterNodeSelector: ""}.String(), + }) + if err != nil { + return fmt.Errorf("failed to list master nodes: %w", err) + } + + for _, node := range masterNodes.Items { + for _, addr := range node.Status.Addresses { + if addr.Type == corev1.NodeInternalIP { + if votingIPs.Has(addr.Address) { + // update node's labels + node.Labels[votingNodeSelector] = "" + _, err = client.CoreV1().Nodes().Update(ctx, &node, v1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update master node [%v] with label [%v]", node, votingNodeSelector) + } + } + } + } + } + + return nil +} + +func ensureVotingNodesUnLabeled(ctx context.Context, client kubernetes.Interface) error { + masterNodes, err := client.CoreV1().Nodes().List(ctx, v1.ListOptions{ + LabelSelector: labels.Set{masterNodeSelector: ""}.String(), + }) + if err != nil { + return fmt.Errorf("failed to list master nodes: %w", err) + } + + for _, node := range masterNodes.Items { + delete(node.Labels, votingNodeSelector) + + _, err = client.CoreV1().Nodes().Update(ctx, &node, v1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update master node [%v] with deleting label [%v]", node, votingNodeSelector) + } + } + + return nil +} diff --git a/pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go b/pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go index c4bd74281..7daed5bdb 100644 --- a/pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go +++ b/pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go @@ -3,7 +3,21 @@ package periodicbackupcontroller import ( "context" "fmt" + "slices" + "strings" "testing" + "time" + + testing2 "k8s.io/utils/clock/testing" + + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + + "k8s.io/apimachinery/pkg/labels" + + corev1 "k8s.io/api/core/v1" + + u "github.com/openshift/cluster-etcd-operator/pkg/testutils" configv1 "github.com/openshift/api/config/v1" backupv1alpha1 "github.com/openshift/api/config/v1alpha1" @@ -57,6 +71,477 @@ func TestSyncLoopHappyPath(t *testing.T) { requireOperatorStatus(t, fakeOperatorClient, false) } +func TestSyncLoopWithDefaultBackupCR(t *testing.T) { + var backups backupv1alpha1.BackupList + + backup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: "test-backup"}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "20 4 * * *", + TimeZone: "UTC", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 5}}, + PVCName: "backup-happy-path-pvc"}}} + + // no default CR + backups.Items = append(backups.Items, backup) + operatorFake := fake.NewClientset([]runtime.Object{&backups}...) + client := k8sfakeclient.NewClientset([]runtime.Object{defaultEtcdEndpointCM()}...) 
+ fakeKubeInformerForNamespace := v1helpers.NewKubeInformersForNamespaces(client, operatorclient.TargetNamespace) + fakeOperatorClient := v1helpers.NewFakeStaticPodOperatorClient( + &operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}}, + &operatorv1.StaticPodOperatorStatus{}, nil, nil) + + controller := PeriodicBackupController{ + operatorClient: fakeOperatorClient, + podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(), + backupsClient: operatorFake.ConfigV1alpha1(), + kubeClient: client, + operatorImagePullSpec: "pullspec-image", + backupVarGetter: backuphelpers.NewDisabledBackupConfig(), + featureGateAccessor: backupFeatureGateAccessor, + kubeInformers: fakeKubeInformerForNamespace, + } + + syncCtx := factory.NewSyncContext("test", events.NewRecorder( + controller.kubeClient.CoreV1().Events(operatorclient.TargetNamespace), + "test-periodic-backup-controller", + &corev1.ObjectReference{}, + testing2.NewFakePassiveClock(time.Now()), + )) + stopChan := make(chan struct{}) + t.Cleanup(func() { + close(stopChan) + }) + fakeKubeInformerForNamespace.Start(stopChan) + + expDisabledBackupVar := " args:\n - --enabled=false" + err := controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString()) + + // create default CR + defaultBackup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: defaultBackupCRName}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "0 */2 * * *", + TimeZone: "GMT", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 3}}}}} + + backups.Items = append(backups.Items, defaultBackup) + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) + controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + act, err := controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, 1, len(act.Spec.Template.Spec.Containers)) + require.Equal(t, etcdBackupServerContainerName, act.Spec.Template.Spec.Containers[0].Name) + require.Equal(t, []string{"cluster-etcd-operator", "backup-server"}, act.Spec.Template.Spec.Containers[0].Command) + endpoints, err := getEtcdEndpoints(context.TODO(), controller.kubeClient) + require.NoError(t, err) + slices.Sort(endpoints) + require.Equal(t, constructBackupServerArgs(defaultBackup, strings.Join(endpoints, ",")), act.Spec.Template.Spec.Containers[0].Args) + + // removing defaultCR + backups.Items = backups.Items[:len(backups.Items)-1] + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) 
+ controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + _, err = controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.Error(t, err) + require.Equal(t, fmt.Errorf("daemonsets.apps \"backup-server-daemon-set\" not found").Error(), err.Error()) +} + +func TestSyncLoopWithDefaultBackupCRWithoutRetentionSpec(t *testing.T) { + var backups backupv1alpha1.BackupList + + backup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: "test-backup"}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "20 4 * * *", + TimeZone: "UTC", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 5}}, + PVCName: "backup-happy-path-pvc"}}} + + // no default CR + backups.Items = append(backups.Items, backup) + operatorFake := fake.NewClientset([]runtime.Object{&backups}...) + client := k8sfakeclient.NewClientset([]runtime.Object{defaultEtcdEndpointCM()}...) + fakeKubeInformerForNamespace := v1helpers.NewKubeInformersForNamespaces(client, operatorclient.TargetNamespace) + fakeOperatorClient := v1helpers.NewFakeStaticPodOperatorClient( + &operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}}, + &operatorv1.StaticPodOperatorStatus{}, nil, nil) + + controller := PeriodicBackupController{ + operatorClient: fakeOperatorClient, + podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(), + backupsClient: operatorFake.ConfigV1alpha1(), + kubeClient: client, + operatorImagePullSpec: "pullspec-image", + backupVarGetter: backuphelpers.NewDisabledBackupConfig(), + featureGateAccessor: backupFeatureGateAccessor, + kubeInformers: fakeKubeInformerForNamespace, + } + + syncCtx := factory.NewSyncContext("test", events.NewRecorder( + controller.kubeClient.CoreV1().Events(operatorclient.TargetNamespace), + "test-periodic-backup-controller", + &corev1.ObjectReference{}, + testing2.NewFakePassiveClock(time.Now()), + )) + stopChan := make(chan struct{}) + t.Cleanup(func() { + close(stopChan) + }) + fakeKubeInformerForNamespace.Start(stopChan) + + expDisabledBackupVar := " args:\n - --enabled=false" + err := controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString()) + + // create default CR without Retention specified + // OCPBUGS-43676 + defaultBackup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: defaultBackupCRName}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "0 */2 * * *", + TimeZone: "GMT", + }}} + + backups.Items = append(backups.Items, defaultBackup) + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) 
+ controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + act, err := controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, 1, len(act.Spec.Template.Spec.Containers)) + require.Equal(t, etcdBackupServerContainerName, act.Spec.Template.Spec.Containers[0].Name) + require.Equal(t, []string{"cluster-etcd-operator", "backup-server"}, act.Spec.Template.Spec.Containers[0].Command) + require.Equal(t, fmt.Sprintf("%d", 3), extractEtcdBackupServerArgVal(t, "maxNumberOfBackups", act.Spec.Template.Spec.Containers[0].Args)) + endpoints, err := getEtcdEndpoints(context.TODO(), controller.kubeClient) + require.NoError(t, err) + slices.Sort(endpoints) + require.Equal(t, constructBackupServerArgs(defaultBackup, strings.Join(endpoints, ",")), act.Spec.Template.Spec.Containers[0].Args) + + // removing defaultCR + backups.Items = backups.Items[:len(backups.Items)-1] + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) + controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + _, err = controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.Error(t, err) + require.Equal(t, fmt.Errorf("daemonsets.apps \"backup-server-daemon-set\" not found").Error(), err.Error()) +} + +func TestSyncLoopWithDefaultBackupCRWithoutScheduleSpec(t *testing.T) { + var backups backupv1alpha1.BackupList + + backup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: "test-backup"}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "20 4 * * *", + TimeZone: "UTC", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 5}}, + PVCName: "backup-happy-path-pvc"}}} + + // no default CR + backups.Items = append(backups.Items, backup) + operatorFake := fake.NewClientset([]runtime.Object{&backups}...) + client := k8sfakeclient.NewClientset([]runtime.Object{defaultEtcdEndpointCM()}...) 
+ fakeKubeInformerForNamespace := v1helpers.NewKubeInformersForNamespaces(client, operatorclient.TargetNamespace) + fakeOperatorClient := v1helpers.NewFakeStaticPodOperatorClient( + &operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}}, + &operatorv1.StaticPodOperatorStatus{}, nil, nil) + + controller := PeriodicBackupController{ + operatorClient: fakeOperatorClient, + podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(), + backupsClient: operatorFake.ConfigV1alpha1(), + kubeClient: client, + operatorImagePullSpec: "pullspec-image", + backupVarGetter: backuphelpers.NewDisabledBackupConfig(), + featureGateAccessor: backupFeatureGateAccessor, + kubeInformers: fakeKubeInformerForNamespace, + } + + syncCtx := factory.NewSyncContext("test", events.NewRecorder( + controller.kubeClient.CoreV1().Events(operatorclient.TargetNamespace), + "test-periodic-backup-controller", + &corev1.ObjectReference{}, + testing2.NewFakePassiveClock(time.Now()), + )) + stopChan := make(chan struct{}) + t.Cleanup(func() { + close(stopChan) + }) + fakeKubeInformerForNamespace.Start(stopChan) + + expDisabledBackupVar := " args:\n - --enabled=false" + err := controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString()) + + // create default CR without Retention specified + // OCPBUGS-43687 + defaultBackup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: defaultBackupCRName}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + TimeZone: "GMT", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 3}}}}} + + backups.Items = append(backups.Items, defaultBackup) + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) + controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + act, err := controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, 1, len(act.Spec.Template.Spec.Containers)) + require.Equal(t, etcdBackupServerContainerName, act.Spec.Template.Spec.Containers[0].Name) + require.Equal(t, []string{"cluster-etcd-operator", "backup-server"}, act.Spec.Template.Spec.Containers[0].Command) + require.Equal(t, "0 0 * * *", extractEtcdBackupServerArgVal(t, "schedule", act.Spec.Template.Spec.Containers[0].Args)) + endpoints, err := getEtcdEndpoints(context.TODO(), controller.kubeClient) + require.NoError(t, err) + slices.Sort(endpoints) + require.Equal(t, constructBackupServerArgs(defaultBackup, strings.Join(endpoints, ",")), act.Spec.Template.Spec.Containers[0].Args) + + // removing defaultCR + backups.Items = backups.Items[:len(backups.Items)-1] + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) 
+ controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + _, err = controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.Error(t, err) + require.Equal(t, fmt.Errorf("daemonsets.apps \"backup-server-daemon-set\" not found").Error(), err.Error()) +} + +func TestSyncLoopWithDefaultBackupCREditSpec(t *testing.T) { + var backups backupv1alpha1.BackupList + + backup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: "test-backup"}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "20 4 * * *", + TimeZone: "UTC", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 5}}, + PVCName: "backup-happy-path-pvc"}}} + + // no default CR + backups.Items = append(backups.Items, backup) + operatorFake := fake.NewClientset([]runtime.Object{&backups}...) + client := k8sfakeclient.NewClientset([]runtime.Object{defaultEtcdEndpointCM()}...) + fakeKubeInformerForNamespace := v1helpers.NewKubeInformersForNamespaces(client, operatorclient.TargetNamespace) + fakeOperatorClient := v1helpers.NewFakeStaticPodOperatorClient( + &operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}}, + &operatorv1.StaticPodOperatorStatus{}, nil, nil) + + controller := PeriodicBackupController{ + operatorClient: fakeOperatorClient, + podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(), + backupsClient: operatorFake.ConfigV1alpha1(), + kubeClient: client, + operatorImagePullSpec: "pullspec-image", + backupVarGetter: backuphelpers.NewDisabledBackupConfig(), + featureGateAccessor: backupFeatureGateAccessor, + kubeInformers: fakeKubeInformerForNamespace, + } + + syncCtx := factory.NewSyncContext("test", events.NewRecorder( + controller.kubeClient.CoreV1().Events(operatorclient.TargetNamespace), + "test-periodic-backup-controller", + &corev1.ObjectReference{}, + testing2.NewFakePassiveClock(time.Now()), + )) + stopChan := make(chan struct{}) + t.Cleanup(func() { + close(stopChan) + }) + fakeKubeInformerForNamespace.Start(stopChan) + + expDisabledBackupVar := " args:\n - --enabled=false" + err := controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString()) + + // create default CR + defaultBackup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: defaultBackupCRName}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "0 */2 * * *", + TimeZone: "GMT", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 3}}}}} + + backups.Items = append(backups.Items, defaultBackup) + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) 
+ controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + act, err := controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, 1, len(act.Spec.Template.Spec.Containers)) + require.Equal(t, etcdBackupServerContainerName, act.Spec.Template.Spec.Containers[0].Name) + require.Equal(t, []string{"cluster-etcd-operator", "backup-server"}, act.Spec.Template.Spec.Containers[0].Command) + require.Equal(t, fmt.Sprintf("%d", 3), extractEtcdBackupServerArgVal(t, "maxNumberOfBackups", act.Spec.Template.Spec.Containers[0].Args)) + endpoints, err := getEtcdEndpoints(context.TODO(), controller.kubeClient) + require.NoError(t, err) + slices.Sort(endpoints) + require.Equal(t, constructBackupServerArgs(defaultBackup, strings.Join(endpoints, ",")), act.Spec.Template.Spec.Containers[0].Args) + + // edit default CR spec within fake + backups.Items[1].Spec.EtcdBackupSpec.RetentionPolicy.RetentionNumber.MaxNumberOfBackups = 4 + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) + controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + act, err = controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, 1, len(act.Spec.Template.Spec.Containers)) + require.Equal(t, etcdBackupServerContainerName, act.Spec.Template.Spec.Containers[0].Name) + require.Equal(t, []string{"cluster-etcd-operator", "backup-server"}, act.Spec.Template.Spec.Containers[0].Command) + require.Equal(t, fmt.Sprintf("%d", 4), extractEtcdBackupServerArgVal(t, "maxNumberOfBackups", act.Spec.Template.Spec.Containers[0].Args)) + endpoints, err = getEtcdEndpoints(context.TODO(), controller.kubeClient) + require.NoError(t, err) + slices.Sort(endpoints) + require.Equal(t, constructBackupServerArgs(defaultBackup, strings.Join(endpoints, ",")), act.Spec.Template.Spec.Containers[0].Args) + + // removing defaultCR + backups.Items = backups.Items[:len(backups.Items)-1] + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) 
+ controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + _, err = controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.Error(t, err) + require.Equal(t, fmt.Errorf("daemonsets.apps \"backup-server-daemon-set\" not found").Error(), err.Error()) +} + +func TestSyncLoopFailsDegradesOperatorWithDefaultBackupCR(t *testing.T) { + var backups backupv1alpha1.BackupList + + backup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: "test-backup"}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "20 4 * * *", + TimeZone: "UTC", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 5}}, + PVCName: "backup-happy-path-pvc"}}} + + backupServerFailureMsg := fmt.Sprintf("error running etcd backup: %s", "error running backup") + client := k8sfakeclient.NewClientset([]runtime.Object{ + etcdBackupServerFailingPod("1", backupServerFailureMsg), + etcdBackupServerFailingPod("2", backupServerFailureMsg), + etcdBackupServerFailingPod("3", backupServerFailureMsg), + defaultEtcdEndpointCM()}...) + + fakeKubeInformerForNamespace := v1helpers.NewKubeInformersForNamespaces(client, operatorclient.TargetNamespace) + + fakeOperatorClient := v1helpers.NewFakeStaticPodOperatorClient( + &operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}}, + &operatorv1.StaticPodOperatorStatus{}, nil, nil) + + // no default CR + backups.Items = append(backups.Items, backup) + operatorFake := fake.NewClientset([]runtime.Object{&backups}...) + + controller := PeriodicBackupController{ + operatorClient: fakeOperatorClient, + podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(), + backupsClient: operatorFake.ConfigV1alpha1(), + kubeClient: client, + operatorImagePullSpec: "pullspec-image", + backupVarGetter: backuphelpers.NewDisabledBackupConfig(), + featureGateAccessor: backupFeatureGateAccessor, + kubeInformers: fakeKubeInformerForNamespace, + } + + syncCtx := factory.NewSyncContext("test", events.NewRecorder( + controller.kubeClient.CoreV1().Events(operatorclient.TargetNamespace), + "test-periodic-backup-controller", + &corev1.ObjectReference{}, + testing2.NewFakePassiveClock(time.Now()), + )) + stopChan := make(chan struct{}) + t.Cleanup(func() { + close(stopChan) + }) + fakeKubeInformerForNamespace.Start(stopChan) + + expDisabledBackupVar := " args:\n - --enabled=false" + err := controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString()) + requireOperatorStatus(t, fakeOperatorClient, false) + + // create default CR + defaultBackup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: defaultBackupCRName}, + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "0 */2 * * *", + TimeZone: "GMT", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 3}}}}} + + backups.Items = append(backups.Items, defaultBackup) + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) 
+ controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + act, err := controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, 1, len(act.Spec.Template.Spec.Containers)) + require.Equal(t, etcdBackupServerContainerName, act.Spec.Template.Spec.Containers[0].Name) + require.Equal(t, []string{"cluster-etcd-operator", "backup-server"}, act.Spec.Template.Spec.Containers[0].Command) + endpoints, err := getEtcdEndpoints(context.TODO(), controller.kubeClient) + require.NoError(t, err) + slices.Sort(endpoints) + require.Equal(t, constructBackupServerArgs(defaultBackup, strings.Join(endpoints, ",")), act.Spec.Template.Spec.Containers[0].Args) + requireOperatorStatus(t, fakeOperatorClient, true) + + // removing defaultCR + backups.Items = backups.Items[:len(backups.Items)-1] + operatorFake = fake.NewClientset([]runtime.Object{&backups}...) + controller.backupsClient = operatorFake.ConfigV1alpha1() + + err = controller.sync(context.TODO(), syncCtx) + require.NoError(t, err) + _, err = controller.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{}) + require.Error(t, err) + require.Equal(t, fmt.Errorf("daemonsets.apps \"backup-server-daemon-set\" not found").Error(), err.Error()) + requireOperatorStatus(t, fakeOperatorClient, false) +} + func TestSyncLoopExistingCronJob(t *testing.T) { backup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: "test-backup"}, Spec: backupv1alpha1.BackupSpec{ @@ -184,6 +669,38 @@ func TestCronJobSpecDiffs(t *testing.T) { require.True(t, cronSpecDiffers(job.Spec, job2.Spec)) } +func TestConstructBackupServerArgs(t *testing.T) { + testEtcdEndpoints := "10.0.109.40:2379,10.0.63.58:2379,10.0.44.255:2379" + testCR := backupv1alpha1.Backup{ + Spec: backupv1alpha1.BackupSpec{ + EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{ + Schedule: "* * * * *", + TimeZone: "GMT", + RetentionPolicy: backupv1alpha1.RetentionPolicy{ + RetentionType: backupv1alpha1.RetentionTypeNumber, + RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 3}, + }, + }, + }, + } + + exp := []string{"--enabled=true", "--timezone=GMT", "--schedule=* * * * *", "--type=RetentionNumber", "--maxNumberOfBackups=3", "--endpoints=10.0.109.40:2379,10.0.63.58:2379,10.0.44.255:2379", "--backupPath=/var/lib/etcd-auto-backup"} + + act := constructBackupServerArgs(testCR, testEtcdEndpoints) + require.Equal(t, exp, act) +} + +func TestGetEtcdEndpoints(t *testing.T) { + testEtcdEndpointCM := defaultEtcdEndpointCM() + + exp := []string{"10.0.0.0:2379", "10.0.0.1:2379", "10.0.0.2:2379"} + + client := k8sfakeclient.NewClientset([]runtime.Object{testEtcdEndpointCM}...) 
+ act, err := getEtcdEndpoints(context.TODO(), client) + require.NoError(t, err) + require.ElementsMatch(t, exp, act) +} + func requireOperatorStatus(t *testing.T, client v1helpers.StaticPodOperatorClient, degraded bool) { _, status, _, _ := client.GetOperatorState() require.Equal(t, 1, len(status.Conditions)) @@ -241,3 +758,142 @@ func findFirstCreateAction(client *k8sfakeclient.Clientset) *k8stesting.CreateAc } return createAction } + +func defaultEtcdEndpointCM() *corev1.ConfigMap { + return u.FakeConfigMap(operatorclient.TargetNamespace, etcdEndpointConfigMapName, map[string]string{ + "0": "10.0.0.0", + "1": "10.0.0.1", + "2": "10.0.0.2", + }) +} + +func etcdBackupServerFailingPod(nodeName string, failureMsg string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: fmt.Sprintf("etcd-backup-server-%v", nodeName), + Namespace: "openshift-etcd", + Labels: labels.Set{backupDSLabelKey: backupDSLabelValue}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + ContainerStatuses: []corev1.ContainerStatus{{ + Name: etcdBackupServerContainerName, + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + Message: failureMsg, + }, + }}, + }, + }, + } +} + +func extractEtcdBackupServerArgVal(t testing.TB, argName string, args []string) string { + for _, arg := range args { + splits := strings.Split(arg, "=") + require.Equal(t, 2, len(splits)) + name := strings.TrimPrefix(splits[0], "--") + if name == argName { + return splits[1] + } + } + + t.Errorf("expected [etcd-backup-server] arg [%v], but found none", argName) + return "" +} + +func TestEnsureVotingNodesLabeled(t *testing.T) { + // arrange + allClusterNodes := defaultClusterNodes() + var objects []runtime.Object + for _, n := range allClusterNodes { + objects = append(objects, n) + } + + endpointCM := u.EndpointsConfigMap( + u.WithEndpoint(1, "10.0.0.1"+etcdClientPort), + u.WithEndpoint(2, "10.0.0.2"+etcdClientPort), + u.WithEndpoint(3, "10.0.0.3"+etcdClientPort), + ) + objects = append(objects, endpointCM) + client := k8sfakeclient.NewClientset(objects...) + + // act + err := ensureVotingNodesLabeled(context.TODO(), client) + require.NoError(t, err) + + // assert + masterNodes, err := client.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{ + LabelSelector: labels.Set{masterNodeSelector: ""}.String(), + }) + require.NoError(t, err) + for _, m := range masterNodes.Items { + require.Contains(t, m.Labels, votingNodeSelector) + } +} + +func TestEnsureVotingNodesUnLabeled(t *testing.T) { + // arrange + allClusterNodes := defaultClusterNodes() + for _, n := range allClusterNodes { + if _, ok := n.Labels[masterNodeSelector]; ok { + n.Labels[votingNodeSelector] = "" + } + } + + var objects []runtime.Object + for _, n := range allClusterNodes { + objects = append(objects, n) + } + client := k8sfakeclient.NewClientset(objects...) 
+ + // act + err := ensureVotingNodesUnLabeled(context.TODO(), client) + require.NoError(t, err) + + // assert + masterNodes, err := client.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{ + LabelSelector: labels.Set{masterNodeSelector: ""}.String(), + }) + require.NoError(t, err) + for _, m := range masterNodes.Items { + require.NotContains(t, m.Labels, votingNodeSelector) + } +} + +func defaultClusterNodes() []*corev1.Node { + var nodes []*corev1.Node + + for i := 1; i <= 6; i++ { + isMaster := false + if i <= 3 { + isMaster = true + } + nodes = append(nodes, createNode(i, isMaster)) + } + return nodes +} + +func createNode(idx int, isMaster bool) *corev1.Node { + node := &corev1.Node{ + ObjectMeta: v1.ObjectMeta{ + Name: fmt.Sprintf("n-%d", idx), + }, + } + + if isMaster { + node.ObjectMeta.Labels = map[string]string{ + masterNodeSelector: "", + } + + if node.Status.Addresses == nil { + node.Status.Addresses = []corev1.NodeAddress{} + } + node.Status.Addresses = append(node.Status.Addresses, corev1.NodeAddress{ + Type: corev1.NodeInternalIP, + Address: fmt.Sprintf("10.0.0.%d", idx), + }) + } + + return node +} diff --git a/pkg/operator/starter.go b/pkg/operator/starter.go index 45a6e9ae6..ac3c164aa 100644 --- a/pkg/operator/starter.go +++ b/pkg/operator/starter.go @@ -491,6 +491,7 @@ func RunOperator(ctx context.Context, controllerContext *controllercmd.Controlle controllerContext.EventRecorder, os.Getenv("OPERATOR_IMAGE"), featureGateAccessor, + backuphelpers.NewDisabledBackupConfig(), configBackupInformer, kubeInformersForNamespaces) diff --git a/pkg/operator/targetconfigcontroller/targetconfigcontroller.go b/pkg/operator/targetconfigcontroller/targetconfigcontroller.go index 3f5d43100..6f1b70e29 100644 --- a/pkg/operator/targetconfigcontroller/targetconfigcontroller.go +++ b/pkg/operator/targetconfigcontroller/targetconfigcontroller.go @@ -4,12 +4,13 @@ import ( "context" "errors" "fmt" - "github.com/openshift/cluster-etcd-operator/pkg/operator/ceohelpers" "reflect" "strings" "text/template" "time" + "github.com/openshift/cluster-etcd-operator/pkg/operator/ceohelpers" + operatorv1 "github.com/openshift/api/operator/v1" configv1informers "github.com/openshift/client-go/config/informers/externalversions/config/v1" "github.com/openshift/cluster-etcd-operator/pkg/etcdenvvar" diff --git a/pkg/operator/targetconfigcontroller/targetconfigcontroller_test.go b/pkg/operator/targetconfigcontroller/targetconfigcontroller_test.go index ca972263a..0b2a24f16 100644 --- a/pkg/operator/targetconfigcontroller/targetconfigcontroller_test.go +++ b/pkg/operator/targetconfigcontroller/targetconfigcontroller_test.go @@ -3,9 +3,10 @@ package targetconfigcontroller import ( "context" "fmt" - "k8s.io/client-go/kubernetes/scheme" "testing" + "k8s.io/client-go/kubernetes/scheme" + configv1 "github.com/openshift/api/config/v1" operatorv1 "github.com/openshift/api/operator/v1" "github.com/openshift/cluster-etcd-operator/pkg/etcdenvvar"
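
Note on the backupvars.go defaulting (OCPBUGS-43687, OCPBUGS-43676): a minimal standalone sketch of the fallback behaviour, with defaultedBackupArgs as an illustrative name rather than the operator's API; the real ArgList/ArgString also carry the timezone flag, but the defaulted schedule, type and maxNumberOfBackups values line up with what the updated backupvars_test.go expects.

package main

import "fmt"

// defaultedBackupArgs mirrors the fallback behaviour added in backupvars.go:
// an empty schedule becomes "0 0 * * *" and an empty retention policy becomes
// RetentionNumber with 3 retained backups.
func defaultedBackupArgs(schedule, retentionType string, maxBackups int) []string {
	args := []string{"--enabled=true"}

	if schedule == "" {
		schedule = "0 0 * * *" // defaultBackupSchedule
	}
	args = append(args, fmt.Sprintf("--schedule=%s", schedule))

	if retentionType == "" {
		// defaultNumberBackups
		return append(args, "--type=RetentionNumber", "--maxNumberOfBackups=3")
	}
	return append(args,
		fmt.Sprintf("--type=%s", retentionType),
		fmt.Sprintf("--maxNumberOfBackups=%d", maxBackups))
}

func main() {
	// Prints: [--enabled=true --schedule=0 0 * * * --type=RetentionNumber --maxNumberOfBackups=3]
	fmt.Println(defaultedBackupArgs("", "", 0))
}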
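
Note on constructEnvVars in backupserver.go: a minimal standalone sketch of the NODE_NAME substitution it performs; etcdctlEnvForNode is a hypothetical helper used only for illustration, the real code exports the values with os.Setenv before the backup runs.

package main

import (
	"fmt"
	"os"
	"strings"
)

// etcdctlEnvForNode derives the per-node etcdctl TLS paths the same way
// constructEnvVars does: the NODE_NAME placeholder in the peer cert/key paths
// is replaced with the node's name; the CA bundle path is static.
func etcdctlEnvForNode(nodeName string) map[string]string {
	return map[string]string{
		"ETCDCTL_KEY":    strings.ReplaceAll("/etc/kubernetes/static-pod-certs/secrets/etcd-all-certs/etcd-peer-NODE_NAME.key", "NODE_NAME", nodeName),
		"ETCDCTL_CERT":   strings.ReplaceAll("/etc/kubernetes/static-pod-certs/secrets/etcd-all-certs/etcd-peer-NODE_NAME.crt", "NODE_NAME", nodeName),
		"ETCDCTL_CACERT": "/etc/kubernetes/static-pod-certs/configmaps/etcd-all-bundles/server-ca-bundle.crt",
	}
}

func main() {
	// NODE_NAME is injected by the DaemonSet through the downward API (spec.nodeName).
	for name, value := range etcdctlEnvForNode(os.Getenv("NODE_NAME")) {
		fmt.Printf("%s=%s\n", name, value)
	}
}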
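
Note on getEtcdEndpoints in periodicbackupcontroller.go: a minimal sketch, assuming the etcd-endpoints configmap values are plain member IPs as in defaultEtcdEndpointCM; endpointsFromConfigMapData is an illustrative name for how the --endpoints argument is assembled.

package main

import (
	"fmt"
	"slices"
)

// endpointsFromConfigMapData mimics getEtcdEndpoints: each value of the
// etcd-endpoints configmap is a member IP, the etcd client port :2379 is
// appended, and the result is sorted for a stable --endpoints argument.
func endpointsFromConfigMapData(data map[string]string) []string {
	var endpoints []string
	for _, ip := range data {
		endpoints = append(endpoints, ip+":2379")
	}
	slices.Sort(endpoints)
	return endpoints
}

func main() {
	cm := map[string]string{"0": "10.0.0.0", "1": "10.0.0.1", "2": "10.0.0.2"}
	// Prints: [10.0.0.0:2379 10.0.0.1:2379 10.0.0.2:2379]
	fmt.Println(endpointsFromConfigMapData(cm))
}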