/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloud

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"k8s.io/klog"

	"k8s.io/api/core/v1"

	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/features"
	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"

	"k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	cloudprovider "k8s.io/cloud-provider"
	"k8s.io/kubernetes/pkg/controller"
)

const initializerName = "pvlabel.kubernetes.io"

// PersistentVolumeLabelController handles adding labels to persistent
// volumes when they are created.
type PersistentVolumeLabelController struct {
	cloud         cloudprovider.Interface
	kubeClient    kubernetes.Interface
	pvlController cache.Controller
	pvlIndexer    cache.Indexer
	volumeLister  corelisters.PersistentVolumeLister

	syncHandler func(key string) error

	// queue is where incoming work is placed to de-dup and to allow "easy"
	// rate-limited requeues on errors.
	queue workqueue.RateLimitingInterface
}

// NewPersistentVolumeLabelController creates a PersistentVolumeLabelController object.
func NewPersistentVolumeLabelController(
	kubeClient kubernetes.Interface,
	cloud cloudprovider.Interface) *PersistentVolumeLabelController {

	pvlc := &PersistentVolumeLabelController{
		cloud:      cloud,
		kubeClient: kubeClient,
		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvLabels"),
	}
	pvlc.syncHandler = pvlc.addLabelsAndAffinity
	// List and watch with IncludeUninitialized so the controller can see
	// volumes that are still waiting on this initializer.
	pvlc.pvlIndexer, pvlc.pvlController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				options.IncludeUninitialized = true
				return kubeClient.CoreV1().PersistentVolumes().List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.IncludeUninitialized = true
				return kubeClient.CoreV1().PersistentVolumes().Watch(options)
			},
		},
		&v1.PersistentVolume{},
		0,
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				key, err := cache.MetaNamespaceKeyFunc(obj)
				if err == nil {
					pvlc.queue.Add(key)
				}
			},
		},
		cache.Indexers{},
	)
	pvlc.volumeLister = corelisters.NewPersistentVolumeLister(pvlc.pvlIndexer)

	return pvlc
}
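
// A minimal wiring sketch (hypothetical; the kubeClient/cloud construction
// and the worker count are assumptions, not part of this file):
//
//	pvlc := NewPersistentVolumeLabelController(kubeClient, cloud)
//	stopCh := make(chan struct{})
//	defer close(stopCh)
//	pvlc.Run(5, stopCh)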

// Run starts a controller that adds labels to persistent volumes.
func (pvlc *PersistentVolumeLabelController) Run(threadiness int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer pvlc.queue.ShutDown()

	klog.Infof("Starting PersistentVolumeLabelController")
	defer klog.Infof("Shutting down PersistentVolumeLabelController")

	go pvlc.pvlController.Run(stopCh)

	if !controller.WaitForCacheSync("persistent volume label", stopCh, pvlc.pvlController.HasSynced) {
		return
	}

	// Start up your worker threads based on threadiness. Some controllers
	// have multiple kinds of workers.
	for i := 0; i < threadiness; i++ {
		// runWorker will loop until "something bad" happens. wait.Until will
		// then restart the worker after one second.
		go wait.Until(pvlc.runWorker, time.Second, stopCh)
	}

	// Wait until we're told to stop.
	<-stopCh
}

func (pvlc *PersistentVolumeLabelController) runWorker() {
	// Hot loop until we're told to stop. processNextWorkItem will
	// automatically wait until there's work available, so we don't worry
	// about secondary waits.
	for pvlc.processNextWorkItem() {
	}
}

// processNextWorkItem deals with one key off the queue. It returns false
// when it's time to quit.
func (pvlc *PersistentVolumeLabelController) processNextWorkItem() bool {
	// Pull the next work item from the queue. It should be a key we use to
	// look up something in a cache.
	keyObj, quit := pvlc.queue.Get()
	if quit {
		return false
	}
	// You always have to indicate to the queue that you've completed a piece
	// of work.
	defer pvlc.queue.Done(keyObj)

	key := keyObj.(string)
	// Do your work on the key. This method contains your "do stuff" logic.
	err := pvlc.syncHandler(key)
	if err == nil {
		// If you had no error, tell the queue to stop tracking history for
		// your key. This will reset things like failure counts for per-item
		// rate limiting.
		pvlc.queue.Forget(key)
		return true
	}

	// There was a failure, so be sure to report it. This method allows for
	// pluggable error handling, which can be used for things like
	// cluster monitoring.
	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))

	// Since we failed, we should requeue the item to work on later. This
	// method adds a backoff to avoid hot-looping on particular items (they're
	// probably still not going to work right away) and provides overall
	// controller protection (everything I've done is broken, this controller
	// needs to calm down or it can starve other useful work).
	pvlc.queue.AddRateLimited(key)

	return true
}
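
// Because syncHandler is a plain func field (assigned in the constructor),
// the "do stuff" logic can be swapped out, e.g. in tests. A hypothetical
// sketch, not part of this package:
//
//	pvlc := NewPersistentVolumeLabelController(kubeClient, cloud)
//	pvlc.syncHandler = func(key string) error {
//		klog.V(4).Infof("would label volume %s", key)
//		return nil
//	}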

// addLabelsAndAffinity adds appropriate labels to persistent volumes and
// sets the volume as available if successful.
func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinity(key string) error {
	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("error getting name of volume %q to get volume from informer: %v", key, err)
	}
	volume, err := pvlc.volumeLister.Get(name)
	if errors.IsNotFound(err) {
		return nil
	} else if err != nil {
		return fmt.Errorf("error getting volume %s from informer: %v", name, err)
	}

	return pvlc.addLabelsAndAffinityToVolume(volume)
}

func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinityToVolume(vol *v1.PersistentVolume) error {
	var volumeLabels map[string]string
	// Only add labels if this controller is the next pending initializer.
	if needsInitialization(vol.Initializers, initializerName) {
		if labeler, ok := (pvlc.cloud).(cloudprovider.PVLabeler); ok {
			labels, err := labeler.GetLabelsForVolume(context.TODO(), vol)
			if err != nil {
				return fmt.Errorf("error querying volume %v: %v", vol.Spec, err)
			}
			volumeLabels = labels
		} else {
			klog.V(4).Info("cloud provider does not support PVLabeler")
		}
		return pvlc.updateVolume(vol, volumeLabels)
	}
	return nil
}
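
// For a cloud disk, a PVLabeler implementation typically returns topology
// labels keyed by the well-known failure-domain labels, for example
// (illustrative values; the actual keys and values come from the provider):
//
//	map[string]string{
//		"failure-domain.beta.kubernetes.io/zone":   "us-east-1a",
//		"failure-domain.beta.kubernetes.io/region": "us-east-1",
//	}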

func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) {
	volName := vol.Name
	newVolume := vol.DeepCopyObject().(*v1.PersistentVolume)
	populateAffinity := utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) && len(volLabels) != 0

	if newVolume.Labels == nil {
		newVolume.Labels = make(map[string]string)
	}

	requirements := make([]v1.NodeSelectorRequirement, 0)
	for k, v := range volLabels {
		newVolume.Labels[k] = v
		// Set NodeSelectorRequirements based on the labels.
		if populateAffinity {
			var values []string
			if k == kubeletapis.LabelZoneFailureDomain {
				zones, err := volumeutil.LabelZonesToSet(v)
				if err != nil {
					return nil, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v)
				}
				values = zones.List()
			} else {
				values = []string{v}
			}
			requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values})
		}
	}
	if populateAffinity {
		if newVolume.Spec.NodeAffinity == nil {
			newVolume.Spec.NodeAffinity = new(v1.VolumeNodeAffinity)
		}
		if newVolume.Spec.NodeAffinity.Required == nil {
			newVolume.Spec.NodeAffinity.Required = new(v1.NodeSelector)
		}
		if len(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
			// Need at least one term pre-allocated whose MatchExpressions
			// can be appended to.
			newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1)
		}
		// Populate NodeAffinity with requirements if there are no
		// conflicting keys found.
		if v1helper.NodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
			klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
				requirements, newVolume.Spec.NodeAffinity)
		} else {
			for _, req := range requirements {
				for i := range newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms {
					newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
				}
			}
		}
	}
	newVolume.Initializers = removeInitializer(newVolume.Initializers, initializerName)
	klog.V(4).Infof("removed initializer on PersistentVolume %s", newVolume.Name)

	oldData, err := json.Marshal(vol)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal old persistentvolume %#v for persistentvolume %q: %v", vol, volName, err)
	}

	newData, err := json.Marshal(newVolume)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal new persistentvolume %#v for persistentvolume %q: %v", newVolume, volName, err)
	}

	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.PersistentVolume{})
	if err != nil {
		return nil, fmt.Errorf("failed to create patch for persistentvolume %q: %v", volName, err)
	}
	return patch, nil
}
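
// The resulting two-way strategic merge patch carries only the changed
// fields; roughly of this shape (an illustrative sketch under the single
// zone-label assumption above, not exact output):
//
//	{
//	  "metadata": {
//	    "initializers": null,
//	    "labels": {"failure-domain.beta.kubernetes.io/zone": "us-east-1a"}
//	  },
//	  "spec": {"nodeAffinity": {"required": {"nodeSelectorTerms": [...]}}}
//	}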

func (pvlc *PersistentVolumeLabelController) updateVolume(vol *v1.PersistentVolume, volLabels map[string]string) error {
	volName := vol.Name
	klog.V(4).Infof("updating PersistentVolume %s", volName)
	patchBytes, err := pvlc.createPatch(vol, volLabels)
	if err != nil {
		return err
	}

	_, err = pvlc.kubeClient.CoreV1().PersistentVolumes().Patch(volName, types.StrategicMergePatchType, patchBytes)
	if err != nil {
		return fmt.Errorf("failed to update PersistentVolume %s: %v", volName, err)
	}
	klog.V(4).Infof("updated PersistentVolume %s", volName)

	return nil
}

func removeInitializer(initializers *metav1.Initializers, name string) *metav1.Initializers {
	if initializers == nil {
		return nil
	}

	var updated []metav1.Initializer
	for _, pending := range initializers.Pending {
		if pending.Name != name {
			updated = append(updated, pending)
		}
	}
	if len(updated) == len(initializers.Pending) {
		return initializers
	}
	if len(updated) == 0 {
		return nil
	}

	return &metav1.Initializers{Pending: updated}
}
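
// Illustrative behavior: given Pending == [{Name: "pvlabel.kubernetes.io"},
// {Name: "other.example.com"}], removeInitializer returns an Initializers
// value whose Pending list contains only "other.example.com"; if
// "pvlabel.kubernetes.io" were the sole entry, it returns nil.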

// needsInitialization checks whether or not the PVL is the next pending
// initializer.
func needsInitialization(initializers *metav1.Initializers, name string) bool {
	if initializers == nil {
		return false
	}

	if len(initializers.Pending) == 0 {
		return false
	}

	// There is at least one initializer still pending, so check whether the
	// PVL is next in line.
	return initializers.Pending[0].Name == name
}