Skip to content

Commit

Permalink
copy node before enqueue to avoid courrent read-write in NotifyNodeSt…
Browse files Browse the repository at this point in the history
…atus
  • Loading branch information
cwdsuzhou committed Jul 12, 2020
1 parent 890dbb2 commit 42d0a19
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 14 deletions.
8 changes: 8 additions & 0 deletions pkg/common/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,11 @@ func (n *ProviderNode) SubResource(resource *Resource) error {
vkResource.SetCapacityToNode(n.Node)
return nil
}

// DeepCopy deepcopy node with lock, to avoid concurrent read-write
func (n *ProviderNode) DeepCopy() *corev1.Node {
n.Lock()
node := n.Node.DeepCopy()
n.Unlock()
return node
}
4 changes: 2 additions & 2 deletions pkg/provider/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ func (v *VirtualK8S) Ping(ctx context.Context) error {
// the status.
//
// NotifyNodeStatus should not block callers.
func (v *VirtualK8S) NotifyNodeStatus(ctx context.Context, cb func(*corev1.Node)) {
func (v *VirtualK8S) NotifyNodeStatus(ctx context.Context, f func(*corev1.Node)) {
klog.Info("Called NotifyNodeStatus")
go func() {
for {
select {
case node := <-v.updatedNode:
klog.Infof("Enqueue updated node %v", node.Name)
cb(node)
f(node)
case <-v.stopCh:
return
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (v *VirtualK8S) RunInContainer(ctx context.Context, namespace string, podNa
// this function is called.
//
// NotifyPods will not block callers.
func (v *VirtualK8S) NotifyPods(ctx context.Context, cb func(*corev1.Pod)) {
func (v *VirtualK8S) NotifyPods(ctx context.Context, f func(*corev1.Pod)) {
klog.Info("Called NotifyPods")
go func() {
// to make sure pods have been add to known pods
Expand All @@ -324,7 +324,7 @@ func (v *VirtualK8S) NotifyPods(ctx context.Context, cb func(*corev1.Pod)) {
klog.V(4).Infof("Enqueue updated pod %v", pod.Name)
// need trim pod, e.g. UID
util.RecoverLabels(pod.Labels, pod.Annotations)
cb(pod)
f(pod)
case <-v.stopCh:
return
case <-ctx.Done():
Expand Down
25 changes: 15 additions & 10 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ func (v *VirtualK8S) buildNodeInformer(nodeInformer v12.NodeInformer) {
}
// resource we did not add when ConfigureNode should sub
v.providerNode.SubResource(v.getResourceFromPodsByNodeName(addNode.Name))
if !reflect.DeepEqual(nodeCopy, v.providerNode) {
v.updatedNode <- v.providerNode.Node
copy := v.providerNode.DeepCopy()
if !reflect.DeepEqual(nodeCopy, copy) {
v.updatedNode <- copy
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -198,8 +199,9 @@ func (v *VirtualK8S) buildNodeInformer(nodeInformer v12.NodeInformer) {
}
// resource we did not add when ConfigureNode should add
v.providerNode.AddResource(v.getResourceFromPodsByNodeName(deleteNode.Name))
if !reflect.DeepEqual(nodeCopy, v.providerNode) {
v.updatedNode <- v.providerNode.Node
copy := v.providerNode.DeepCopy()
if !reflect.DeepEqual(nodeCopy, copy) {
v.updatedNode <- copy
}
},
},
Expand Down Expand Up @@ -234,7 +236,8 @@ func (v *VirtualK8S) buildPodInformer(podInformer v12.PodInformer) {
if v.providerNode.Node == nil {
return
}
v.updatedNode <- v.providerNode.Node
copy := v.providerNode.DeepCopy()
v.updatedNode <- copy
}
return
}
Expand Down Expand Up @@ -281,24 +284,25 @@ func (v *VirtualK8S) updateVKCapacityFromNode(old, new *corev1.Node) {
v.providerNode.SubResource(v.getResourceFromPodsByNodeName(old.Name))
}
if !old.Spec.Unschedulable && new.Spec.Unschedulable || oldStatus && !newStatus {
v.providerNode.SubResource(toRemove)
v.providerNode.AddResource(v.getResourceFromPodsByNodeName(old.Name))
v.providerNode.SubResource(toRemove)

}
if !reflect.DeepEqual(old.Status.Allocatable, new.Status.Allocatable) ||
!reflect.DeepEqual(old.Status.Capacity, new.Status.Capacity) {
klog.Infof("Start to update node resource, old: %v, new %v", old.Status.Capacity,
new.Status.Capacity)
v.providerNode.SubResource(toRemove)
v.providerNode.AddResource(toAdd)
v.providerNode.SubResource(toRemove)
klog.Infof("Current node resource, resource: %v, allocatable %v", v.providerNode.Status.Capacity,
v.providerNode.Status.Allocatable)
}
if v.providerNode.Node == nil {
return
}
if !reflect.DeepEqual(nodeCopy, v.providerNode) {
v.updatedNode <- v.providerNode.Node
copy := v.providerNode.DeepCopy()
if !reflect.DeepEqual(nodeCopy, copy) {
v.updatedNode <- copy
}
}

Expand Down Expand Up @@ -334,6 +338,7 @@ func (v *VirtualK8S) updateVKCapacityFromPod(old, new *corev1.Pod) {
if v.providerNode.Node == nil {
return
}
v.updatedNode <- v.providerNode.Node
copy := v.providerNode.DeepCopy()
v.updatedNode <- copy
return
}

0 comments on commit 42d0a19

Please sign in to comment.