vendor updates

This commit is contained in:
Serguei Bezverkhi
2018-03-06 17:33:18 -05:00
parent 4b3ebc171b
commit e9033989a0
5854 changed files with 248382 additions and 119809 deletions

View File

@ -23,7 +23,7 @@ go_library(
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//pkg/volume/util/volumepathhandler:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
@ -42,8 +42,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["attach_detach_controller_test.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package volume implements a controller to manage volume attach and detach
// Package attachdetach implements a controller to manage volume attach and detach
// operations.
package attachdetach
@ -47,7 +47,7 @@ import (
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
// TimerConfig contains configuration of internal attach/detach timers and
@ -137,7 +137,7 @@ func NewAttachDetachController(
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
blkutil := volumeutil.NewBlockVolumePathHandler()
blkutil := volumepathhandler.NewBlockVolumePathHandler()
adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
@ -335,7 +335,7 @@ func (adc *attachDetachController) populateDesiredStateOfWorld() error {
}
for _, pod := range pods {
podToAdd := pod
adc.podAdd(&podToAdd)
adc.podAdd(podToAdd)
for _, podVolume := range podToAdd.Spec.Volumes {
// The volume specs present in the ActualStateOfWorld are nil, let's replace those
// with the correct ones found on pods. The present in the ASW with no corresponding
@ -361,7 +361,7 @@ func (adc *attachDetachController) populateDesiredStateOfWorld() error {
err)
continue
}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
glog.Errorf(
"Failed to find unique name for volume %q, pod %q/%q: %v",
@ -587,10 +587,10 @@ func (adc *attachDetachController) GetExec(pluginName string) mount.Exec {
}
func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {
if _, exists := node.Annotations[volumehelper.ControllerManagedAttachAnnotation]; exists {
if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists {
keepTerminatedPodVolumes := false
if t, ok := node.Annotations[volumehelper.KeepTerminatedPodVolumesAnnotation]; ok {
if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok {
keepTerminatedPodVolumes = (t == "true")
}

View File

@ -15,9 +15,9 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache",
deps = [
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
@ -30,8 +30,7 @@ go_test(
"actual_state_of_world_test.go",
"desired_state_of_world_test.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//pkg/volume/testing:go_default_library",

View File

@ -31,8 +31,8 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// ActualStateOfWorld defines a set of thread-safe operations supported on
@ -73,7 +73,7 @@ type ActualStateOfWorld interface {
// SetNodeStatusUpdateNeeded sets statusUpdateNeeded for the specified
// node to true indicating the AttachedVolume field in the Node's Status
// object needs to be updated by the node updater again.
// If the specifed node does not exist in the nodesToUpdateStatusFor list,
// If the specified node does not exist in the nodesToUpdateStatusFor list,
// log the error and return
SetNodeStatusUpdateNeeded(nodeName types.NodeName)
@ -131,8 +131,8 @@ type ActualStateOfWorld interface {
type AttachedVolume struct {
operationexecutor.AttachedVolume
// MountedByNode indicates that this volume has been been mounted by the
// node and is unsafe to detach.
// MountedByNode indicates that this volume has been mounted by the node and
// is unsafe to detach.
// The value is set and unset by SetVolumeMountedByNode(...).
MountedByNode bool
@ -275,7 +275,7 @@ func (asw *actualStateOfWorld) AddVolumeNode(
err)
}
volumeName, err = volumehelper.GetUniqueVolumeNameFromSpec(
volumeName, err = util.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
@ -467,13 +467,13 @@ func (asw *actualStateOfWorld) addVolumeToReportAsAttached(
// Update the flag statusUpdateNeeded to indicate whether node status is already updated or
// needs to be updated again by the node status updater.
// If the specifed node does not exist in the nodesToUpdateStatusFor list, log the error and return
// If the specified node does not exist in the nodesToUpdateStatusFor list, log the error and return
// This is an internal function and caller should acquire and release the lock
func (asw *actualStateOfWorld) updateNodeStatusUpdateNeeded(nodeName types.NodeName, needed bool) error {
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if !nodeToUpdateExists {
// should not happen
errMsg := fmt.Sprintf("Failed to set statusUpdateNeeded to needed %t because nodeName=%q does not exist",
errMsg := fmt.Sprintf("Failed to set statusUpdateNeeded to needed %t, because nodeName=%q does not exist",
needed, nodeName)
return fmt.Errorf(errMsg)
}
@ -488,7 +488,7 @@ func (asw *actualStateOfWorld) SetNodeStatusUpdateNeeded(nodeName types.NodeName
asw.Lock()
defer asw.Unlock()
if err := asw.updateNodeStatusUpdateNeeded(nodeName, true); err != nil {
glog.Errorf("Failed to update statusUpdateNeeded field in actual state of world: %v", err)
glog.Warningf("Failed to update statusUpdateNeeded field in actual state of world: %v", err)
}
}

View File

@ -28,9 +28,9 @@ import (
"k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// DesiredStateOfWorld defines a set of thread-safe operations supported on
@ -40,14 +40,14 @@ import (
// should be attached to the specified node, and pods are the pods that
// reference the volume and are scheduled to that node.
// Note: This is distinct from the DesiredStateOfWorld implemented by the
// kubelet volume manager. The both keep track of different objects. This
// kubelet volume manager. They both keep track of different objects. This
// contains attach/detach controller specific state.
type DesiredStateOfWorld interface {
// AddNode adds the given node to the list of nodes managed by the attach/
// detach controller.
// If the node already exists this is a no-op.
// keepTerminatedPodVolumes is a property of the node that determines
// if for terminated pods volumes should be mounted and attached.
// if volumes should be mounted and attached for terminated pods.
AddNode(nodeName k8stypes.NodeName, keepTerminatedPodVolumes bool)
// AddPod adds the given pod to the list of pods that reference the
@ -105,6 +105,10 @@ type DesiredStateOfWorld interface {
// Mark multiattach error as reported to prevent spamming multiple
// events for same error
SetMultiAttachError(v1.UniqueVolumeName, k8stypes.NodeName)
// GetPodsOnNodes returns list of pods ("namespace/name") that require
// given volume on given nodes.
GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod
}
// VolumeToAttach represents a volume that should be attached to a node.
@ -152,7 +156,7 @@ type nodeManaged struct {
// volumesToAttach is a map containing the set of volumes that should be
// attached to this node. The key in the map is the name of the volume and
// the value is a pod object containing more information about the volume.
// the value is a volumeToAttach object containing more information about the volume.
volumesToAttach map[v1.UniqueVolumeName]volumeToAttach
// keepTerminatedPodVolumes determines if for terminated pods(on this node) - volumes
@ -160,10 +164,10 @@ type nodeManaged struct {
keepTerminatedPodVolumes bool
}
// The volume object represents a volume that should be attached to a node.
// The volumeToAttach object represents a volume that should be attached to a node.
type volumeToAttach struct {
// multiAttachErrorReported indicates whether the multi-attach error has been reported for the given volume.
// It is used to to prevent reporting the error from being reported more than once for a given volume.
// It is used to prevent reporting the error from being reported more than once for a given volume.
multiAttachErrorReported bool
// volumeName contains the unique identifier for this volume.
@ -227,11 +231,12 @@ func (dsw *desiredStateOfWorld) AddPod(
err)
}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
volumeName, err := util.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q err=%v",
"failed to get UniqueVolumeName from volumeSpec for plugin=%q and volume=%q err=%v",
attachableVolumePlugin.GetPluginName(),
volumeSpec.Name(),
err)
}
@ -412,3 +417,24 @@ func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd {
}
return pods
}
func (dsw *desiredStateOfWorld) GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod {
dsw.RLock()
defer dsw.RUnlock()
pods := []*v1.Pod{}
for _, nodeName := range nodes {
node, ok := dsw.nodesManaged[nodeName]
if !ok {
continue
}
volume, ok := node.volumesToAttach[volumeName]
if !ok {
continue
}
for _, pod := range volume.scheduledPods {
pods = append(pods, pod.podObj)
}
}
return pods
}

View File

@ -1032,3 +1032,49 @@ func verifyVolumeToAttach(
t.Fatalf("volumesToAttach (%v) should contain %q/%q. It does not.", volumesToAttach, expectedVolumeName, expectedNodeName)
}
func Test_GetPodsOnNodes(t *testing.T) {
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
// 2 nodes, each with one pod with a different volume
node1Name := k8stypes.NodeName("node1-name")
pod1Name := "pod1-uid"
volume1Name := v1.UniqueVolumeName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
dsw.AddNode(node1Name, false /*keepTerminatedPodVolumes*/)
generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
pod1Name,
podAddErr)
}
node2Name := k8stypes.NodeName("node2-name")
pod2Name := "pod2-uid"
volume2Name := v1.UniqueVolumeName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
dsw.AddNode(node2Name, false /*keepTerminatedPodVolumes*/)
_, podAddErr = dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volume2Spec, node2Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
pod2Name,
podAddErr)
}
// Third node without any pod
node3Name := k8stypes.NodeName("node3-name")
dsw.AddNode(node3Name, false /*keepTerminatedPodVolumes*/)
// Act
pods := dsw.GetVolumePodsOnNodes([]k8stypes.NodeName{node1Name, node2Name, node3Name, "non-existing-node"}, generatedVolume1Name)
// Assert
if len(pods) != 1 {
t.Fatalf("Expected 1 pod, got %d", len(pods))
}
if pods[0].Name != pod1Name {
t.Errorf("Expected pod %s/%s, got %s", pod1Name, pod1Name, pods[0].Name)
}
}

View File

@ -14,7 +14,7 @@ go_library(
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
@ -41,13 +41,12 @@ filegroup(
go_test(
name = "go_default_test",
srcs = ["desired_state_of_world_populator_test.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -33,7 +33,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
volutil "k8s.io/kubernetes/pkg/volume/util"
)
// DesiredStateOfWorldPopulator periodically verifies that the pods in the
@ -133,7 +133,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
true /* default volume action */)
if volumeActionFlag {
informerPodUID := volumehelper.GetUniquePodName(informerPod)
informerPodUID := volutil.GetUniquePodName(informerPod)
// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
if informerPodUID == dswPodUID {
glog.V(10).Infof("Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
@ -142,7 +142,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
}
}
// the pod from dsw does not exist in pod informer, or it does not match the unique identifer retrieved
// the pod from dsw does not exist in pod informer, or it does not match the unique identifier retrieved
// from the informer, delete it from dsw
glog.V(1).Infof("Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID)
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
@ -158,7 +158,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() {
dswp.timeOfLastListPods = time.Now()
for _, pod := range pods {
if volumehelper.IsPodTerminated(pod, pod.Status) {
if volutil.IsPodTerminated(pod, pod.Status) {
// Do not add volumes for terminated pods
continue
}

View File

@ -28,7 +28,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
"k8s.io/kubernetes/pkg/volume/util"
)
func TestFindAndAddActivePods_FindAndRemoveDeletedPods(t *testing.T) {
@ -66,7 +66,7 @@ func TestFindAndAddActivePods_FindAndRemoveDeletedPods(t *testing.T) {
fakePodInformer.Informer().GetStore().Add(pod)
podName := volumehelper.GetUniquePodName(pod)
podName := util.GetUniquePodName(pod)
generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name

View File

@ -19,6 +19,7 @@ go_library(
"//pkg/volume/util/operationexecutor:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
@ -27,13 +28,13 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["reconciler_test.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",

View File

@ -21,10 +21,12 @@ package reconciler
import (
"fmt"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
@ -145,7 +147,7 @@ func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool {
}
}
// Only if this volume is a persistent volume, we have reliable information on wether it's allowed or not to
// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to
// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes
if volumeSpec.PersistentVolume != nil {
// Check for persistent volume types which do not fail when trying to multi-attach
@ -269,12 +271,8 @@ func (rc *reconciler) attachDesiredVolumes() {
nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
if len(nodes) > 0 {
if !volumeToAttach.MultiAttachErrorReported {
simpleMsg, detailedMsg := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
for _, pod := range volumeToAttach.ScheduledPods {
rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
}
rc.reportMultiAttachError(volumeToAttach, nodes)
rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
glog.Warningf(detailedMsg)
}
continue
}
@ -292,5 +290,78 @@ func (rc *reconciler) attachDesiredVolumes() {
glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
}
}
}
// reportMultiAttachError sends events and logs situation that a volume that
// should be attached to a node is already attached to different node(s).
func (rc *reconciler) reportMultiAttachError(volumeToAttach cache.VolumeToAttach, nodes []types.NodeName) {
// Filter out the current node from list of nodes where the volume is
// attached.
// Some methods need []string, some other needs []NodeName, collect both.
// In theory, these arrays should have always only one element - the
// controller does not allow more than one attachment. But use array just
// in case...
otherNodes := []types.NodeName{}
otherNodesStr := []string{}
for _, node := range nodes {
if node != volumeToAttach.NodeName {
otherNodes = append(otherNodes, node)
otherNodesStr = append(otherNodesStr, string(node))
}
}
// Get list of pods that use the volume on the other nodes.
pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName)
if len(pods) == 0 {
// We did not find any pods that requests the volume. The pod must have been deleted already.
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
for _, pod := range volumeToAttach.ScheduledPods {
rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
}
// Log detailed message to system admin
nodeList := strings.Join(otherNodesStr, ", ")
detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already exclusively attached to node %s and can't be attached to another", nodeList))
glog.Warningf(detailedMsg)
return
}
// There are pods that require the volume and run on another node. Typically
// it's user error, e.g. a ReplicaSet uses a PVC and has >1 replicas. Let
// the user know what pods are blocking the volume.
for _, scheduledPod := range volumeToAttach.ScheduledPods {
// Each scheduledPod must get a custom message. They can run in
// different namespaces and user of a namespace should not see names of
// pods in other namespaces.
localPodNames := []string{} // Names of pods in scheduledPods's namespace
otherPods := 0 // Count of pods in other namespaces
for _, pod := range pods {
if pod.Namespace == scheduledPod.Namespace {
localPodNames = append(localPodNames, pod.Name)
} else {
otherPods++
}
}
var msg string
if len(localPodNames) > 0 {
msg = fmt.Sprintf("Volume is already used by pod(s) %s", strings.Join(localPodNames, ", "))
if otherPods > 0 {
msg = fmt.Sprintf("%s and %d pod(s) in different namespaces", msg, otherPods)
}
} else {
// No local pods, there are pods only in different namespaces.
msg = fmt.Sprintf("Volume is already used by %d pod(s) in different namespaces", otherPods)
}
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", msg)
rc.recorder.Eventf(scheduledPod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
}
// Log all pods for system admin
podNames := []string{}
for _, pod := range pods {
podNames = append(podNames, pod.Namespace+"/"+pod.Name)
}
detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already used by pods %s on node %s", strings.Join(podNames, ", "), strings.Join(otherNodesStr, ", ")))
glog.Warningf(detailedMsg)
}

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
stringutil "k8s.io/kubernetes/pkg/util/strings"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
@ -531,6 +532,115 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
}
func Test_ReportMultiAttachError(t *testing.T) {
type nodeWithPods struct {
name k8stypes.NodeName
podNames []string
}
tests := []struct {
name string
nodes []nodeWithPods
expectedEvents []string
}{
{
"no pods use the volume",
[]nodeWithPods{
{"node1", []string{"ns1/pod1"}},
},
[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already exclusively attached to one node and can't be attached to another"},
},
{
"pods in the same namespace use the volume",
[]nodeWithPods{
{"node1", []string{"ns1/pod1"}},
{"node2", []string{"ns1/pod2"}},
},
[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod2"},
},
{
"pods in anotother namespace use the volume",
[]nodeWithPods{
{"node1", []string{"ns1/pod1"}},
{"node2", []string{"ns2/pod2"}},
},
[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by 1 pod(s) in different namespaces"},
},
{
"pods both in the same and anotother namespace use the volume",
[]nodeWithPods{
{"node1", []string{"ns1/pod1"}},
{"node2", []string{"ns2/pod2"}},
{"node3", []string{"ns1/pod3"}},
},
[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod3 and 1 pod(s) in different namespaces"},
},
}
for _, test := range tests {
// Arrange
t.Logf("Test %q starting", test.name)
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := record.NewFakeRecorder(100)
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
rc := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
nodes := []k8stypes.NodeName{}
for _, n := range test.nodes {
dsw.AddNode(n.name, false /*keepTerminatedPodVolumes*/)
nodes = append(nodes, n.name)
for _, podName := range n.podNames {
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
uid := string(n.name) + "-" + podName // unique UID
namespace, name := stringutil.SplitQualifiedName(podName)
pod := controllervolumetesting.NewPod(uid, name)
pod.Namespace = namespace
_, err := dsw.AddPod(types.UniquePodName(uid), pod, volumeSpec, n.name)
if err != nil {
t.Fatalf("Error adding pod %s to DSW: %s", podName, err)
}
}
}
// Act
volumes := dsw.GetVolumesToAttach()
for _, vol := range volumes {
if vol.NodeName == "node1" {
rc.(*reconciler).reportMultiAttachError(vol, nodes)
}
}
// Assert
close(fakeRecorder.Events)
index := 0
for event := range fakeRecorder.Events {
if len(test.expectedEvents) < index {
t.Errorf("Test %q: unexpected event received: %s", test.name, event)
} else {
expectedEvent := test.expectedEvents[index]
if expectedEvent != event {
t.Errorf("Test %q: event %d: expected %q, got %q", test.name, index, expectedEvent, event)
}
}
index++
}
for i := index; i < len(test.expectedEvents); i++ {
t.Errorf("Test %q: event %d: expected %q, got none", test.name, i, test.expectedEvents[i])
}
}
}
func waitForMultiAttachErrorOnNode(
t *testing.T,
attachedNode k8stypes.NodeName,

View File

@ -14,11 +14,11 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater",
deps = [
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
],

View File

@ -19,18 +19,15 @@ limitations under the License.
package statusupdater
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
// NodeStatusUpdater defines a set of operations for updating the
@ -100,47 +97,12 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error {
node := nodeObj.DeepCopy()
// TODO: Change to pkg/util/node.UpdateNodeStatus.
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(
"failed to Marshal oldData for node %q. %v",
nodeName,
err)
}
node.Status.VolumesAttached = attachedVolumes
newData, err := json.Marshal(node)
_, patchBytes, err := nodeutil.PatchNodeStatus(nsu.kubeClient.CoreV1(), nodeName, nodeObj, node)
if err != nil {
return fmt.Errorf(
"failed to Marshal newData for node %q. %v",
nodeName,
err)
return err
}
patchBytes, err :=
strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return fmt.Errorf(
"failed to CreateTwoWayMergePatch for node %q. %v",
nodeName,
err)
}
_, err = nsu.kubeClient.CoreV1().Nodes().PatchStatus(string(nodeName), patchBytes)
if err != nil {
return fmt.Errorf(
"failed to kubeClient.CoreV1().Nodes().Patch for node %q. %v",
nodeName,
err)
}
glog.V(4).Infof(
"Updating status for node %q succeeded. patchBytes: %q VolumesAttached: %v",
nodeName,
string(patchBytes),
node.Status.VolumesAttached)
glog.V(4).Infof("Updating status %q for node %q succeeded. VolumesAttached: %v", patchBytes, nodeName, attachedVolumes)
return nil
}

View File

@ -11,7 +11,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing",
deps = [
"//pkg/volume:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -30,7 +30,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
"k8s.io/kubernetes/pkg/volume/util"
)
const TestPluginName = "kubernetes.io/testPlugin"
@ -142,7 +142,7 @@ func CreateTestClient() *fake.Clientset {
"name": nodeName,
},
Annotations: map[string]string{
volumehelper.ControllerManagedAttachAnnotation: "true",
util.ControllerManagedAttachAnnotation: "true",
},
},
Status: v1.NodeStatus{

View File

@ -12,7 +12,7 @@ go_library(
deps = [
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -25,7 +25,7 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
"k8s.io/kubernetes/pkg/volume/util"
)
// CreateVolumeSpec creates and returns a mutatable volume.Spec object for the
@ -150,7 +150,7 @@ func DetermineVolumeAction(pod *v1.Pod, desiredStateOfWorld cache.DesiredStateOf
nodeName := types.NodeName(pod.Spec.NodeName)
keepTerminatedPodVolume := desiredStateOfWorld.GetKeepTerminatedPodVolumesForNode(nodeName)
if volumehelper.IsPodTerminated(pod, pod.Status) {
if util.IsPodTerminated(pod, pod.Status) {
// if pod is terminate we let kubelet policy dictate if volume
// should be detached or not
return keepTerminatedPodVolume
@ -216,7 +216,7 @@ func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.D
continue
}
uniquePodName := volumehelper.GetUniquePodName(pod)
uniquePodName := util.GetUniquePodName(pod)
if addVolumes {
// Add volume to desired state of world
_, err := desiredStateOfWorld.AddPod(
@ -232,7 +232,7 @@ func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.D
} else {
// Remove volume from desired state of world
uniqueVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
uniqueVolumeName, err := util.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
glog.V(10).Infof(

View File

@ -17,13 +17,13 @@ go_library(
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/expand/cache:go_default_library",
"//pkg/controller/volume/expand/util:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//pkg/util/io:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/volumepathhandler:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -53,7 +53,6 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/controller/volume/expand/cache:all-srcs",
"//pkg/controller/volume/expand/util:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -11,12 +11,13 @@ go_library(
srcs = ["volume_resize_map.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/expand/cache",
deps = [
"//pkg/controller/volume/expand/util:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
@ -39,8 +40,7 @@ filegroup(
go_test(
name = "go_default_test",
srcs = ["volume_resize_map_test.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/expand/cache",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//pkg/volume/util/types:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",

View File

@ -24,11 +24,12 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
commontypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/controller/volume/expand/util"
"k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/types"
)
@ -44,6 +45,8 @@ type VolumeResizeMap interface {
MarkAsResized(*PVCWithResizeRequest, resource.Quantity) error
// UpdatePVSize updates just pv size after cloudprovider resizing is successful
UpdatePVSize(*PVCWithResizeRequest, resource.Quantity) error
// MarkForFSResize updates pvc condition to indicate that a file system resize is pending
MarkForFSResize(*PVCWithResizeRequest) error
}
type volumeResizeMap struct {
@ -52,7 +55,7 @@ type volumeResizeMap struct {
// kube client for making API calls
kubeClient clientset.Interface
// for guarding access to pvcrs map
sync.RWMutex
sync.Mutex
}
// PVCWithResizeRequest struct defines data structure that stores state needed for
@ -103,9 +106,6 @@ func (resizeMap *volumeResizeMap) AddPVCUpdate(pvc *v1.PersistentVolumeClaim, pv
return
}
resizeMap.Lock()
defer resizeMap.Unlock()
pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage]
@ -121,6 +121,9 @@ func (resizeMap *volumeResizeMap) AddPVCUpdate(pvc *v1.PersistentVolumeClaim, pv
ExpectedSize: pvcSize,
PersistentVolume: pv,
}
resizeMap.Lock()
defer resizeMap.Unlock()
resizeMap.pvcrs[types.UniquePVCName(pvc.UID)] = pvcRequest
}
@ -141,18 +144,15 @@ func (resizeMap *volumeResizeMap) GetPVCsWithResizeRequest() []*PVCWithResizeReq
// DeletePVC removes given pvc object from list of pvcs that needs resizing.
// deleting a pvc in this map doesn't affect operations that are already inflight.
func (resizeMap *volumeResizeMap) DeletePVC(pvc *v1.PersistentVolumeClaim) {
resizeMap.Lock()
defer resizeMap.Unlock()
pvcUniqueName := types.UniquePVCName(pvc.UID)
glog.V(5).Infof("Removing PVC %v from resize map", pvcUniqueName)
resizeMap.Lock()
defer resizeMap.Unlock()
delete(resizeMap.pvcrs, pvcUniqueName)
}
// MarkAsResized marks a pvc as fully resized
func (resizeMap *volumeResizeMap) MarkAsResized(pvcr *PVCWithResizeRequest, newSize resource.Quantity) error {
resizeMap.Lock()
defer resizeMap.Unlock()
emptyCondition := []v1.PersistentVolumeClaimCondition{}
err := resizeMap.updatePVCCapacityAndConditions(pvcr, newSize, emptyCondition)
@ -163,11 +163,23 @@ func (resizeMap *volumeResizeMap) MarkAsResized(pvcr *PVCWithResizeRequest, newS
return nil
}
// MarkForFSResize marks pvc with condition that indicates a fs resize is pending
func (resizeMap *volumeResizeMap) MarkForFSResize(pvcr *PVCWithResizeRequest) error {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
}
conditions := []v1.PersistentVolumeClaimCondition{pvcCondition}
newPVC := pvcr.PVC.DeepCopy()
newPVC = util.MergeResizeConditionOnPVC(newPVC, conditions)
_, err := util.PatchPVCStatus(pvcr.PVC /*oldPVC*/, newPVC, resizeMap.kubeClient)
return err
}
// UpdatePVSize updates just pv size after cloudprovider resizing is successful
func (resizeMap *volumeResizeMap) UpdatePVSize(pvcr *PVCWithResizeRequest, newSize resource.Quantity) error {
resizeMap.Lock()
defer resizeMap.Unlock()
oldPv := pvcr.PersistentVolume
pvClone := oldPv.DeepCopy()
@ -201,16 +213,9 @@ func (resizeMap *volumeResizeMap) UpdatePVSize(pvcr *PVCWithResizeRequest, newSi
}
func (resizeMap *volumeResizeMap) updatePVCCapacityAndConditions(pvcr *PVCWithResizeRequest, newSize resource.Quantity, pvcConditions []v1.PersistentVolumeClaimCondition) error {
claimClone := pvcr.PVC.DeepCopy()
claimClone.Status.Capacity[v1.ResourceStorage] = newSize
claimClone.Status.Conditions = pvcConditions
_, updateErr := resizeMap.kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).UpdateStatus(claimClone)
if updateErr != nil {
glog.V(4).Infof("updating PersistentVolumeClaim[%s] status: failed: %v", pvcr.QualifiedName(), updateErr)
return updateErr
}
return nil
newPVC := pvcr.PVC.DeepCopy()
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
newPVC = util.MergeResizeConditionOnPVC(newPVC, pvcConditions)
_, err := util.PatchPVCStatus(pvcr.PVC /*oldPVC*/, newPVC, resizeMap.kubeClient)
return err
}

View File

@ -42,13 +42,13 @@ import (
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
const (
// How often resizing loop runs
syncLoopPeriod time.Duration = 30 * time.Second
syncLoopPeriod time.Duration = 400 * time.Millisecond
// How often pvc populator runs
populatorLoopPeriod time.Duration = 2 * time.Minute
)
@ -118,7 +118,7 @@ func NewExpandController(
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})
blkutil := util.NewBlockVolumePathHandler()
blkutil := volumepathhandler.NewBlockVolumePathHandler()
expc.opExecutor = operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
@ -162,9 +162,17 @@ func (expc *expandController) Run(stopCh <-chan struct{}) {
func (expc *expandController) deletePVC(obj interface{}) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if pvc == nil || !ok {
return
if !ok {
tombstone, ok := obj.(kcache.DeletedFinalStateUnknown)
if !ok {
runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
return
}
pvc, ok = tombstone.Obj.(*v1.PersistentVolumeClaim)
if !ok {
runtime.HandleError(fmt.Errorf("tombstone contained object that is not a pvc %#v", obj))
return
}
}
expc.resizeMap.DeletePVC(pvc)
@ -182,12 +190,21 @@ func (expc *expandController) pvcUpdate(oldObj, newObj interface{}) {
if newPVC == nil || !ok {
return
}
pv, err := getPersistentVolume(newPVC, expc.pvLister)
if err != nil {
glog.V(5).Infof("Error getting Persistent Volume for pvc %q : %v", newPVC.UID, err)
return
newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
oldSize := oldPvc.Spec.Resources.Requests[v1.ResourceStorage]
// We perform additional checks inside resizeMap.AddPVCUpdate function
// this check here exists to ensure - we do not consider every
// PVC update event for resizing, just those where the PVC size changes
if newSize.Cmp(oldSize) > 0 {
pv, err := getPersistentVolume(newPVC, expc.pvLister)
if err != nil {
glog.V(5).Infof("Error getting Persistent Volume for pvc %q : %v", newPVC.UID, err)
return
}
expc.resizeMap.AddPVCUpdate(newPVC, pv)
}
expc.resizeMap.AddPVCUpdate(newPVC, pv)
}
func getPersistentVolume(pvc *v1.PersistentVolumeClaim, pvLister corelisters.PersistentVolumeLister) (*v1.PersistentVolume, error) {

View File

@ -25,8 +25,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/controller/volume/expand/cache"
"k8s.io/kubernetes/pkg/controller/volume/expand/util"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
@ -96,6 +96,8 @@ func markPVCResizeInProgress(pvcWithResizeRequest *cache.PVCWithResizeRequest, k
LastTransitionTime: metav1.Now(),
}
conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
newPVC := pvcWithResizeRequest.PVC.DeepCopy()
newPVC = util.MergeResizeConditionOnPVC(newPVC, conditions)
return util.UpdatePVCCondition(pvcWithResizeRequest.PVC, conditions, kubeClient)
return util.PatchPVCStatus(pvcWithResizeRequest.PVC /*oldPVC*/, newPVC, kubeClient)
}

View File

@ -1,46 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"github.com/golang/glog"
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
)
// ClaimToClaimKey return namespace/name string for pvc
func ClaimToClaimKey(claim *v1.PersistentVolumeClaim) string {
return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}
// UpdatePVCCondition updates pvc with given condition status
func UpdatePVCCondition(pvc *v1.PersistentVolumeClaim,
pvcConditions []v1.PersistentVolumeClaimCondition,
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
claimClone := pvc.DeepCopy()
claimClone.Status.Conditions = pvcConditions
updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).UpdateStatus(claimClone)
if updateErr != nil {
glog.V(4).Infof("updating PersistentVolumeClaim[%s] status: failed: %v", ClaimToClaimKey(pvc), updateErr)
return nil, updateErr
}
return updatedClaim, nil
}

View File

@ -24,6 +24,7 @@ go_library(
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/events:go_default_library",
"//pkg/controller/volume/persistentvolume/metrics:go_default_library",
"//pkg/features:go_default_library",
"//pkg/util/goroutinemap:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
@ -31,6 +32,7 @@ go_library(
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/recyclerclient:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
@ -41,6 +43,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
@ -71,14 +74,14 @@ go_test(
"scheduler_binder_cache_test.go",
"scheduler_binder_test.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/recyclerclient:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
@ -95,6 +98,7 @@ go_test(
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/listers/storage/v1:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
@ -114,6 +118,7 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/controller/volume/persistentvolume/metrics:all-srcs",
"//pkg/controller/volume/persistentvolume/options:all-srcs",
],
tags = ["automanaged"],

View File

@ -198,6 +198,32 @@ func TestSync(t *testing.T) {
newClaimArray("claim1-1", "uid1-1", "1Gi", "volume1-1", v1.ClaimBound, &classWait, annBoundByController, annBindCompleted),
noevents, noerrors, testSyncClaim,
},
{
// syncClaim binds pre-bound PVC only to the volume it points to,
// even if there is smaller volume available
"1-15 - successful prebound PVC",
[]*v1.PersistentVolume{
newVolume("volume1-15_1", "10Gi", "", "", v1.VolumePending, v1.PersistentVolumeReclaimRetain, classEmpty),
newVolume("volume1-15_2", "1Gi", "", "", v1.VolumePending, v1.PersistentVolumeReclaimRetain, classEmpty),
},
[]*v1.PersistentVolume{
newVolume("volume1-15_1", "10Gi", "uid1-15", "claim1-15", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classEmpty, annBoundByController),
newVolume("volume1-15_2", "1Gi", "", "", v1.VolumePending, v1.PersistentVolumeReclaimRetain, classEmpty),
},
newClaimArray("claim1-15", "uid1-15", "1Gi", "volume1-15_1", v1.ClaimPending, nil),
withExpectedCapacity("10Gi", newClaimArray("claim1-15", "uid1-15", "1Gi", "volume1-15_1", v1.ClaimBound, nil, annBindCompleted)),
noevents, noerrors, testSyncClaim,
},
{
// syncClaim does not bind pre-bound PVC to PV with different AccessMode
"1-16 - successful prebound PVC",
// PV has ReadWriteOnce
newVolumeArray("volume1-16", "1Gi", "", "", v1.VolumePending, v1.PersistentVolumeReclaimRetain, classEmpty),
newVolumeArray("volume1-16", "1Gi", "", "", v1.VolumePending, v1.PersistentVolumeReclaimRetain, classEmpty),
claimWithAccessMode([]v1.PersistentVolumeAccessMode{v1.ReadWriteMany}, newClaimArray("claim1-16", "uid1-16", "1Gi", "volume1-16", v1.ClaimPending, nil)),
claimWithAccessMode([]v1.PersistentVolumeAccessMode{v1.ReadWriteMany}, newClaimArray("claim1-16", "uid1-16", "1Gi", "volume1-16", v1.ClaimPending, nil)),
noevents, noerrors, testSyncClaim,
},
// [Unit test set 2] User asked for a specific PV.
// Test the binding when pv.ClaimRef is already set by controller or
@ -598,7 +624,7 @@ func TestSync(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: classWait},
VolumeBindingMode: &modeWait,
},
})
}, []*v1.Pod{})
}
func TestSyncAlphaBlockVolume(t *testing.T) {
@ -750,7 +776,7 @@ func TestSyncAlphaBlockVolume(t *testing.T) {
}
defer utilfeature.DefaultFeatureGate.Set("BlockVolume=false")
runSyncTests(t, tests, []*storage.StorageClass{})
runSyncTests(t, tests, []*storage.StorageClass{}, []*v1.Pod{})
}
// Test multiple calls to syncClaim/syncVolume and periodic sync of all

View File

@ -192,7 +192,7 @@ func TestDeleteSync(t *testing.T) {
},
},
}
runSyncTests(t, tests, []*storage.StorageClass{})
runSyncTests(t, tests, []*storage.StorageClass{}, []*v1.Pod{})
}
// Test multiple calls to syncClaim/syncVolume and periodic sync of all

View File

@ -41,6 +41,7 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
@ -48,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
)
// This is a unit test framework for persistent volume controller.
@ -609,6 +611,7 @@ func newTestController(kubeClient clientset.Interface, informerFactory informers
VolumeInformer: informerFactory.Core().V1().PersistentVolumes(),
ClaimInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ClassInformer: informerFactory.Storage().V1().StorageClasses(),
PodInformer: informerFactory.Core().V1().Pods(),
EventRecorder: record.NewFakeRecorder(1000),
EnableDynamicProvisioning: enableDynamicProvisioning,
}
@ -802,6 +805,13 @@ func claimWithAnnotation(name, value string, claims []*v1.PersistentVolumeClaim)
return claims
}
// claimWithAccessMode saves given access into given claims.
// Meant to be used to compose claims specified inline in a test.
func claimWithAccessMode(modes []v1.PersistentVolumeAccessMode, claims []*v1.PersistentVolumeClaim) []*v1.PersistentVolumeClaim {
claims[0].Spec.AccessModes = modes
return claims
}
func testSyncClaim(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
return ctrl.syncClaim(test.initialClaims[0])
}
@ -932,7 +942,7 @@ func evaluateTestResults(ctrl *PersistentVolumeController, reactor *volumeReacto
// 2. Call the tested function (syncClaim/syncVolume) via
// controllerTest.testCall *once*.
// 3. Compare resulting volumes and claims with expected volumes and claims.
func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storage.StorageClass) {
func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storage.StorageClass, pods []*v1.Pod) {
for _, test := range tests {
glog.V(4).Infof("starting test %q", test.name)
@ -959,6 +969,12 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
}
ctrl.classLister = storagelisters.NewStorageClassLister(indexer)
podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, pod := range pods {
podIndexer.Add(pod)
}
ctrl.podLister = corelisters.NewPodLister(podIndexer)
// Run the tested functions
err = test.test(ctrl, reactor, test)
if err != nil {
@ -1247,7 +1263,7 @@ func (plugin *mockVolumePlugin) GetMetrics() (*vol.Metrics, error) {
// Recycler interfaces
func (plugin *mockVolumePlugin) Recycle(pvName string, spec *vol.Spec, eventRecorder vol.RecycleEventRecorder) error {
func (plugin *mockVolumePlugin) Recycle(pvName string, spec *vol.Spec, eventRecorder recyclerclient.RecycleEventRecorder) error {
if len(plugin.recycleCalls) == 0 {
return fmt.Errorf("Mock plugin error: no recycleCalls configured")
}

View File

@ -28,7 +28,6 @@ import (
"k8s.io/client-go/tools/cache"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -169,6 +168,13 @@ func findMatchingVolume(
continue
}
// check if PV's DeletionTimeStamp is set, if so, skip this volume.
if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
if volume.ObjectMeta.DeletionTimestamp != nil {
continue
}
}
nodeAffinityValid := true
if node != nil {
// Scheduler path, check that the PV NodeAffinity
@ -314,7 +320,7 @@ func (pvIndex *persistentVolumeOrderedIndex) allPossibleMatchingAccessModes(requ
keys := pvIndex.store.ListIndexFuncValues("accessmodes")
for _, key := range keys {
indexedModes := v1helper.GetAccessModesFromString(key)
if volume.AccessModesContainedInAll(indexedModes, requestedModes) {
if volumeutil.AccessModesContainedInAll(indexedModes, requestedModes) {
matchedModes = append(matchedModes, indexedModes)
}
}

View File

@ -20,8 +20,6 @@ import (
"sort"
"testing"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -29,8 +27,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
func makePVC(size string, modfn func(*v1.PersistentVolumeClaim)) *v1.PersistentVolumeClaim {
@ -307,7 +304,7 @@ func TestAllPossibleAccessModes(t *testing.T) {
t.Errorf("Expected 3 arrays of modes that match RWO, but got %v", len(possibleModes))
}
for _, m := range possibleModes {
if !volume.AccessModesContains(m, v1.ReadWriteOnce) {
if !util.AccessModesContains(m, v1.ReadWriteOnce) {
t.Errorf("AccessModes does not contain %s", v1.ReadWriteOnce)
}
}
@ -316,7 +313,7 @@ func TestAllPossibleAccessModes(t *testing.T) {
if len(possibleModes) != 1 {
t.Errorf("Expected 1 array of modes that match RWX, but got %v", len(possibleModes))
}
if !volume.AccessModesContains(possibleModes[0], v1.ReadWriteMany) {
if !util.AccessModesContains(possibleModes[0], v1.ReadWriteMany) {
t.Errorf("AccessModes does not contain %s", v1.ReadWriteOnce)
}
@ -680,9 +677,8 @@ func createTestVolumes() []*v1.PersistentVolume {
},
{
ObjectMeta: metav1.ObjectMeta{
UID: "affinity-pv",
Name: "affinity001",
Annotations: getAnnotationWithNodeAffinity("key1", "value1"),
UID: "affinity-pv",
Name: "affinity001",
},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{
@ -696,13 +692,13 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value1"),
},
},
{
ObjectMeta: metav1.ObjectMeta{
UID: "affinity-pv2",
Name: "affinity002",
Annotations: getAnnotationWithNodeAffinity("key1", "value1"),
UID: "affinity-pv2",
Name: "affinity002",
},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{
@ -716,13 +712,13 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value1"),
},
},
{
ObjectMeta: metav1.ObjectMeta{
UID: "affinity-prebound",
Name: "affinity003",
Annotations: getAnnotationWithNodeAffinity("key1", "value1"),
UID: "affinity-prebound",
Name: "affinity003",
},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{
@ -737,13 +733,13 @@ func createTestVolumes() []*v1.PersistentVolume {
},
StorageClassName: classWait,
ClaimRef: &v1.ObjectReference{Name: "claim02", Namespace: "myns"},
NodeAffinity: getVolumeNodeAffinity("key1", "value1"),
},
},
{
ObjectMeta: metav1.ObjectMeta{
UID: "affinity-pv3",
Name: "affinity003",
Annotations: getAnnotationWithNodeAffinity("key1", "value3"),
UID: "affinity-pv3",
Name: "affinity003",
},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{
@ -757,6 +753,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value3"),
},
},
}
@ -776,9 +773,9 @@ func testVolume(name, size string) *v1.PersistentVolume {
}
}
func getAnnotationWithNodeAffinity(key string, value string) map[string]string {
affinity := &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
func getVolumeNodeAffinity(key string, value string) *v1.VolumeNodeAffinity {
return &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
@ -792,14 +789,6 @@ func getAnnotationWithNodeAffinity(key string, value string) map[string]string {
},
},
}
annotations := map[string]string{}
err := helper.StorageNodeAffinityToAlphaAnnotation(annotations, affinity)
if err != nil {
glog.Fatalf("Failed to get node affinity annotation: %v", err)
}
return annotations
}
func createVolumeModeBlockTestVolume() *v1.PersistentVolume {
@ -854,18 +843,22 @@ func createTestVolOrderedIndex(pv *v1.PersistentVolume) persistentVolumeOrderedI
return volFile
}
func toggleBlockVolumeFeature(toggleFlag bool, t *testing.T) {
func toggleFeature(toggleFlag bool, featureName string, t *testing.T) {
var valueStr string
if toggleFlag {
// Enable alpha feature BlockVolume
err := utilfeature.DefaultFeatureGate.Set("BlockVolume=true")
// Enable feature
valueStr = featureName + "=true"
err := utilfeature.DefaultFeatureGate.Set(valueStr)
if err != nil {
t.Errorf("Failed to enable feature gate for BlockVolume: %v", err)
t.Errorf("Failed to enable feature gate for %s: %v", featureName, err)
return
}
} else {
err := utilfeature.DefaultFeatureGate.Set("BlockVolume=false")
// Disable feature
valueStr = featureName + "=false"
err := utilfeature.DefaultFeatureGate.Set(valueStr)
if err != nil {
t.Errorf("Failed to disable feature gate for BlockVolume: %v", err)
t.Errorf("Failed to disable feature gate for %s: %v", featureName, err)
return
}
}
@ -935,7 +928,7 @@ func TestAlphaVolumeModeCheck(t *testing.T) {
}
for name, scenario := range scenarios {
toggleBlockVolumeFeature(scenario.enableBlock, t)
toggleFeature(scenario.enableBlock, "BlockVolume", t)
expectedMisMatch, err := checkVolumeModeMisMatches(&scenario.pvc.Spec, &scenario.vol.Spec)
if err != nil {
t.Errorf("Unexpected failure for checkVolumeModeMisMatches: %v", err)
@ -950,7 +943,7 @@ func TestAlphaVolumeModeCheck(t *testing.T) {
}
// make sure feature gate is turned off
toggleBlockVolumeFeature(false, t)
toggleFeature(false, "BlockVolume", t)
}
func TestAlphaFilteringVolumeModes(t *testing.T) {
@ -1028,7 +1021,7 @@ func TestAlphaFilteringVolumeModes(t *testing.T) {
}
for name, scenario := range scenarios {
toggleBlockVolumeFeature(scenario.enableBlock, t)
toggleFeature(scenario.enableBlock, "BlockVolume", t)
pvmatch, err := scenario.vol.findBestMatchForClaim(scenario.pvc, false)
// expected to match but either got an error or no returned pvmatch
if pvmatch == nil && scenario.isExpectedMatch {
@ -1047,7 +1040,135 @@ func TestAlphaFilteringVolumeModes(t *testing.T) {
}
// make sure feature gate is turned off
toggleBlockVolumeFeature(false, t)
toggleFeature(false, "BlockVolume", t)
}
func TestAlphaStorageObjectInUseProtectionFiltering(t *testing.T) {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "pv1",
Annotations: map[string]string{},
},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("1G")},
PersistentVolumeSource: v1.PersistentVolumeSource{HostPath: &v1.HostPathVolumeSource{}},
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
},
}
pvToDelete := pv.DeepCopy()
now := metav1.Now()
pvToDelete.ObjectMeta.DeletionTimestamp = &now
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc1",
Namespace: "myns",
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Resources: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("1G")}},
},
}
satisfyingTestCases := map[string]struct {
isExpectedMatch bool
vol *v1.PersistentVolume
pvc *v1.PersistentVolumeClaim
enableStorageObjectInUseProtection bool
}{
"feature enabled - pv deletionTimeStamp not set": {
isExpectedMatch: true,
vol: pv,
pvc: pvc,
enableStorageObjectInUseProtection: true,
},
"feature enabled - pv deletionTimeStamp set": {
isExpectedMatch: false,
vol: pvToDelete,
pvc: pvc,
enableStorageObjectInUseProtection: true,
},
"feature disabled - pv deletionTimeStamp not set": {
isExpectedMatch: true,
vol: pv,
pvc: pvc,
enableStorageObjectInUseProtection: false,
},
"feature disabled - pv deletionTimeStamp set": {
isExpectedMatch: true,
vol: pvToDelete,
pvc: pvc,
enableStorageObjectInUseProtection: false,
},
}
for name, testCase := range satisfyingTestCases {
toggleFeature(testCase.enableStorageObjectInUseProtection, "StorageObjectInUseProtection", t)
err := checkVolumeSatisfyClaim(testCase.vol, testCase.pvc)
// expected to match but got an error
if err != nil && testCase.isExpectedMatch {
t.Errorf("%s: expected to match but got an error: %v", name, err)
}
// not expected to match but did
if err == nil && !testCase.isExpectedMatch {
t.Errorf("%s: not expected to match but did", name)
}
}
filteringTestCases := map[string]struct {
isExpectedMatch bool
vol persistentVolumeOrderedIndex
pvc *v1.PersistentVolumeClaim
enableStorageObjectInUseProtection bool
}{
"feature enabled - pv deletionTimeStamp not set": {
isExpectedMatch: true,
vol: createTestVolOrderedIndex(pv),
pvc: pvc,
enableStorageObjectInUseProtection: true,
},
"feature enabled - pv deletionTimeStamp set": {
isExpectedMatch: false,
vol: createTestVolOrderedIndex(pvToDelete),
pvc: pvc,
enableStorageObjectInUseProtection: true,
},
"feature disabled - pv deletionTimeStamp not set": {
isExpectedMatch: true,
vol: createTestVolOrderedIndex(pv),
pvc: pvc,
enableStorageObjectInUseProtection: false,
},
"feature disabled - pv deletionTimeStamp set": {
isExpectedMatch: true,
vol: createTestVolOrderedIndex(pvToDelete),
pvc: pvc,
enableStorageObjectInUseProtection: false,
},
}
for name, testCase := range filteringTestCases {
toggleFeature(testCase.enableStorageObjectInUseProtection, "StorageObjectInUseProtection", t)
pvmatch, err := testCase.vol.findBestMatchForClaim(testCase.pvc, false)
// expected to match but either got an error or no returned pvmatch
if pvmatch == nil && testCase.isExpectedMatch {
t.Errorf("Unexpected failure for testcase, no matching volume: %s", name)
}
if err != nil && testCase.isExpectedMatch {
t.Errorf("Unexpected failure for testcase: %s - %+v", name, err)
}
// expected to not match but either got an error or a returned pvmatch
if pvmatch != nil && !testCase.isExpectedMatch {
t.Errorf("Unexpected failure for testcase, expected no matching volume: %s", name)
}
if err != nil && !testCase.isExpectedMatch {
t.Errorf("Unexpected failure for testcase: %s - %+v", name, err)
}
}
// make sure feature gate is turned off
toggleFeature(false, "StorageObjectInUseProtection", t)
}
func TestFindingPreboundVolumes(t *testing.T) {

View File

@ -1,14 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["util.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/expand/util",
visibility = ["//visibility:public"],
srcs = ["metrics.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics",
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
@ -23,5 +27,4 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,184 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"k8s.io/api/core/v1"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
const (
// Subsystem names.
pvControllerSubsystem = "pv_collector"
// Metric names.
boundPVKey = "bound_pv_count"
unboundPVKey = "unbound_pv_count"
boundPVCKey = "bound_pvc_count"
unboundPVCKey = "unbound_pvc_count"
// Label names.
namespaceLabel = "namespace"
storageClassLabel = "storage_class"
)
var registerMetrics sync.Once
// PVLister used to list persistent volumes.
type PVLister interface {
List() []interface{}
}
// PVCLister used to list persistent volume claims.
type PVCLister interface {
List() []interface{}
}
// Register all metrics for pv controller.
func Register(pvLister PVLister, pvcLister PVCLister) {
registerMetrics.Do(func() {
prometheus.MustRegister(newPVAndPVCCountCollector(pvLister, pvcLister))
})
}
func newPVAndPVCCountCollector(pvLister PVLister, pvcLister PVCLister) *pvAndPVCCountCollector {
return &pvAndPVCCountCollector{pvLister, pvcLister}
}
// Custom collector for current pod and container counts.
type pvAndPVCCountCollector struct {
// Cache for accessing information about PersistentVolumes.
pvLister PVLister
// Cache for accessing information about PersistentVolumeClaims.
pvcLister PVCLister
}
var (
boundPVCountDesc = prometheus.NewDesc(
prometheus.BuildFQName("", pvControllerSubsystem, boundPVKey),
"Gauge measuring number of persistent volume currently bound",
[]string{storageClassLabel}, nil)
unboundPVCountDesc = prometheus.NewDesc(
prometheus.BuildFQName("", pvControllerSubsystem, unboundPVKey),
"Gauge measuring number of persistent volume currently unbound",
[]string{storageClassLabel}, nil)
boundPVCCountDesc = prometheus.NewDesc(
prometheus.BuildFQName("", pvControllerSubsystem, boundPVCKey),
"Gauge measuring number of persistent volume claim currently bound",
[]string{namespaceLabel}, nil)
unboundPVCCountDesc = prometheus.NewDesc(
prometheus.BuildFQName("", pvControllerSubsystem, unboundPVCKey),
"Gauge measuring number of persistent volume claim currently unbound",
[]string{namespaceLabel}, nil)
)
func (collector *pvAndPVCCountCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- boundPVCountDesc
ch <- unboundPVCountDesc
ch <- boundPVCCountDesc
ch <- unboundPVCCountDesc
}
func (collector *pvAndPVCCountCollector) Collect(ch chan<- prometheus.Metric) {
collector.pvCollect(ch)
collector.pvcCollect(ch)
}
func (collector *pvAndPVCCountCollector) pvCollect(ch chan<- prometheus.Metric) {
boundNumberByStorageClass := make(map[string]int)
unboundNumberByStorageClass := make(map[string]int)
for _, pvObj := range collector.pvLister.List() {
pv, ok := pvObj.(*v1.PersistentVolume)
if !ok {
continue
}
if pv.Status.Phase == v1.VolumeBound {
boundNumberByStorageClass[pv.Spec.StorageClassName]++
} else {
unboundNumberByStorageClass[pv.Spec.StorageClassName]++
}
}
for storageClassName, number := range boundNumberByStorageClass {
metric, err := prometheus.NewConstMetric(
boundPVCountDesc,
prometheus.GaugeValue,
float64(number),
storageClassName)
if err != nil {
glog.Warningf("Create bound pv number metric failed: %v", err)
continue
}
ch <- metric
}
for storageClassName, number := range unboundNumberByStorageClass {
metric, err := prometheus.NewConstMetric(
unboundPVCountDesc,
prometheus.GaugeValue,
float64(number),
storageClassName)
if err != nil {
glog.Warningf("Create unbound pv number metric failed: %v", err)
continue
}
ch <- metric
}
}
func (collector *pvAndPVCCountCollector) pvcCollect(ch chan<- prometheus.Metric) {
boundNumberByNamespace := make(map[string]int)
unboundNumberByNamespace := make(map[string]int)
for _, pvcObj := range collector.pvcLister.List() {
pvc, ok := pvcObj.(*v1.PersistentVolumeClaim)
if !ok {
continue
}
if pvc.Status.Phase == v1.ClaimBound {
boundNumberByNamespace[pvc.Namespace]++
} else {
unboundNumberByNamespace[pvc.Namespace]++
}
}
for namespace, number := range boundNumberByNamespace {
metric, err := prometheus.NewConstMetric(
boundPVCCountDesc,
prometheus.GaugeValue,
float64(number),
namespace)
if err != nil {
glog.Warningf("Create bound pvc number metric failed: %v", err)
continue
}
ch <- metric
}
for namespace, number := range unboundNumberByNamespace {
metric, err := prometheus.NewConstMetric(
unboundPVCCountDesc,
prometheus.GaugeValue,
float64(number),
namespace)
if err != nil {
glog.Warningf("Create unbound pvc number metric failed: %v", err)
continue
}
ch <- metric
}
}

View File

@ -172,7 +172,7 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-6", "uid11-6", "1Gi", "volume11-6", v1.ClaimBound, &classGold, annBoundByController, annBindCompleted),
noevents, noerrors,
// No provisioning plugin confingure - makes the test fail when
// the controller errorneously tries to provision something
// the controller erroneously tries to provision something
wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim),
},
{
@ -416,7 +416,7 @@ func TestProvisionSync(t *testing.T) {
noerrors, wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim),
},
}
runSyncTests(t, tests, storageClasses)
runSyncTests(t, tests, storageClasses, []*v1.Pod{})
}
// Test multiple calls to syncClaim/syncVolume and periodic sync of all

View File

@ -26,6 +26,8 @@ import (
storage "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@ -43,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
"github.com/golang/glog"
)
@ -161,6 +164,8 @@ type PersistentVolumeController struct {
claimListerSynced cache.InformerSynced
classLister storagelisters.StorageClassLister
classListerSynced cache.InformerSynced
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
kubeClient clientset.Interface
eventRecorder record.EventRecorder
@ -233,24 +238,35 @@ func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClai
func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedSize := requestedQty.Value()
isMisMatch, err := checkVolumeModeMisMatches(&claim.Spec, &volume.Spec)
if err != nil {
return fmt.Errorf("error checking if volumeMode was a mismatch: %v", err)
// check if PV's DeletionTimeStamp is set, if so, return error.
if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
if volume.ObjectMeta.DeletionTimestamp != nil {
return fmt.Errorf("the volume is marked for deletion")
}
}
volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
volumeSize := volumeQty.Value()
if volumeSize < requestedSize {
return fmt.Errorf("Storage capacity of volume[%s] requested by claim[%v] is not enough", volume.Name, claimToClaimKey(claim))
return fmt.Errorf("requested PV is too small")
}
requestedClass := v1helper.GetPersistentVolumeClaimClass(claim)
if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
return fmt.Errorf("Class of volume[%s] is not the same as claim[%v]", volume.Name, claimToClaimKey(claim))
return fmt.Errorf("storageClasseName does not match")
}
isMisMatch, err := checkVolumeModeMisMatches(&claim.Spec, &volume.Spec)
if err != nil {
return fmt.Errorf("error checking volumeMode: %v", err)
}
if isMisMatch {
return fmt.Errorf("VolumeMode[%v] of volume[%s] is incompatible with VolumeMode[%v] of claim[%v]", volume.Spec.VolumeMode, volume.Name, claim.Spec.VolumeMode, claim.Name)
return fmt.Errorf("incompatible volumeMode")
}
if !checkAccessModes(claim, volume) {
return fmt.Errorf("incompatible accessMode")
}
return nil
@ -362,8 +378,9 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
glog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
//send a event
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, "Volume's size is smaller than requested or volume's class does not match with claim")
//send an event
msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
//volume does not satisfy the requirements of the claim
if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
return err
@ -802,7 +819,7 @@ func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *v1.PersistentV
// API server. The claim is not modified in this method!
func (ctrl *PersistentVolumeController) updateBindVolumeToClaim(volumeClone *v1.PersistentVolume, claim *v1.PersistentVolumeClaim, updateCache bool) (*v1.PersistentVolume, error) {
glog.V(2).Infof("claim %q bound to volume %q", claimToClaimKey(claim), volumeClone.Name)
newVol, err := ctrl.kubeClient.Core().PersistentVolumes().Update(volumeClone)
newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(volumeClone)
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", volumeClone.Name, claimToClaimKey(claim), err)
return newVol, err
@ -961,7 +978,7 @@ func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim
func (ctrl *PersistentVolumeController) unbindVolume(volume *v1.PersistentVolume) error {
glog.V(4).Infof("updating PersistentVolume[%s]: rolling back binding from %q", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
// Save the PV only when any modification is neccessary.
// Save the PV only when any modification is necessary.
volumeClone := volume.DeepCopy()
if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
@ -1041,7 +1058,7 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{})
// so read current volume state now.
newVolume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
if err != nil {
glog.V(3).Infof("error reading peristent volume %q: %v", volume.Name, err)
glog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)
return
}
needsReclaim, err := ctrl.isVolumeReleased(newVolume)
@ -1053,6 +1070,17 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{})
glog.V(3).Infof("volume %q no longer needs recycling, skipping", volume.Name)
return
}
pods, used, err := ctrl.isVolumeUsed(newVolume)
if err != nil {
glog.V(3).Infof("can't recycle volume %q: %v", volume.Name, err)
return
}
if used {
msg := fmt.Sprintf("Volume is used by pods: %s", strings.Join(pods, ","))
glog.V(3).Infof("can't recycle volume %q: %s", volume.Name, msg)
ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeFailedRecycle, msg)
return
}
// Use the newest volume copy, this will save us from version conflicts on
// saving.
@ -1120,7 +1148,7 @@ func (ctrl *PersistentVolumeController) deleteVolumeOperation(arg interface{}) e
// read current volume state now.
newVolume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
if err != nil {
glog.V(3).Infof("error reading peristent volume %q: %v", volume.Name, err)
glog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)
return nil
}
needsReclaim, err := ctrl.isVolumeReleased(newVolume)
@ -1221,6 +1249,32 @@ func (ctrl *PersistentVolumeController) isVolumeReleased(volume *v1.PersistentVo
return true, nil
}
// isVolumeUsed returns list of pods that use given PV.
func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) {
if pv.Spec.ClaimRef == nil {
return nil, false, nil
}
claimName := pv.Spec.ClaimRef.Name
podNames := sets.NewString()
pods, err := ctrl.podLister.Pods(pv.Spec.ClaimRef.Namespace).List(labels.Everything())
if err != nil {
return nil, false, fmt.Errorf("error listing pods: %s", err)
}
for _, pod := range pods {
if util.IsPodTerminated(pod, pod.Status) {
continue
}
for i := range pod.Spec.Volumes {
usedPV := &pod.Spec.Volumes[i]
if usedPV.PersistentVolumeClaim != nil && usedPV.PersistentVolumeClaim.ClaimName == claimName {
podNames.Insert(pod.Namespace + "/" + pod.Name)
}
}
}
return podNames.List(), podNames.Len() != 0, nil
}
// doDeleteVolume finds appropriate delete plugin and deletes given volume. It
// returns 'true', when the volume was deleted and 'false' when the volume
// cannot be deleted because of the deleter is external. No error should be
@ -1250,7 +1304,7 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_delete")
err = deleter.Delete()
opComplete(err)
opComplete(&err)
if err != nil {
// Deleter failed
return false, err
@ -1373,7 +1427,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
volume, err = provisioner.Provision()
opComplete(err)
opComplete(&err)
if err != nil {
strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
@ -1496,7 +1550,7 @@ func (ctrl *PersistentVolumeController) scheduleOperation(operationName string,
// newRecyclerEventRecorder returns a RecycleEventRecorder that sends all events
// to given volume.
func (ctrl *PersistentVolumeController) newRecyclerEventRecorder(volume *v1.PersistentVolume) vol.RecycleEventRecorder {
func (ctrl *PersistentVolumeController) newRecyclerEventRecorder(volume *v1.PersistentVolume) recyclerclient.RecycleEventRecorder {
return func(eventtype, message string) {
ctrl.eventRecorder.Eventf(volume, eventtype, events.RecyclerPod, "Recycler pod: %s", message)
}

View File

@ -40,6 +40,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
"k8s.io/kubernetes/pkg/util/goroutinemap"
vol "k8s.io/kubernetes/pkg/volume"
@ -61,6 +62,7 @@ type ControllerParameters struct {
VolumeInformer coreinformers.PersistentVolumeInformer
ClaimInformer coreinformers.PersistentVolumeClaimInformer
ClassInformer storageinformers.StorageClassInformer
PodInformer coreinformers.PodInformer
EventRecorder record.EventRecorder
EnableDynamicProvisioning bool
}
@ -118,6 +120,8 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
controller.classLister = p.ClassInformer.Lister()
controller.classListerSynced = p.ClassInformer.Informer().HasSynced
controller.podLister = p.PodInformer.Lister()
controller.podListerSynced = p.PodInformer.Informer().HasSynced
return controller, nil
}
@ -262,9 +266,9 @@ func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
defer ctrl.volumeQueue.ShutDown()
glog.Infof("Starting persistent volume controller")
defer glog.Infof("Shutting down peristent volume controller")
defer glog.Infof("Shutting down persistent volume controller")
if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced) {
if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced) {
return
}
@ -274,6 +278,8 @@ func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
go wait.Until(ctrl.claimWorker, time.Second, stopCh)
metrics.Register(ctrl.volumes.store, ctrl.claims)
<-stopCh
}

View File

@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Test single call to syncVolume, expecting recycling to happen.
@ -29,6 +30,44 @@ import (
// 2. Call the syncVolume *once*.
// 3. Compare resulting volumes with expected volumes.
func TestRecycleSync(t *testing.T) {
runningPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "runningPod",
Namespace: testNamespace,
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "vol1",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "runningClaim",
},
},
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}
pendingPod := runningPod.DeepCopy()
pendingPod.Name = "pendingPod"
pendingPod.Status.Phase = v1.PodPending
pendingPod.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = "pendingClaim"
completedPod := runningPod.DeepCopy()
completedPod.Name = "completedPod"
completedPod.Status.Phase = v1.PodSucceeded
completedPod.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = "completedClaim"
pods := []*v1.Pod{
runningPod,
pendingPod,
completedPod,
}
tests := []controllerTest{
{
// recycle volume bound by controller
@ -160,8 +199,38 @@ func TestRecycleSync(t *testing.T) {
noclaims,
[]string{"Warning VolumeUnknownReclaimPolicy"}, noerrors, testSyncVolume,
},
{
// volume is used by a running pod - failure expected
"6-11 - used by running pod",
newVolumeArray("volume6-11", "1Gi", "uid6-11", "runningClaim", v1.VolumeBound, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
newVolumeArray("volume6-11", "1Gi", "uid6-11", "runningClaim", v1.VolumeReleased, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
noclaims,
noclaims,
[]string{"Normal VolumeFailedRecycle"}, noerrors, testSyncVolume,
},
{
// volume is used by a pending pod - failure expected
"6-12 - used by pending pod",
newVolumeArray("volume6-12", "1Gi", "uid6-12", "pendingClaim", v1.VolumeBound, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
newVolumeArray("volume6-12", "1Gi", "uid6-12", "pendingClaim", v1.VolumeReleased, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
noclaims,
noclaims,
[]string{"Normal VolumeFailedRecycle"}, noerrors, testSyncVolume,
},
{
// volume is used by a completed pod - recycle succeeds
"6-13 - used by completed pod",
newVolumeArray("volume6-13", "1Gi", "uid6-13", "completedClaim", v1.VolumeBound, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
newVolumeArray("volume6-13", "1Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRecycle, classEmpty),
noclaims,
noclaims,
noevents, noerrors,
// Inject recycler into the controller and call syncVolume. The
// recycler simulates one recycle() call that succeeds.
wrapTestWithReclaimCalls(operationRecycle, []error{nil}, testSyncVolume),
},
}
runSyncTests(t, tests, []*storage.StorageClass{})
runSyncTests(t, tests, []*storage.StorageClass{}, pods)
}
// Test multiple calls to syncClaim/syncVolume and periodic sync of all

View File

@ -43,7 +43,7 @@ type AssumeCache interface {
Get(objName string) (interface{}, error)
// List all the objects in the cache
List() []interface{}
List(indexObj interface{}) []interface{}
}
type errWrongType struct {
@ -89,7 +89,11 @@ type assumeCache struct {
description string
// Stores objInfo pointers
store cache.Store
store cache.Indexer
// Index function for object
indexFunc cache.IndexFunc
indexName string
}
type objInfo struct {
@ -111,9 +115,21 @@ func objInfoKeyFunc(obj interface{}) (string, error) {
return objInfo.name, nil
}
func NewAssumeCache(informer cache.SharedIndexInformer, description string) *assumeCache {
// TODO: index by storageclass
c := &assumeCache{store: cache.NewStore(objInfoKeyFunc), description: description}
func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
return []string{""}, &errWrongType{"objInfo", obj}
}
return c.indexFunc(objInfo.latestObj)
}
func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) *assumeCache {
c := &assumeCache{
description: description,
indexFunc: indexFunc,
indexName: indexName,
}
c.store = cache.NewIndexer(objInfoKeyFunc, cache.Indexers{indexName: c.objInfoIndexFunc})
// Unit tests don't use informers
if informer != nil {
@ -211,12 +227,18 @@ func (c *assumeCache) Get(objName string) (interface{}, error) {
return objInfo.latestObj, nil
}
func (c *assumeCache) List() []interface{} {
func (c *assumeCache) List(indexObj interface{}) []interface{} {
c.mutex.Lock()
defer c.mutex.Unlock()
allObjs := []interface{}{}
for _, obj := range c.store.List() {
objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
if err != nil {
glog.Errorf("list index error: %v", err)
return nil
}
for _, obj := range objs {
objInfo, ok := obj.(*objInfo)
if !ok {
glog.Errorf("list error: %v", &errWrongType{"objInfo", obj})
@ -280,15 +302,22 @@ type PVAssumeCache interface {
AssumeCache
GetPV(pvName string) (*v1.PersistentVolume, error)
ListPVs() []*v1.PersistentVolume
ListPVs(storageClassName string) []*v1.PersistentVolume
}
type pvAssumeCache struct {
*assumeCache
}
func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
if pv, ok := obj.(*v1.PersistentVolume); ok {
return []string{pv.Spec.StorageClassName}, nil
}
return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
}
func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache {
return &pvAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolume")}
return &pvAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)}
}
func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
@ -304,8 +333,12 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
return pv, nil
}
func (c *pvAssumeCache) ListPVs() []*v1.PersistentVolume {
objs := c.List()
func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
objs := c.List(&v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
StorageClassName: storageClassName,
},
})
pvs := []*v1.PersistentVolume{}
for _, obj := range objs {
pv, ok := obj.(*v1.PersistentVolume)

View File

@ -24,8 +24,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func makePV(name, version string) *v1.PersistentVolume {
return &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: version}}
func makePV(name, version, storageClass string) *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: name,
ResourceVersion: version,
},
Spec: v1.PersistentVolumeSpec{
StorageClassName: storageClass,
},
}
}
func TestAssumePV(t *testing.T) {
@ -35,33 +43,38 @@ func TestAssumePV(t *testing.T) {
shouldSucceed bool
}{
"success-same-version": {
oldPV: makePV("pv1", "5"),
newPV: makePV("pv1", "5"),
oldPV: makePV("pv1", "5", ""),
newPV: makePV("pv1", "5", ""),
shouldSucceed: true,
},
"success-storageclass-same-version": {
oldPV: makePV("pv1", "5", "class1"),
newPV: makePV("pv1", "5", "class1"),
shouldSucceed: true,
},
"success-new-higher-version": {
oldPV: makePV("pv1", "5"),
newPV: makePV("pv1", "6"),
oldPV: makePV("pv1", "5", ""),
newPV: makePV("pv1", "6", ""),
shouldSucceed: true,
},
"fail-old-not-found": {
oldPV: makePV("pv2", "5"),
newPV: makePV("pv1", "5"),
oldPV: makePV("pv2", "5", ""),
newPV: makePV("pv1", "5", ""),
shouldSucceed: false,
},
"fail-new-lower-version": {
oldPV: makePV("pv1", "5"),
newPV: makePV("pv1", "4"),
oldPV: makePV("pv1", "5", ""),
newPV: makePV("pv1", "4", ""),
shouldSucceed: false,
},
"fail-new-bad-version": {
oldPV: makePV("pv1", "5"),
newPV: makePV("pv1", "a"),
oldPV: makePV("pv1", "5", ""),
newPV: makePV("pv1", "a", ""),
shouldSucceed: false,
},
"fail-old-bad-version": {
oldPV: makePV("pv1", "a"),
newPV: makePV("pv1", "5"),
oldPV: makePV("pv1", "a", ""),
newPV: makePV("pv1", "5", ""),
shouldSucceed: false,
},
}
@ -107,8 +120,8 @@ func TestRestorePV(t *testing.T) {
t.Fatalf("Failed to get internal cache")
}
oldPV := makePV("pv1", "5")
newPV := makePV("pv1", "5")
oldPV := makePV("pv1", "5", "")
newPV := makePV("pv1", "5", "")
// Restore PV that doesn't exist
cache.Restore("nothing")
@ -159,21 +172,21 @@ func TestBasicPVCache(t *testing.T) {
// Add a bunch of PVs
pvs := map[string]*v1.PersistentVolume{}
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "1")
pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "")
pvs[pv.Name] = pv
internal_cache.add(pv)
}
// List them
verifyListPVs(t, cache, pvs)
verifyListPVs(t, cache, pvs, "")
// Update a PV
updatedPV := makePV("test-pv3", "2")
updatedPV := makePV("test-pv3", "2", "")
pvs[updatedPV.Name] = updatedPV
internal_cache.update(nil, updatedPV)
// List them
verifyListPVs(t, cache, pvs)
verifyListPVs(t, cache, pvs, "")
// Delete a PV
deletedPV := pvs["test-pv7"]
@ -181,11 +194,57 @@ func TestBasicPVCache(t *testing.T) {
internal_cache.delete(deletedPV)
// List them
verifyListPVs(t, cache, pvs)
verifyListPVs(t, cache, pvs, "")
}
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume) {
pvList := cache.ListPVs()
func TestPVCacheWithStorageClasses(t *testing.T) {
cache := NewPVAssumeCache(nil)
internal_cache, ok := cache.(*pvAssumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Add a bunch of PVs
pvs1 := map[string]*v1.PersistentVolume{}
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "class1")
pvs1[pv.Name] = pv
internal_cache.add(pv)
}
// Add a bunch of PVs
pvs2 := map[string]*v1.PersistentVolume{}
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test2-pv%v", i), "1", "class2")
pvs2[pv.Name] = pv
internal_cache.add(pv)
}
// List them
verifyListPVs(t, cache, pvs1, "class1")
verifyListPVs(t, cache, pvs2, "class2")
// Update a PV
updatedPV := makePV("test-pv3", "2", "class1")
pvs1[updatedPV.Name] = updatedPV
internal_cache.update(nil, updatedPV)
// List them
verifyListPVs(t, cache, pvs1, "class1")
verifyListPVs(t, cache, pvs2, "class2")
// Delete a PV
deletedPV := pvs1["test-pv7"]
delete(pvs1, deletedPV.Name)
internal_cache.delete(deletedPV)
// List them
verifyListPVs(t, cache, pvs1, "class1")
verifyListPVs(t, cache, pvs2, "class2")
}
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
pvList := cache.ListPVs(storageClassName)
if len(pvList) != len(expectedPVs) {
t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
}

View File

@ -350,10 +350,17 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI
// Sort all the claims by increasing size request to get the smallest fits
sort.Sort(byPVCSize(claimsToBind))
allPVs := b.pvCache.ListPVs()
chosenPVs := map[string]*v1.PersistentVolume{}
for _, bindingInfo := range claimsToBind {
// Get storage class name from each PVC
storageClassName := ""
storageClass := bindingInfo.pvc.Spec.StorageClassName
if storageClass != nil {
storageClassName = *storageClass
}
allPVs := b.pvCache.ListPVs(storageClassName)
// Find a matching PV
bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
if err != nil {

View File

@ -86,7 +86,7 @@ func TestDeleteBindings(t *testing.T) {
// Get nil bindings
bindings := cache.GetBindings(pod, "node1")
if bindings != nil {
t.Errorf("Test failed: expected inital nil bindings, got %+v", bindings)
t.Errorf("Test failed: expected initial nil bindings, got %+v", bindings)
}
// Delete nothing

View File

@ -331,7 +331,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
},
}
if node != "" {
pv.Annotations = getAnnotationWithNodeAffinity("key1", node)
pv.Spec.NodeAffinity = getVolumeNodeAffinity("key1", node)
}
if boundToPVC != nil {

View File

@ -8,8 +8,8 @@ go_library(
deps = [
"//pkg/controller:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/slice:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@ -27,8 +27,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["pvc_protection_controller_test.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvcprotection",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
"//pkg/volume/util:go_default_library",

View File

@ -33,8 +33,8 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/slice"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// Controller is controller that removes PVCProtectionFinalizer
@ -153,7 +153,7 @@ func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
return err
}
if volumeutil.IsPVCBeingDeleted(pvc) && volumeutil.IsProtectionFinalizerPresent(pvc) {
if isDeletionCandidate(pvc) {
// PVC should be deleted. Check if it's used and remove finalizer if
// it's not.
isUsed, err := c.isBeingUsed(pvc)
@ -165,7 +165,7 @@ func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
}
}
if !volumeutil.IsPVCBeingDeleted(pvc) && !volumeutil.IsProtectionFinalizerPresent(pvc) {
if needToAddFinalizer(pvc) {
// PVC is not being deleted -> it should have the finalizer. The
// finalizer should be added by admission plugin, this is just to add
// the finalizer to old PVCs that were created before the admission
@ -177,10 +177,10 @@ func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
func (c *Controller) addFinalizer(pvc *v1.PersistentVolumeClaim) error {
claimClone := pvc.DeepCopy()
volumeutil.AddProtectionFinalizer(claimClone)
claimClone.ObjectMeta.Finalizers = append(claimClone.ObjectMeta.Finalizers, volumeutil.PVCProtectionFinalizer)
_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(claimClone)
if err != nil {
glog.V(3).Infof("Error adding protection finalizer to PVC %s/%s: %v", pvc.Namespace, pvc.Name)
glog.V(3).Infof("Error adding protection finalizer to PVC %s/%s: %v", pvc.Namespace, pvc.Name, err)
return err
}
glog.V(3).Infof("Added protection finalizer to PVC %s/%s", pvc.Namespace, pvc.Name)
@ -189,7 +189,7 @@ func (c *Controller) addFinalizer(pvc *v1.PersistentVolumeClaim) error {
func (c *Controller) removeFinalizer(pvc *v1.PersistentVolumeClaim) error {
claimClone := pvc.DeepCopy()
volumeutil.RemoveProtectionFinalizer(claimClone)
claimClone.ObjectMeta.Finalizers = slice.RemoveString(claimClone.ObjectMeta.Finalizers, volumeutil.PVCProtectionFinalizer, nil)
_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(claimClone)
if err != nil {
glog.V(3).Infof("Error removing protection finalizer from PVC %s/%s: %v", pvc.Namespace, pvc.Name, err)
@ -213,7 +213,7 @@ func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
glog.V(4).Infof("Skipping unscheduled pod %s when checking PVC %s/%s", pod.Name, pvc.Namespace, pvc.Name)
continue
}
if volumehelper.IsPodTerminated(pod, pod.Status) {
if volumeutil.IsPodTerminated(pod, pod.Status) {
// This pod is being unmounted/detached or is already
// unmounted/detached. It does not block the PVC from deletion.
continue
@ -247,7 +247,7 @@ func (c *Controller) pvcAddedUpdated(obj interface{}) {
}
glog.V(4).Infof("Got event on PVC %s", key)
if (!volumeutil.IsPVCBeingDeleted(pvc) && !volumeutil.IsProtectionFinalizerPresent(pvc)) || (volumeutil.IsPVCBeingDeleted(pvc) && volumeutil.IsProtectionFinalizerPresent(pvc)) {
if needToAddFinalizer(pvc) || isDeletionCandidate(pvc) {
c.queue.Add(key)
}
}
@ -269,7 +269,7 @@ func (c *Controller) podAddedDeletedUpdated(obj interface{}, deleted bool) {
}
// Filter out pods that can't help us to remove a finalizer on PVC
if !deleted && !volumehelper.IsPodTerminated(pod, pod.Status) && pod.Spec.NodeName != "" {
if !deleted && !volumeutil.IsPodTerminated(pod, pod.Status) && pod.Spec.NodeName != "" {
return
}
@ -282,3 +282,11 @@ func (c *Controller) podAddedDeletedUpdated(obj interface{}, deleted bool) {
}
}
}
func isDeletionCandidate(pvc *v1.PersistentVolumeClaim) bool {
return pvc.ObjectMeta.DeletionTimestamp != nil && slice.ContainsString(pvc.ObjectMeta.Finalizers, volumeutil.PVCProtectionFinalizer, nil)
}
func needToAddFinalizer(pvc *v1.PersistentVolumeClaim) bool {
return pvc.ObjectMeta.DeletionTimestamp == nil && !slice.ContainsString(pvc.ObjectMeta.Finalizers, volumeutil.PVCProtectionFinalizer, nil)
}

View File

@ -0,0 +1,59 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["pv_protection_controller.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvprotection",
visibility = ["//visibility:public"],
deps = [
"//pkg/controller:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/slice:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["pv_protection_controller_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)

View File

@ -0,0 +1,208 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pvprotection
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/slice"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
// Controller is controller that removes PVProtectionFinalizer
// from PVs that are not bound to PVCs.
type Controller struct {
client clientset.Interface
pvLister corelisters.PersistentVolumeLister
pvListerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
}
// NewPVProtectionController returns a new *Controller.
func NewPVProtectionController(pvInformer coreinformers.PersistentVolumeInformer, cl clientset.Interface) *Controller {
e := &Controller{
client: cl,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvprotection"),
}
if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("persistentvolume_protection_controller", cl.CoreV1().RESTClient().GetRateLimiter())
}
e.pvLister = pvInformer.Lister()
e.pvListerSynced = pvInformer.Informer().HasSynced
pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.pvAddedUpdated,
UpdateFunc: func(old, new interface{}) {
e.pvAddedUpdated(new)
},
})
return e
}
// Run runs the controller goroutines.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
glog.Infof("Starting PV protection controller")
defer glog.Infof("Shutting down PV protection controller")
if !controller.WaitForCacheSync("PV protection", stopCh, c.pvListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
}
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
func (c *Controller) processNextWorkItem() bool {
pvKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(pvKey)
pvName := pvKey.(string)
err := c.processPV(pvName)
if err == nil {
c.queue.Forget(pvKey)
return true
}
utilruntime.HandleError(fmt.Errorf("PV %v failed with : %v", pvKey, err))
c.queue.AddRateLimited(pvKey)
return true
}
func (c *Controller) processPV(pvName string) error {
glog.V(4).Infof("Processing PV %s", pvName)
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished processing PV %s (%v)", pvName, time.Now().Sub(startTime))
}()
pv, err := c.pvLister.Get(pvName)
if apierrs.IsNotFound(err) {
glog.V(4).Infof("PV %s not found, ignoring", pvName)
return nil
}
if err != nil {
return err
}
if isDeletionCandidate(pv) {
// PV should be deleted. Check if it's used and remove finalizer if
// it's not.
isUsed := c.isBeingUsed(pv)
if !isUsed {
return c.removeFinalizer(pv)
}
}
if needToAddFinalizer(pv) {
// PV is not being deleted -> it should have the finalizer. The
// finalizer should be added by admission plugin, this is just to add
// the finalizer to old PVs that were created before the admission
// plugin was enabled.
return c.addFinalizer(pv)
}
return nil
}
func (c *Controller) addFinalizer(pv *v1.PersistentVolume) error {
pvClone := pv.DeepCopy()
pvClone.ObjectMeta.Finalizers = append(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer)
_, err := c.client.CoreV1().PersistentVolumes().Update(pvClone)
if err != nil {
glog.V(3).Infof("Error adding protection finalizer to PV %s: %v", pv.Name)
return err
}
glog.V(3).Infof("Added protection finalizer to PV %s", pv.Name)
return nil
}
func (c *Controller) removeFinalizer(pv *v1.PersistentVolume) error {
pvClone := pv.DeepCopy()
pvClone.ObjectMeta.Finalizers = slice.RemoveString(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil)
_, err := c.client.CoreV1().PersistentVolumes().Update(pvClone)
if err != nil {
glog.V(3).Infof("Error removing protection finalizer from PV %s: %v", pv.Name, err)
return err
}
glog.V(3).Infof("Removed protection finalizer from PV %s", pv.Name)
return nil
}
func (c *Controller) isBeingUsed(pv *v1.PersistentVolume) bool {
// check if PV is being bound to a PVC by its status
// the status will be updated by PV controller
if pv.Status.Phase == v1.VolumeBound {
// the PV is being used now
return true
}
return false
}
// pvAddedUpdated reacts to pv added/updated events
func (c *Controller) pvAddedUpdated(obj interface{}) {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", obj))
return
}
glog.V(4).Infof("Got event on PV %s", pv.Name)
if needToAddFinalizer(pv) || isDeletionCandidate(pv) {
c.queue.Add(pv.Name)
}
}
func isDeletionCandidate(pv *v1.PersistentVolume) bool {
return pv.ObjectMeta.DeletionTimestamp != nil && slice.ContainsString(pv.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil)
}
func needToAddFinalizer(pv *v1.PersistentVolume) bool {
return pv.ObjectMeta.DeletionTimestamp == nil && !slice.ContainsString(pv.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil)
}

View File

@ -0,0 +1,257 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pvprotection
import (
"errors"
"reflect"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/controller"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
const defaultPVName = "default-pv"
type reaction struct {
verb string
resource string
reactorfn clienttesting.ReactionFunc
}
func pv() *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: defaultPVName,
},
}
}
func boundPV() *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: defaultPVName,
},
Status: v1.PersistentVolumeStatus{
Phase: v1.VolumeBound,
},
}
}
func withProtectionFinalizer(pv *v1.PersistentVolume) *v1.PersistentVolume {
pv.Finalizers = append(pv.Finalizers, volumeutil.PVProtectionFinalizer)
return pv
}
func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionFunc {
i := 0
return func(action clienttesting.Action) (bool, runtime.Object, error) {
i++
if i <= failures {
// Update fails
update, ok := action.(clienttesting.UpdateAction)
if !ok {
t.Fatalf("Reactor got non-update action: %+v", action)
}
acc, _ := meta.Accessor(update.GetObject())
return true, nil, apierrors.NewForbidden(update.GetResource().GroupResource(), acc.GetName(), errors.New("Mock error"))
}
// Update succeeds
return false, nil, nil
}
}
func deleted(pv *v1.PersistentVolume) *v1.PersistentVolume {
pv.DeletionTimestamp = &metav1.Time{}
return pv
}
func TestPVProtectionController(t *testing.T) {
pvVer := schema.GroupVersionResource{
Group: v1.GroupName,
Version: "v1",
Resource: "persistentvolumes",
}
tests := []struct {
name string
// Object to insert into fake kubeclient before the test starts.
initialObjects []runtime.Object
// Optional client reactors.
reactors []reaction
// PV event to simulate. This PV will be automatically added to
// initalObjects.
updatedPV *v1.PersistentVolume
// List of expected kubeclient actions that should happen during the
// test.
expectedActions []clienttesting.Action
}{
// PV events
//
{
name: "PV without finalizer -> finalizer is added",
updatedPV: pv(),
expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())),
},
},
{
name: "PVC with finalizer -> no action",
updatedPV: withProtectionFinalizer(pv()),
expectedActions: []clienttesting.Action{},
},
{
name: "saving PVC finalizer fails -> controller retries",
updatedPV: pv(),
reactors: []reaction{
{
verb: "update",
resource: "persistentvolumes",
reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/),
},
},
expectedActions: []clienttesting.Action{
// This fails
clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())),
// This fails too
clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())),
// This succeeds
clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())),
},
},
{
name: "deleted PV with finalizer -> finalizer is removed",
updatedPV: deleted(withProtectionFinalizer(pv())),
expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvVer, "", deleted(pv())),
},
},
{
name: "finalizer removal fails -> controller retries",
updatedPV: deleted(withProtectionFinalizer(pv())),
reactors: []reaction{
{
verb: "update",
resource: "persistentvolumes",
reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/),
},
},
expectedActions: []clienttesting.Action{
// Fails
clienttesting.NewUpdateAction(pvVer, "", deleted(pv())),
// Fails too
clienttesting.NewUpdateAction(pvVer, "", deleted(pv())),
// Succeeds
clienttesting.NewUpdateAction(pvVer, "", deleted(pv())),
},
},
{
name: "deleted PVC with finalizer + PV is bound -> finalizer is not removed",
updatedPV: deleted(withProtectionFinalizer(boundPV())),
expectedActions: []clienttesting.Action{},
},
}
for _, test := range tests {
// Create client with initial data
objs := test.initialObjects
if test.updatedPV != nil {
objs = append(objs, test.updatedPV)
}
client := fake.NewSimpleClientset(objs...)
// Create informers
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
pvInformer := informers.Core().V1().PersistentVolumes()
// Populate the informers with initial objects so the controller can
// Get() it.
for _, obj := range objs {
switch obj.(type) {
case *v1.PersistentVolume:
pvInformer.Informer().GetStore().Add(obj)
default:
t.Fatalf("Unknown initalObject type: %+v", obj)
}
}
// Add reactor to inject test errors.
for _, reactor := range test.reactors {
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorfn)
}
// Create the controller
ctrl := NewPVProtectionController(pvInformer, client)
// Start the test by simulating an event
if test.updatedPV != nil {
ctrl.pvAddedUpdated(test.updatedPV)
}
// Process the controller queue until we get expected results
timeout := time.Now().Add(10 * time.Second)
lastReportedActionCount := 0
for {
if time.Now().After(timeout) {
t.Errorf("Test %q: timed out", test.name)
break
}
if ctrl.queue.Len() > 0 {
glog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len())
ctrl.processNextWorkItem()
}
if ctrl.queue.Len() > 0 {
// There is still some work in the queue, process it now
continue
}
currentActionCount := len(client.Actions())
if currentActionCount < len(test.expectedActions) {
// Do not log evey wait, only when the action count changes.
if lastReportedActionCount < currentActionCount {
glog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions))
lastReportedActionCount = currentActionCount
}
// The test expected more to happen, wait for the actions.
// Most probably it's exponential backoff
time.Sleep(10 * time.Millisecond)
continue
}
break
}
actions := client.Actions()
if !reflect.DeepEqual(actions, test.expectedActions) {
t.Errorf("Test %q: action not expected\nExpected:\n%s\ngot:\n%s", test.name, spew.Sdump(test.expectedActions), spew.Sdump(actions))
}
}
}