Migrate from snapClient.VolumesnapshotV1alpha1Client to
snapClient.SnapshotV1alpha1Client and also update kube dependency

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
Humble Chirammal authored 2019-06-24 14:38:09 +05:30; committed by mergify[bot]
parent 3bc6771df8, commit 22ff5c0911
1031 changed files with 34242 additions and 177906 deletions


@@ -0,0 +1,59 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"encoding/json"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
)
// patch patches a service's Status or ObjectMeta given the original and
// updated objects. Changes to the Spec are ignored.
func patch(c v1core.CoreV1Interface, oldSvc *v1.Service, newSvc *v1.Service) (*v1.Service, error) {
// Reset the spec so that only Status or ObjectMeta gets patched.
newSvc.Spec = oldSvc.Spec
patchBytes, err := getPatchBytes(oldSvc, newSvc)
if err != nil {
return nil, err
}
return c.Services(oldSvc.Namespace).Patch(oldSvc.Name, types.StrategicMergePatchType, patchBytes, "status")
}
func getPatchBytes(oldSvc *v1.Service, newSvc *v1.Service) ([]byte, error) {
oldData, err := json.Marshal(oldSvc)
if err != nil {
return nil, fmt.Errorf("failed to Marshal oldData for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err)
}
newData, err := json.Marshal(newSvc)
if err != nil {
return nil, fmt.Errorf("failed to Marshal newData for svc %s/%s: %v", newSvc.Namespace, newSvc.Name, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Service{})
if err != nil {
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err)
}
return patchBytes, nil
}
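
For reference, a minimal standalone sketch (not part of the commit) of what getPatchBytes produces: two Services that differ only in load-balancer status are marshaled to JSON and diffed with CreateTwoWayMergePatch, so the resulting patch touches only the status subtree.

package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldSvc := &v1.Service{}
	newSvc := oldSvc.DeepCopy()
	newSvc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: "203.0.113.10"}}

	oldData, _ := json.Marshal(oldSvc)
	newData, _ := json.Marshal(newSvc)

	// The v1.Service{} argument supplies the strategic-merge metadata
	// (patch keys and strategies) used to compute the diff.
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Service{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", patchBytes)
	// Prints (roughly): {"status":{"loadBalancer":{"ingress":[{"ip":"203.0.113.10"}]}}}
}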


@@ -24,7 +24,7 @@ import (
"reflect"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@@ -39,11 +39,13 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
servicehelper "k8s.io/cloud-provider/service/helpers"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/slice"
)
const (
@@ -135,15 +137,28 @@ func New(
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: s.enqueueService,
UpdateFunc: func(old, cur interface{}) {
oldSvc, ok1 := old.(*v1.Service)
curSvc, ok2 := cur.(*v1.Service)
if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
AddFunc: func(cur interface{}) {
svc, ok := cur.(*v1.Service)
if ok && (wantsLoadBalancer(svc) || needsCleanup(svc)) {
s.enqueueService(cur)
}
},
DeleteFunc: s.enqueueService,
UpdateFunc: func(old, cur interface{}) {
oldSvc, ok1 := old.(*v1.Service)
curSvc, ok2 := cur.(*v1.Service)
if ok1 && ok2 && (s.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc)) {
s.enqueueService(cur)
}
},
DeleteFunc: func(old interface{}) {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ServiceLoadBalancerFinalizer) {
// No need to handle deletion event if finalizer feature gate is
// enabled. Because the deletion would be handled by the update
// path when the deletion timestamp is added.
return
}
s.enqueueService(old)
},
},
serviceSyncPeriod,
)
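
A minimal sketch of the handler shape this hunk installs. The enqueue function, the wantsLoadBalancer predicate, and the feature-gate check are stub stand-ins (assumptions, not the controller's real wiring), and it assumes the 2019-era client-go vendored by this commit, where OnAdd takes a single argument.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	finalizerGateEnabled := true // stand-in for the ServiceLoadBalancerFinalizer gate
	enqueue := func(obj interface{}) { fmt.Printf("enqueued %T\n", obj) }
	wants := func(svc *v1.Service) bool { return svc.Spec.Type == v1.ServiceTypeLoadBalancer }

	handler := cache.ResourceEventHandlerFuncs{
		// Add events are filtered: only services that want a load
		// balancer (or, in the real controller, need cleanup) are queued.
		AddFunc: func(cur interface{}) {
			if svc, ok := cur.(*v1.Service); ok && wants(svc) {
				enqueue(cur)
			}
		},
		// With the finalizer gate on, deletion is observed via the
		// update path once the deletion timestamp appears.
		DeleteFunc: func(old interface{}) {
			if finalizerGateEnabled {
				return
			}
			enqueue(old)
		},
	}
	handler.OnAdd(&v1.Service{Spec: v1.ServiceSpec{Type: v1.ServiceTypeLoadBalancer}})
}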
@@ -160,7 +175,7 @@ func New(
func (s *ServiceController) enqueueService(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
klog.Errorf("Couldn't get key for object %#v: %v", obj, err)
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
return
}
s.queue.Add(key)
@@ -235,129 +250,112 @@ func (s *ServiceController) init() error {
return nil
}
// processServiceUpdate operates loadbalancers for the incoming service accordingly.
// processServiceCreateOrUpdate operates loadbalancers for the incoming service accordingly.
// Returns an error if processing the service update failed.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) error {
if cachedService.state != nil {
if cachedService.state.UID != service.UID {
err := s.processLoadBalancerDelete(cachedService, key)
if err != nil {
return err
}
func (s *ServiceController) processServiceCreateOrUpdate(service *v1.Service, key string) error {
// TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
// path. Ref https://github.com/kubernetes/enhancements/issues/980.
cachedService := s.cache.getOrCreate(key)
if cachedService.state != nil && cachedService.state.UID != service.UID {
// This happens only when a service is deleted and re-created
// in a short period, which is only possible when it doesn't
// carry the finalizer.
if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
return err
}
}
// cache the service, we need the info for service deletion
// Always cache the service, we need the info for service deletion in case
// when load balancer cleanup is not handled via finalizer.
cachedService.state = service
err := s.createLoadBalancerIfNeeded(key, service)
op, err := s.syncLoadBalancerIfNeeded(service, key)
if err != nil {
eventType := "CreatingLoadBalancerFailed"
message := "Error creating load balancer (will retry): "
if !wantsLoadBalancer(service) {
eventType = "CleanupLoadBalancerFailed"
message = "Error cleaning up load balancer (will retry): "
}
message += err.Error()
s.eventRecorder.Event(service, v1.EventTypeWarning, eventType, message)
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err)
return err
}
// Always update the cache upon success.
// NOTE: Since we update the cached service if and only if we successfully
// processed it, a cached service being nil implies that it hasn't yet
// been successfully processed.
s.cache.set(key, cachedService)
if op == deleteLoadBalancer {
// Only delete the cache upon successful load balancer deletion.
s.cache.delete(key)
}
return nil
}
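
A minimal sketch (hypothetical, simplified types; locking omitted) of the cache contract processServiceCreateOrUpdate relies on: set on every successful sync, delete only once the load balancer is confirmed gone, so a cache miss always means there is no load balancer left to clean up.

package main

import "fmt"

type cachedService struct{ uid string }

type serviceCache struct{ m map[string]*cachedService }

func (c *serviceCache) getOrCreate(key string) *cachedService {
	if svc, ok := c.m[key]; ok {
		return svc
	}
	svc := &cachedService{}
	c.m[key] = svc
	return svc
}

func (c *serviceCache) set(key string, svc *cachedService) { c.m[key] = svc }
func (c *serviceCache) delete(key string)                  { delete(c.m, key) }

func main() {
	cache := &serviceCache{m: map[string]*cachedService{}}
	svc := cache.getOrCreate("default/web")
	svc.uid = "abc-123"
	cache.set("default/web", svc) // sync succeeded: remember the state
	cache.delete("default/web")   // load balancer deleted: forget it
	fmt.Println(len(cache.m))     // 0
}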
// createLoadBalancerIfNeeded ensures that service's status is synced up with loadbalancer
type loadBalancerOperation int
const (
deleteLoadBalancer loadBalancerOperation = iota
ensureLoadBalancer
)
// syncLoadBalancerIfNeeded ensures that the service's status is in sync with the
// load balancer, i.e. it creates a load balancer for the service if one is requested,
// and deletes the load balancer if the service no longer wants one. Returns whatever
// error occurred.
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) error {
func (s *ServiceController) syncLoadBalancerIfNeeded(service *v1.Service, key string) (loadBalancerOperation, error) {
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
// which may involve service interruption. Also, we would like user-friendly events.
// Save the state so we can avoid a write if it doesn't change
previousState := v1helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
var newState *v1.LoadBalancerStatus
previousStatus := v1helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
var newStatus *v1.LoadBalancerStatus
var op loadBalancerOperation
var err error
if !wantsLoadBalancer(service) {
if !wantsLoadBalancer(service) || needsCleanup(service) {
// Delete the load balancer if service no longer wants one, or if service needs cleanup.
op = deleteLoadBalancer
newStatus = &v1.LoadBalancerStatus{}
_, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service)
if err != nil {
return fmt.Errorf("error getting LB for service %s: %v", key, err)
return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err)
}
if exists {
klog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
klog.V(2).Infof("Deleting existing load balancer for service %s", key)
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
return err
return op, fmt.Errorf("failed to delete load balancer: %v", err)
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
}
newState = &v1.LoadBalancerStatus{}
// Always try to remove finalizer when load balancer is deleted.
// It will be a no-op if finalizer does not exist.
// Note this also clears up finalizer if the cluster is downgraded
// from a version that attaches finalizer to a version that doesn't.
if err := s.removeFinalizer(service); err != nil {
return op, fmt.Errorf("failed to remove load balancer cleanup finalizer: %v", err)
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
} else {
klog.V(2).Infof("Ensuring LB for service %s", key)
// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
// Create or update the load balancer if service wants one.
op = ensureLoadBalancer
klog.V(2).Infof("Ensuring load balancer for service %s", key)
s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
newState, err = s.ensureLoadBalancer(service)
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ServiceLoadBalancerFinalizer) {
// Always try to add finalizer prior to load balancer creation.
// It will be a no-op if finalizer already exists.
// Note this also retrospectively puts on finalizer if the cluster
// is upgraded from a version that doesn't attach finalizer to a
// version that does.
if err := s.addFinalizer(service); err != nil {
return op, fmt.Errorf("failed to add load balancer cleanup finalizer: %v", err)
}
}
newStatus, err = s.ensureLoadBalancer(service)
if err != nil {
return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err)
return op, fmt.Errorf("failed to ensure load balancer: %v", err)
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
}
// Write the state if changed
// TODO: Be careful here ... what if there were other changes to the service?
if !v1helper.LoadBalancerStatusEqual(previousState, newState) {
// Make a copy so we don't mutate the shared informer cache
service = service.DeepCopy()
// Update the status on the copy
service.Status.LoadBalancer = *newState
if err := s.persistUpdate(service); err != nil {
// TODO: This logic needs to be revisited. We might want to retry on all the errors, not just conflicts.
if errors.IsConflict(err) {
return fmt.Errorf("not persisting update to service '%s/%s' that has been changed since we received it: %v", service.Namespace, service.Name, err)
}
runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err))
return nil
if err := s.patchStatus(service, previousStatus, newStatus); err != nil {
// Only retry errors other than "not found":
// - A not-found error mostly happens when the service disappears right after
//   we remove the finalizer.
// - We can't patch the status of a non-existent service anyway.
if !errors.IsNotFound(err) {
return op, fmt.Errorf("failed to update load balancer status: %v", err)
}
} else {
klog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
}
return nil
}
func (s *ServiceController) persistUpdate(service *v1.Service) error {
var err error
for i := 0; i < clientRetryCount; i++ {
_, err = s.kubeClient.CoreV1().Services(service.Namespace).UpdateStatus(service)
if err == nil {
return nil
}
// If the object no longer exists, we don't want to recreate it. Just bail
// out so that we can process the delete, which we should soon be receiving
// if we haven't already.
if errors.IsNotFound(err) {
klog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
service.Namespace, service.Name, err)
return nil
}
// TODO: Try to resolve the conflict if the change was unrelated to load
// balancer status. For now, just pass it up the stack.
if errors.IsConflict(err) {
return err
}
klog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
service.Namespace, service.Name, err)
time.Sleep(clientRetryInterval)
}
return err
return op, nil
}
func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
@@ -368,7 +366,7 @@ func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBal
// If there are no available nodes for LoadBalancer service, make an EventTypeWarning event for it.
if len(nodes) == 0 {
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer service %s/%s", service.Namespace, service.Name)
s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
}
// - Only one protocol supported per service
@@ -441,6 +439,12 @@ func (s *serviceCache) delete(serviceName string) {
delete(s.serviceMap, serviceName)
}
// needsCleanup checks if load balancer needs to be cleaned up as indicated by finalizer.
func needsCleanup(service *v1.Service) bool {
return service.ObjectMeta.DeletionTimestamp != nil && servicehelper.HasLBFinalizer(service)
}
// needsUpdate checks if load balancer needs to be updated due to change in attributes.
func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
return false
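
A minimal sketch of the needsCleanup predicate from this hunk: it fires only once a delete has been requested (DeletionTimestamp set) AND the cleanup finalizer is still present. The finalizer string below is assumed to match what k8s.io/cloud-provider/service/helpers exports as LoadBalancerCleanupFinalizer.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const loadBalancerCleanupFinalizer = "service.kubernetes.io/load-balancer-cleanup" // assumed value

func needsCleanup(svc *v1.Service) bool {
	if svc.ObjectMeta.DeletionTimestamp == nil {
		return false // not being deleted; nothing to clean up
	}
	for _, f := range svc.ObjectMeta.Finalizers {
		if f == loadBalancerCleanupFinalizer {
			return true
		}
	}
	return false
}

func main() {
	now := metav1.Now()
	svc := &v1.Service{}
	svc.ObjectMeta.DeletionTimestamp = &now
	svc.ObjectMeta.Finalizers = []string{loadBalancerCleanupFinalizer}
	fmt.Println(needsCleanup(svc)) // true
}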
@@ -626,7 +630,7 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
func (s *ServiceController) nodeSyncLoop() {
newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
if err != nil {
klog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
return
}
if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
@@ -636,7 +640,7 @@ func (s *ServiceController) nodeSyncLoop() {
return
}
klog.Infof("Detected change in list of current cluster nodes. New node set: %v",
klog.V(2).Infof("Detected change in list of current cluster nodes. New node set: %v",
nodeNames(newHosts))
// Try updating all services, and save the ones that fail to try again next
@@ -644,7 +648,7 @@ func (s *ServiceController) nodeSyncLoop() {
s.servicesToUpdate = s.cache.allServices()
numServices := len(s.servicesToUpdate)
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
klog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
numServices-len(s.servicesToUpdate), numServices)
s.knownHosts = newHosts
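
A minimal sketch of the retry pattern nodeSyncLoop uses here, with a hypothetical updateHosts standing in for s.updateLoadBalancerHosts: the update pass returns the services that failed, and only those are retried on the next node-sync tick.

package main

import "fmt"

func updateHosts(services, hosts []string) (failed []string) {
	for _, svc := range services {
		if svc == "flaky" { // pretend this one's cloud call failed
			failed = append(failed, svc)
		}
	}
	return failed
}

func main() {
	toUpdate := []string{"web", "flaky", "api"}
	hosts := []string{"node-1", "node-2"}

	numServices := len(toUpdate)
	toUpdate = updateHosts(toUpdate, hosts) // keep only the failures
	fmt.Printf("Successfully updated %d out of %d load balancers\n",
		numServices-len(toUpdate), numServices)
	// The next tick retries only toUpdate (here: ["flaky"]).
}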
@@ -660,7 +664,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, host
return
}
if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
klog.Errorf("External error while updating load balancer: %v.", err)
runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", service.Namespace, service.Name, err))
servicesToRetry = append(servicesToRetry, service)
}
}()
@@ -680,7 +684,7 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, h
if err == nil {
// If there are no available nodes for LoadBalancer service, make an EventTypeWarning event for it.
if len(hosts) == 0 {
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer service %s/%s", service.Namespace, service.Name)
s.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
} else {
s.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
}
@@ -689,12 +693,12 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, h
// It's only an actual error if the load balancer still exists.
if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil {
klog.Errorf("External error while checking if load balancer %q exists: name, %v", s.balancer.GetLoadBalancerName(context.TODO(), s.clusterName, service), err)
runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err))
} else if !exists {
return nil
}
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
return err
}
@@ -711,7 +715,6 @@ func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
// invoked concurrently with the same key.
func (s *ServiceController) syncService(key string) error {
startTime := time.Now()
var cachedService *cachedService
defer func() {
klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
}()
@@ -726,44 +729,88 @@ func (s *ServiceController) syncService(key string) error {
switch {
case errors.IsNotFound(err):
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
klog.Infof("Service has been deleted %v. Attempting to cleanup load balancer resources", key)
err = s.processServiceDeletion(key)
case err != nil:
klog.Infof("Unable to retrieve service %v from store: %v", key, err)
runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err))
default:
cachedService = s.cache.getOrCreate(key)
err = s.processServiceUpdate(cachedService, service, key)
err = s.processServiceCreateOrUpdate(service, key)
}
return err
}
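
A minimal sketch of the syncService dispatch above, using a hand-built not-found error in place of a real lister miss; the branch bodies are stubs for processServiceDeletion and processServiceCreateOrUpdate.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func sync(key string, lookupErr error) error {
	switch {
	case errors.IsNotFound(lookupErr):
		// Absence from the store means the watcher caught the deletion.
		fmt.Printf("service %s deleted; cleaning up load balancer\n", key)
		return nil // would call processServiceDeletion(key)
	case lookupErr != nil:
		return fmt.Errorf("unable to retrieve service %v from store: %v", key, lookupErr)
	default:
		fmt.Printf("reconciling service %s\n", key)
		return nil // would call processServiceCreateOrUpdate(service, key)
	}
}

func main() {
	notFound := errors.NewNotFound(schema.GroupResource{Resource: "services"}, "web")
	_ = sync("default/web", notFound) // takes the deletion branch
	_ = sync("default/api", nil)      // takes the reconcile branch
}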
// processServiceDeletion cleans up the load balancer resources for the
// service identified by key. Returns an error if the cleanup failed and
// should be retried.
func (s *ServiceController) processServiceDeletion(key string) error {
cachedService, ok := s.cache.get(key)
if !ok {
klog.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion", key)
// The cache not containing the key means either:
// - We didn't create a Load Balancer for the deleted service at all.
// - We already deleted the Load Balancer that was created for the service.
// In both cases we have nothing left to do.
return nil
}
return s.processLoadBalancerDelete(cachedService, key)
klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key)
if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
return err
}
s.cache.delete(key)
return nil
}
func (s *ServiceController) processLoadBalancerDelete(cachedService *cachedService, key string) error {
service := cachedService.state
func (s *ServiceController) processLoadBalancerDelete(service *v1.Service, key string) error {
// Delete the load balancer only if the service type is LoadBalancer.
if !wantsLoadBalancer(service) {
return nil
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service)
if err != nil {
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", "Error deleting load balancer (will retry): %v", err)
if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err)
return err
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
s.cache.delete(key)
return nil
}
// addFinalizer patches the service to add the cleanup finalizer.
func (s *ServiceController) addFinalizer(service *v1.Service) error {
if servicehelper.HasLBFinalizer(service) {
return nil
}
// Make a copy so we don't mutate the shared informer cache.
updated := service.DeepCopy()
updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)
klog.V(2).Infof("Adding finalizer to service %s/%s", updated.Namespace, updated.Name)
_, err := patch(s.kubeClient.CoreV1(), service, updated)
return err
}
// removeFinalizer patches the service to remove the cleanup finalizer.
func (s *ServiceController) removeFinalizer(service *v1.Service) error {
if !servicehelper.HasLBFinalizer(service) {
return nil
}
// Make a copy so we don't mutate the shared informer cache.
updated := service.DeepCopy()
updated.ObjectMeta.Finalizers = slice.RemoveString(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer, nil)
klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name)
_, err := patch(s.kubeClient.CoreV1(), service, updated)
return err
}
// patchStatus patches the service with the given LoadBalancerStatus.
func (s *ServiceController) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error {
if v1helper.LoadBalancerStatusEqual(previousStatus, newStatus) {
return nil
}
// Make a copy so we don't mutate the shared informer cache.
updated := service.DeepCopy()
updated.Status.LoadBalancer = *newStatus
klog.V(2).Infof("Patching status for service %s/%s", updated.Namespace, updated.Name)
_, err := patch(s.kubeClient.CoreV1(), service, updated)
return err
}
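
Finally, a minimal sketch of the copy-then-patch discipline shared by addFinalizer, removeFinalizer and patchStatus: never mutate the informer's object; DeepCopy it, edit the copy, and let patch() diff old against new. removeString is a local stand-in for k8s.io/kubernetes/pkg/util/slice.RemoveString, and the finalizer string is an assumption.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func removeString(items []string, s string) []string {
	var out []string
	for _, item := range items {
		if item != s {
			out = append(out, item)
		}
	}
	return out
}

func main() {
	shared := &v1.Service{} // imagine this came from the informer cache
	shared.ObjectMeta.Finalizers = []string{"service.kubernetes.io/load-balancer-cleanup"}

	updated := shared.DeepCopy() // edit a copy, never the cached object
	updated.ObjectMeta.Finalizers = removeString(updated.ObjectMeta.Finalizers,
		"service.kubernetes.io/load-balancer-cleanup")

	// In the controller, patch(s.kubeClient.CoreV1(), shared, updated)
	// would now send only the finalizer diff to the API server.
	fmt.Println(len(shared.Finalizers), len(updated.Finalizers)) // 1 0
}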