Skip to content

Commit

Permalink
fix(controller): Fix race of csi controller calls on the same volume (#…
Browse files Browse the repository at this point in the history
…588)

* Fix race of csi controller calls

When a CreateVolume requests takes longer then 10 seconds the request
runs into a client-side timeout and gets retied by kubelet.
While processing the new request, the old request is still being canceled
on the server-side and the created ZFSVolume CR gets deleted.

Signed-off-by: Luca Berneking <[email protected]>

* Add LockVolumeWithSnapshot function to prevent future deadlocks

Signed-off-by: Luca Berneking <[email protected]>

* Use single mutex for volume locks to prevent memory leak

Signed-off-by: Luca Berneking <[email protected]>

---------

Signed-off-by: Luca Berneking <[email protected]>
  • Loading branch information
Lucaber authored Nov 4, 2024
1 parent 76617c0 commit b2f57b9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type controller struct {

k8sNodeInformer cache.SharedIndexInformer
zfsNodeInformer cache.SharedIndexInformer

volumeLock *volumeLock
}

// NewController returns a new instance
Expand All @@ -71,6 +73,7 @@ func NewController(d *CSIDriver) csi.ControllerServer {
ctrl := &controller{
driver: d,
capabilities: newControllerCapabilities(),
volumeLock: newVolumeLock(),
}
if err := ctrl.init(); err != nil {
klog.Fatalf("init controller: %v", err)
Expand Down Expand Up @@ -450,6 +453,9 @@ func (cs *controller) CreateVolume(
contentSource := req.GetVolumeContentSource()
pvcName := helpers.GetInsensitiveParameter(&parameters, "csi.storage.k8s.io/pvc/name")

unlock := cs.volumeLock.LockVolume(volName)
defer unlock()

if contentSource != nil && contentSource.GetSnapshot() != nil {
snapshotID := contentSource.GetSnapshot().GetSnapshotId()

Expand Down Expand Up @@ -493,6 +499,8 @@ func (cs *controller) DeleteVolume(
}

volumeID := strings.ToLower(req.GetVolumeId())
unlock := cs.volumeLock.LockVolume(volumeID)
defer unlock()

// verify if the volume has already been deleted
vol, err := zfs.GetVolume(volumeID)
Expand Down Expand Up @@ -611,6 +619,8 @@ func (cs *controller) ControllerExpandVolume(
"ControllerExpandVolume: no volumeID provided",
)
}
unlock := cs.volumeLock.LockVolume(volumeID)
defer unlock()

/* round off the new size */
updatedSize := getRoundedCapacity(req.GetCapacityRange().GetRequiredBytes())
Expand Down Expand Up @@ -707,6 +717,8 @@ func (cs *controller) CreateSnapshot(
if err != nil {
return nil, err
}
unlock := cs.volumeLock.LockVolumeWithSnapshot(volumeID, snapName)
defer unlock()

snapTimeStamp := time.Now().Unix()
var state string
Expand Down Expand Up @@ -803,6 +815,8 @@ func (cs *controller) DeleteSnapshot(
// should succeed when an invalid snapshot id is used
return &csi.DeleteSnapshotResponse{}, nil
}
unlock := cs.volumeLock.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1])
defer unlock()
if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
return nil, status.Errorf(
codes.Internal,
Expand Down
49 changes: 49 additions & 0 deletions pkg/driver/volume_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package driver

import (
"sync"
)

type volumeLock struct {
cond sync.Cond
locked map[string]struct{}
}

func newVolumeLock() *volumeLock {
return &volumeLock{
cond: *sync.NewCond(&sync.Mutex{}),
locked: map[string]struct{}{},
}
}

func (l *volumeLock) LockVolume(volume string) func() {
l.cond.L.Lock()
defer l.cond.L.Unlock()

for {
if _, locked := l.locked[volume]; !locked {
break
}

l.cond.Wait()
}

l.locked[volume] = struct{}{}

return func() {
l.cond.L.Lock()
defer l.cond.L.Unlock()

delete(l.locked, volume)
l.cond.Broadcast()
}
}

func (l *volumeLock) LockVolumeWithSnapshot(volume string, snapshot string) func() {
unlockVol := l.LockVolume(volume)
unlockSnap := l.LockVolume(snapshot)
return func() {
unlockVol()
unlockSnap()
}
}

0 comments on commit b2f57b9

Please sign in to comment.