
SPIKE: Operator to evict tainted pods - issue 18 #47

Closed
5 changes: 3 additions & 2 deletions .gitignore
@@ -1,2 +1,3 @@
k-rail
vendor
/k-rail
/evicter
vendor
3 changes: 2 additions & 1 deletion Dockerfile
@@ -3,13 +3,14 @@ ARG GO_VERSION=1.13
FROM golang:${GO_VERSION}-buster AS builder
WORKDIR /build
COPY ./ /build/
RUN make test
RUN make clean test
RUN make build

# Production image build stage
FROM scratch
EXPOSE 8443/tcp
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=builder /build/k-rail /k-rail
COPY --from=builder /build/evicter /evicter
USER 65534
ENTRYPOINT ["/k-rail", "-config", "/config/config.yml"]
24 changes: 19 additions & 5 deletions Makefile
@@ -9,13 +9,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
.PHONY: all build test image clean

ensure:
dep ensure
LDFLAGS = -extldflags=-static -s -w
BUILD_FLAGS = -mod=readonly -ldflags '$(LDFLAGS)' -trimpath
BUILD_VERSION ?= manual
IMAGE_NAME = "cruise/k-rail:${BUILD_VERSION}"

all: dist

dist: image

clean:
rm -f evicter k-rail
go mod verify

build:
GO111MODULE=on CGO_ENABLED=0 go build -o k-rail cmd/main.go
GO111MODULE=on CGO_ENABLED=0 go build ${BUILD_FLAGS} -o k-rail cmd/k-rail/main.go
GO111MODULE=on CGO_ENABLED=0 go build ${BUILD_FLAGS} -o evicter cmd/evicter/*.go

test:
GO111MODULE=on CGO_ENABLED=1 go test -race -cover $(shell go list ./... | grep -v /vendor/)

GO111MODULE=on CGO_ENABLED=1 go test -mod=readonly -race ./...

image: build
docker build --pull -t $(IMAGE_NAME) .
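With these targets a versioned build is driven entirely through make: for example, `BUILD_VERSION=v0.1.0 make image` compiles both static binaries and tags the Docker image as `cruise/k-rail:v0.1.0`.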
5 changes: 5 additions & 0 deletions cmd/evicter/README.md
@@ -0,0 +1,5 @@
# Evict tainted pods after period
An operator that runs inside Kubernetes to find tainted pods and evict them once a configurable incubation period has passed. It reads two pod annotations:

* `k-rails/tainted-timestamp`: Unix timestamp (in seconds) at which the pod was tainted
* `k-rails/tainted-prevent-eviction`: set to `yes` or `true` to exempt the pod from eviction
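
For illustration, a hypothetical helper that marks a pod so the operator will pick it up; the `tainted=true` label matches the evicter's default `-label-selector`, and the annotation keys mirror the constants in controller.go:

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// taintPod is a hypothetical helper: it labels the pod for discovery and
// records when it was tainted so the incubation period can be enforced.
func taintPod(pod *v1.Pod, reason string) {
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	pod.Labels["tainted"] = "true" // matches the default -label-selector
	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	pod.Annotations["k-rails/tainted-timestamp"] = strconv.FormatInt(time.Now().Unix(), 10)
	pod.Annotations["k-rails/tainted-reason"] = reason
}

func main() {
	p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	taintPod(p, "exec")
	fmt.Println(p.Labels, p.Annotations)
}
```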
186 changes: 186 additions & 0 deletions cmd/evicter/controller.go
@@ -0,0 +1,186 @@
package main

import (
"fmt"
"strconv"
"time"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

type podProvisioner interface {
Evict(pod *v1.Pod, reason string) error
}

type Controller struct {
podStore cache.Indexer
queue workqueue.RateLimitingInterface
informer cache.Controller
podProvisioner podProvisioner
incubationPeriodSeconds time.Duration
started time.Time
}

func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller, podProvisioner podProvisioner, incubationPeriodSeconds int64) *Controller {
return &Controller{
informer: informer,
podStore: indexer,
queue: queue,
podProvisioner: podProvisioner,
incubationPeriodSeconds: time.Duration(incubationPeriodSeconds) * time.Second,
started: time.Now(), // set at construction so the startup grace period in Run takes effect
}
}

func (c *Controller) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

err := c.evictPod(key.(string))
c.handleErr(err, key)
return true
}

const (
annotationPreventEviction = "k-rails/tainted-prevent-eviction"
annotationTimestamp = "k-rails/tainted-timestamp"
annotationReason = "k-rails/tainted-reason"
)
const defaultEvictionReason = "exec"

// evictPod is the business logic of the controller. It checks the eviction rules and conditions before calling the pod provisioner.
func (c *Controller) evictPod(key string) error {
obj, exists, err := c.podStore.GetByKey(key)
switch {
case err != nil:
return err
case !exists:
return nil
}
pod, ok := obj.(*v1.Pod)
if !ok {
return fmt.Errorf("unsupported type: %T", obj)
}
if !canEvict(pod, c.incubationPeriodSeconds) {
return nil
}

reason, ok := pod.Annotations[annotationReason]
if !ok || reason == "" {
reason = defaultEvictionReason
}

return c.podProvisioner.Evict(pod, reason)
}

func canEvict(pod *v1.Pod, incubationPeriod time.Duration) bool {
if pod == nil {
return false
}
val, ok := pod.Annotations[annotationPreventEviction]
if ok {
if val == "yes" || val == "true" {
return false
}
}

val, ok = pod.Annotations[annotationTimestamp]
if ok {
i, err := strconv.ParseInt(val, 10, 64)
if err != nil {
klog.Warningf("pod %q: ignoring invalid %s annotation: %v", pod.Name, annotationTimestamp, err)
return true
}
timestamp := time.Unix(i, 0)
if time.Since(timestamp) < incubationPeriod {
return false
}
}
return true
}
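
The rules above compose as: an explicit opt-out always wins, a fresh taint is still incubating, and a missing or stale timestamp allows eviction. A table-driven test sketch (not part of this PR, assuming the package's constants and `canEvict` as defined here) makes that concrete:

```go
package main

import (
	"strconv"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestCanEvict(t *testing.T) {
	fresh := strconv.FormatInt(time.Now().Unix(), 10)
	stale := strconv.FormatInt(time.Now().Add(-2*time.Hour).Unix(), 10)
	specs := map[string]struct {
		anns map[string]string
		exp  bool
	}{
		"opt-out wins":     {map[string]string{annotationPreventEviction: "yes"}, false},
		"still incubating": {map[string]string{annotationTimestamp: fresh}, false},
		"past incubation":  {map[string]string{annotationTimestamp: stale}, true},
		"no annotations":   {map[string]string{}, true},
	}
	for name, spec := range specs {
		t.Run(name, func(t *testing.T) {
			pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: spec.anns}}
			if got := canEvict(pod, time.Hour); got != spec.exp {
				t.Errorf("expected %v, got %v", spec.exp, got)
			}
		})
	}
}
```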

const maxWorkerRetries = 5

// handleErr checks if an error happened and makes sure we will retry later.
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
c.queue.Forget(key)
return
}

// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < maxWorkerRetries {
klog.Infof("Error syncing pod %v: %v", key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
c.queue.AddRateLimited(key)
return
}

c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
runtime.HandleError(err)
klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

const reconciliationTick = 30 * time.Second
const startupGracePeriod = 90 * time.Second

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer runtime.HandleCrash()

// Let the workers stop when we are done
defer c.queue.ShutDown()
klog.Info("Starting Pod controller")

go c.informer.Run(stopCh)

// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}

for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

wait.Until(func() {
if time.Since(c.started) < startupGracePeriod {
return
}
if err := c.doReconciliation(); err != nil {
klog.Errorf("Reconciliation failed: %s", err)
}
}, reconciliationTick, stopCh)

klog.Info("Stopping Pod controller")
}

func (c *Controller) runWorker() {
for c.processNextItem() {
}
}

func (c *Controller) doReconciliation() error {
klog.Info("Reconciliation started")
for _, key := range c.podStore.ListKeys() {
if err := c.evictPod(key); err != nil {
return errors.Wrapf(err, "pod %q", key)
}
}
klog.Info("Reconciliation completed")
return nil
}
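
Note that the periodic reconciliation complements the event-driven path: a pod whose incubation period elapses without any further update would otherwise never be revisited, since the informer only enqueues keys on add, update, and delete events.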
118 changes: 118 additions & 0 deletions cmd/evicter/main.go
@@ -0,0 +1,118 @@
package main

import (
"flag"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/kubernetes/typed/policy/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

func main() {
var (
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file: `<home>/.kube/config`")
master = flag.String("master", "", "master url")
labelSelector = flag.String("label-selector", "tainted=true", "label selector to discover tainted pods")
terminationGracePeriodSeconds = flag.Int64("termination-grace-period", 30, "pod termination grace period in seconds")
taintedIncubationPeriodSeconds = flag.Int64("incubation-period", 24*60*60, "time in seconds a tainted pod can run before eviction")
)
flag.Parse()
flag.Set("logtostderr", "true") // glog: no disk log

config, err := clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
if err != nil {
klog.Fatal(err)
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatal(err)
}
podListWatcher := cache.NewFilteredListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceDefault,
func(options *metav1.ListOptions) {
options.LabelSelector = *labelSelector
})

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// Bind the workqueue to a cache with the help of an informer. This way we make sure that
// whenever the cache is updated, the pod key is added to the workqueue.
// Note that when we finally process the item from the workqueue, we might see a newer version
// of the Pod than the version which was responsible for triggering the update.
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this key function.
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})

stop := make(chan struct{})
defer close(stop)

eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientset.EventsV1beta1().Events("")})
eventBroadcaster.StartRecordingToSink(stop)
defer eventBroadcaster.Shutdown()

evicter := newPodEvicter(clientset.PolicyV1beta1(), eventBroadcaster.NewRecorder(scheme.Scheme, "k-rail-evicter"), *terminationGracePeriodSeconds)
controller := NewController(queue, indexer, informer, evicter, *taintedIncubationPeriodSeconds)

go controller.Run(1, stop)

// todo: watch sigterm
// todo: recover panic to log
select {}
}

type podEvicter struct {
client v1beta1.PolicyV1beta1Interface
eventRecorder events.EventRecorder
defaultDeleteOptions *metav1.DeleteOptions
}

func newPodEvicter(client v1beta1.PolicyV1beta1Interface, recorder events.EventRecorder, gracePeriodSeconds int64) *podEvicter {
return &podEvicter{
client: client,
eventRecorder: recorder,
defaultDeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds},
}
}

func (p *podEvicter) Evict(pod *v1.Pod, reason string) error {
err := p.client.Evictions(pod.Namespace).Evict(newEviction(pod, p.defaultDeleteOptions))
if err != nil {
return errors.Wrap(err, "eviction")
}
p.eventRecorder.Eventf(pod, nil, v1.EventTypeNormal, reason, "Eviction", "")
return nil
}

func newEviction(pod *v1.Pod, deleteOption *metav1.DeleteOptions) *policy.Eviction {
return &policy.Eviction{
TypeMeta: metav1.TypeMeta{
APIVersion: "Policy/v1beta1",
Kind: "Eviction",
},
ObjectMeta: pod.ObjectMeta,
DeleteOptions: deleteOption,
}
}
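
With the flags defined in main, a local run against a cluster might look like `go run ./cmd/evicter -kubeconfig $HOME/.kube/config -label-selector tainted=true -incubation-period 3600 -termination-grace-period 30` (a sketch; the values are illustrative).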
File renamed without changes.