mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-07 20:39:30 +00:00
d300da19b7
Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
850 lines
33 KiB
Go
850 lines
33 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package kubectl
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/util/intstr"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
scaleclient "k8s.io/client-go/scale"
|
|
"k8s.io/client-go/util/retry"
|
|
"k8s.io/kubernetes/pkg/kubectl/util"
|
|
deploymentutil "k8s.io/kubernetes/pkg/kubectl/util/deployment"
|
|
"k8s.io/kubernetes/pkg/kubectl/util/podutils"
|
|
"k8s.io/utils/integer"
|
|
)
|
|
|
|
func newInt32Ptr(val int) *int32 {
|
|
ret := int32(val)
|
|
return &ret
|
|
}
|
|
|
|
func valOrZero(val *int32) int32 {
|
|
if val == nil {
|
|
return int32(0)
|
|
}
|
|
return *val
|
|
}
|
|
|
|
const (
|
|
kubectlAnnotationPrefix = "kubectl.kubernetes.io/"
|
|
sourceIDAnnotation = kubectlAnnotationPrefix + "update-source-id"
|
|
desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas"
|
|
originalReplicasAnnotation = kubectlAnnotationPrefix + "original-replicas"
|
|
nextControllerAnnotation = kubectlAnnotationPrefix + "next-controller-id"
|
|
)
|
|
|
|
// RollingUpdaterConfig is the configuration for a rolling deployment process.
|
|
type RollingUpdaterConfig struct {
|
|
// Out is a writer for progress output.
|
|
Out io.Writer
|
|
// OldRC is an existing controller to be replaced.
|
|
OldRc *corev1.ReplicationController
|
|
// NewRc is a controller that will take ownership of updated pods (will be
|
|
// created if needed).
|
|
NewRc *corev1.ReplicationController
|
|
// UpdatePeriod is the time to wait between individual pod updates.
|
|
UpdatePeriod time.Duration
|
|
// Interval is the time to wait between polling controller status after
|
|
// update.
|
|
Interval time.Duration
|
|
// Timeout is the time to wait for controller updates before giving up.
|
|
Timeout time.Duration
|
|
// MinReadySeconds is the number of seconds to wait after the pods are ready
|
|
MinReadySeconds int32
|
|
// CleanupPolicy defines the cleanup action to take after the deployment is
|
|
// complete.
|
|
CleanupPolicy RollingUpdaterCleanupPolicy
|
|
// MaxUnavailable is the maximum number of pods that can be unavailable during the update.
|
|
// Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%).
|
|
// Absolute number is calculated from percentage by rounding up.
|
|
// This can not be 0 if MaxSurge is 0.
|
|
// By default, a fixed value of 1 is used.
|
|
// Example: when this is set to 30%, the old RC can be scaled down to 70% of desired pods
|
|
// immediately when the rolling update starts. Once new pods are ready, old RC
|
|
// can be scaled down further, followed by scaling up the new RC, ensuring
|
|
// that the total number of pods available at all times during the update is at
|
|
// least 70% of desired pods.
|
|
MaxUnavailable intstr.IntOrString
|
|
// MaxSurge is the maximum number of pods that can be scheduled above the desired number of pods.
|
|
// Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%).
|
|
// This can not be 0 if MaxUnavailable is 0.
|
|
// Absolute number is calculated from percentage by rounding up.
|
|
// By default, a value of 1 is used.
|
|
// Example: when this is set to 30%, the new RC can be scaled up immediately
|
|
// when the rolling update starts, such that the total number of old and new pods do not exceed
|
|
// 130% of desired pods. Once old pods have been killed, new RC can be scaled up
|
|
// further, ensuring that total number of pods running at any time during
|
|
// the update is at most 130% of desired pods.
|
|
MaxSurge intstr.IntOrString
|
|
// OnProgress is invoked if set during each scale cycle, to allow the caller to perform additional logic or
|
|
// abort the scale. If an error is returned the cleanup method will not be invoked. The percentage value
|
|
// is a synthetic "progress" calculation that represents the approximate percentage completion.
|
|
OnProgress func(oldRc, newRc *corev1.ReplicationController, percentage int) error
|
|
}
|
|
|
|
// RollingUpdaterCleanupPolicy is a cleanup action to take after the
|
|
// deployment is complete.
|
|
type RollingUpdaterCleanupPolicy string
|
|
|
|
const (
|
|
// DeleteRollingUpdateCleanupPolicy means delete the old controller.
|
|
DeleteRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Delete"
|
|
// PreserveRollingUpdateCleanupPolicy means keep the old controller.
|
|
PreserveRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Preserve"
|
|
// RenameRollingUpdateCleanupPolicy means delete the old controller, and rename
|
|
// the new controller to the name of the old controller.
|
|
RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename"
|
|
)
|
|
|
|
// RollingUpdater provides methods for updating replicated pods in a predictable,
|
|
// fault-tolerant way.
|
|
type RollingUpdater struct {
|
|
rcClient corev1client.ReplicationControllersGetter
|
|
podClient corev1client.PodsGetter
|
|
scaleClient scaleclient.ScalesGetter
|
|
// Namespace for resources
|
|
ns string
|
|
// scaleAndWait scales a controller and returns its updated state.
|
|
scaleAndWait func(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error)
|
|
//getOrCreateTargetController gets and validates an existing controller or
|
|
//makes a new one.
|
|
getOrCreateTargetController func(controller *corev1.ReplicationController, sourceID string) (*corev1.ReplicationController, bool, error)
|
|
// cleanup performs post deployment cleanup tasks for newRc and oldRc.
|
|
cleanup func(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error
|
|
// getReadyPods returns the amount of old and new ready pods.
|
|
getReadyPods func(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error)
|
|
// nowFn returns the current time used to calculate the minReadySeconds
|
|
nowFn func() metav1.Time
|
|
}
|
|
|
|
// NewRollingUpdater creates a RollingUpdater from a client.
|
|
func NewRollingUpdater(namespace string, rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, sc scaleclient.ScalesGetter) *RollingUpdater {
|
|
updater := &RollingUpdater{
|
|
rcClient: rcClient,
|
|
podClient: podClient,
|
|
scaleClient: sc,
|
|
ns: namespace,
|
|
}
|
|
// Inject real implementations.
|
|
updater.scaleAndWait = updater.scaleAndWaitWithScaler
|
|
updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient
|
|
updater.getReadyPods = updater.readyPods
|
|
updater.cleanup = updater.cleanupWithClients
|
|
updater.nowFn = func() metav1.Time { return metav1.Now() }
|
|
return updater
|
|
}
|
|
|
|
// Update all pods for a ReplicationController (oldRc) by creating a new
|
|
// controller (newRc) with 0 replicas, and synchronously scaling oldRc and
|
|
// newRc until oldRc has 0 replicas and newRc has the original # of desired
|
|
// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy.
|
|
//
|
|
// Each interval, the updater will attempt to make progress however it can
|
|
// without violating any availability constraints defined by the config. This
|
|
// means the amount scaled up or down each interval will vary based on the
|
|
// timeliness of readiness and the updater will always try to make progress,
|
|
// even slowly.
|
|
//
|
|
// If an update from newRc to oldRc is already in progress, we attempt to
|
|
// drive it to completion. If an error occurs at any step of the update, the
|
|
// error will be returned.
|
|
//
|
|
// A scaling event (either up or down) is considered progress; if no progress
|
|
// is made within the config.Timeout, an error is returned.
|
|
//
|
|
// TODO: make this handle performing a rollback of a partially completed
|
|
// rollout.
|
|
func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
|
|
out := config.Out
|
|
oldRc := config.OldRc
|
|
scaleRetryParams := NewRetryParams(config.Interval, config.Timeout)
|
|
|
|
// Find an existing controller (for continuing an interrupted update) or
|
|
// create a new one if necessary.
|
|
sourceID := fmt.Sprintf("%s:%s", oldRc.Name, oldRc.UID)
|
|
newRc, existed, err := r.getOrCreateTargetController(config.NewRc, sourceID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if existed {
|
|
fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newRc.Name)
|
|
} else {
|
|
fmt.Fprintf(out, "Created %s\n", newRc.Name)
|
|
}
|
|
// Extract the desired replica count from the controller.
|
|
desiredAnnotation, err := strconv.Atoi(newRc.Annotations[desiredReplicasAnnotation])
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
|
|
newRc.Name, desiredReplicasAnnotation, newRc.Annotations[desiredReplicasAnnotation])
|
|
}
|
|
desired := int32(desiredAnnotation)
|
|
// Extract the original replica count from the old controller, adding the
|
|
// annotation if it doesn't yet exist.
|
|
_, hasOriginalAnnotation := oldRc.Annotations[originalReplicasAnnotation]
|
|
if !hasOriginalAnnotation {
|
|
existing, err := r.rcClient.ReplicationControllers(oldRc.Namespace).Get(oldRc.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
originReplicas := strconv.Itoa(int(valOrZero(existing.Spec.Replicas)))
|
|
applyUpdate := func(rc *corev1.ReplicationController) {
|
|
if rc.Annotations == nil {
|
|
rc.Annotations = map[string]string{}
|
|
}
|
|
rc.Annotations[originalReplicasAnnotation] = originReplicas
|
|
}
|
|
if oldRc, err = updateRcWithRetries(r.rcClient, existing.Namespace, existing, applyUpdate); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
// maxSurge is the maximum scaling increment and maxUnavailable are the maximum pods
|
|
// that can be unavailable during a rollout.
|
|
maxSurge, maxUnavailable, err := deploymentutil.ResolveFenceposts(&config.MaxSurge, &config.MaxUnavailable, desired)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Validate maximums.
|
|
if desired > 0 && maxUnavailable == 0 && maxSurge == 0 {
|
|
return fmt.Errorf("one of maxSurge or maxUnavailable must be specified")
|
|
}
|
|
// The minimum pods which must remain available throughout the update
|
|
// calculated for internal convenience.
|
|
minAvailable := int32(integer.IntMax(0, int(desired-maxUnavailable)))
|
|
// If the desired new scale is 0, then the max unavailable is necessarily
|
|
// the effective scale of the old RC regardless of the configuration
|
|
// (equivalent to 100% maxUnavailable).
|
|
if desired == 0 {
|
|
maxUnavailable = valOrZero(oldRc.Spec.Replicas)
|
|
minAvailable = 0
|
|
}
|
|
|
|
fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (keep %d pods available, don't exceed %d pods)\n",
|
|
newRc.Name, valOrZero(newRc.Spec.Replicas), desired, oldRc.Name, valOrZero(oldRc.Spec.Replicas), minAvailable, desired+maxSurge)
|
|
|
|
// give a caller incremental notification and allow them to exit early
|
|
goal := desired - valOrZero(newRc.Spec.Replicas)
|
|
if goal < 0 {
|
|
goal = -goal
|
|
}
|
|
progress := func(complete bool) error {
|
|
if config.OnProgress == nil {
|
|
return nil
|
|
}
|
|
progress := desired - valOrZero(newRc.Spec.Replicas)
|
|
if progress < 0 {
|
|
progress = -progress
|
|
}
|
|
percentage := 100
|
|
if !complete && goal > 0 {
|
|
percentage = int((goal - progress) * 100 / goal)
|
|
}
|
|
return config.OnProgress(oldRc, newRc, percentage)
|
|
}
|
|
|
|
// Scale newRc and oldRc until newRc has the desired number of replicas and
|
|
// oldRc has 0 replicas.
|
|
progressDeadline := time.Now().UnixNano() + config.Timeout.Nanoseconds()
|
|
for valOrZero(newRc.Spec.Replicas) != desired || valOrZero(oldRc.Spec.Replicas) != 0 {
|
|
// Store the existing replica counts for progress timeout tracking.
|
|
newReplicas := valOrZero(newRc.Spec.Replicas)
|
|
oldReplicas := valOrZero(oldRc.Spec.Replicas)
|
|
|
|
// Scale up as much as possible.
|
|
scaledRc, err := r.scaleUp(newRc, oldRc, desired, maxSurge, maxUnavailable, scaleRetryParams, config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newRc = scaledRc
|
|
|
|
// notify the caller if necessary
|
|
if err := progress(false); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Wait between scaling operations for things to settle.
|
|
time.Sleep(config.UpdatePeriod)
|
|
|
|
// Scale down as much as possible.
|
|
scaledRc, err = r.scaleDown(newRc, oldRc, desired, minAvailable, maxUnavailable, maxSurge, config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
oldRc = scaledRc
|
|
|
|
// notify the caller if necessary
|
|
if err := progress(false); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If we are making progress, continue to advance the progress deadline.
|
|
// Otherwise, time out with an error.
|
|
progressMade := (valOrZero(newRc.Spec.Replicas) != newReplicas) || (valOrZero(oldRc.Spec.Replicas) != oldReplicas)
|
|
if progressMade {
|
|
progressDeadline = time.Now().UnixNano() + config.Timeout.Nanoseconds()
|
|
} else if time.Now().UnixNano() > progressDeadline {
|
|
return fmt.Errorf("timed out waiting for any update progress to be made")
|
|
}
|
|
}
|
|
|
|
// notify the caller if necessary
|
|
if err := progress(true); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Housekeeping and cleanup policy execution.
|
|
return r.cleanup(oldRc, newRc, config)
|
|
}
|
|
|
|
// scaleUp scales up newRc to desired by whatever increment is possible given
|
|
// the configured surge threshold. scaleUp will safely no-op as necessary when
|
|
// it detects redundancy or other relevant conditions.
|
|
func (r *RollingUpdater) scaleUp(newRc, oldRc *corev1.ReplicationController, desired, maxSurge, maxUnavailable int32, scaleRetryParams *RetryParams, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
|
|
// If we're already at the desired, do nothing.
|
|
if valOrZero(newRc.Spec.Replicas) == desired {
|
|
return newRc, nil
|
|
}
|
|
|
|
// Scale up as far as we can based on the surge limit.
|
|
increment := (desired + maxSurge) - (valOrZero(oldRc.Spec.Replicas) + valOrZero(newRc.Spec.Replicas))
|
|
// If the old is already scaled down, go ahead and scale all the way up.
|
|
if valOrZero(oldRc.Spec.Replicas) == 0 {
|
|
increment = desired - valOrZero(newRc.Spec.Replicas)
|
|
}
|
|
// We can't scale up without violating the surge limit, so do nothing.
|
|
if increment <= 0 {
|
|
return newRc, nil
|
|
}
|
|
// Increase the replica count, and deal with fenceposts.
|
|
nextVal := valOrZero(newRc.Spec.Replicas) + increment
|
|
newRc.Spec.Replicas = &nextVal
|
|
if valOrZero(newRc.Spec.Replicas) > desired {
|
|
newRc.Spec.Replicas = &desired
|
|
}
|
|
// Perform the scale-up.
|
|
fmt.Fprintf(config.Out, "Scaling %s up to %d\n", newRc.Name, valOrZero(newRc.Spec.Replicas))
|
|
scaledRc, err := r.scaleAndWait(newRc, scaleRetryParams, scaleRetryParams)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return scaledRc, nil
|
|
}
|
|
|
|
// scaleDown scales down oldRc to 0 at whatever decrement possible given the
|
|
// thresholds defined on the config. scaleDown will safely no-op as necessary
|
|
// when it detects redundancy or other relevant conditions.
|
|
func (r *RollingUpdater) scaleDown(newRc, oldRc *corev1.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int32, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
|
|
// Already scaled down; do nothing.
|
|
if valOrZero(oldRc.Spec.Replicas) == 0 {
|
|
return oldRc, nil
|
|
}
|
|
// Get ready pods. We shouldn't block, otherwise in case both old and new
|
|
// pods are unavailable then the rolling update process blocks.
|
|
// Timeout-wise we are already covered by the progress check.
|
|
_, newAvailable, err := r.getReadyPods(oldRc, newRc, config.MinReadySeconds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// The old controller is considered as part of the total because we want to
|
|
// maintain minimum availability even with a volatile old controller.
|
|
// Scale down as much as possible while maintaining minimum availability
|
|
allPods := valOrZero(oldRc.Spec.Replicas) + valOrZero(newRc.Spec.Replicas)
|
|
newUnavailable := valOrZero(newRc.Spec.Replicas) - newAvailable
|
|
decrement := allPods - minAvailable - newUnavailable
|
|
// The decrement normally shouldn't drop below 0 because the available count
|
|
// always starts below the old replica count, but the old replica count can
|
|
// decrement due to externalities like pods death in the replica set. This
|
|
// will be considered a transient condition; do nothing and try again later
|
|
// with new readiness values.
|
|
//
|
|
// If the most we can scale is 0, it means we can't scale down without
|
|
// violating the minimum. Do nothing and try again later when conditions may
|
|
// have changed.
|
|
if decrement <= 0 {
|
|
return oldRc, nil
|
|
}
|
|
// Reduce the replica count, and deal with fenceposts.
|
|
nextOldVal := valOrZero(oldRc.Spec.Replicas) - decrement
|
|
oldRc.Spec.Replicas = &nextOldVal
|
|
if valOrZero(oldRc.Spec.Replicas) < 0 {
|
|
oldRc.Spec.Replicas = newInt32Ptr(0)
|
|
}
|
|
// If the new is already fully scaled and available up to the desired size, go
|
|
// ahead and scale old all the way down.
|
|
if valOrZero(newRc.Spec.Replicas) == desired && newAvailable == desired {
|
|
oldRc.Spec.Replicas = newInt32Ptr(0)
|
|
}
|
|
// Perform the scale-down.
|
|
fmt.Fprintf(config.Out, "Scaling %s down to %d\n", oldRc.Name, valOrZero(oldRc.Spec.Replicas))
|
|
retryWait := &RetryParams{config.Interval, config.Timeout}
|
|
scaledRc, err := r.scaleAndWait(oldRc, retryWait, retryWait)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return scaledRc, nil
|
|
}
|
|
|
|
// scalerScaleAndWait scales a controller using a Scaler and a real client.
|
|
func (r *RollingUpdater) scaleAndWaitWithScaler(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error) {
|
|
scaler := NewScaler(r.scaleClient)
|
|
if err := scaler.Scale(rc.Namespace, rc.Name, uint(valOrZero(rc.Spec.Replicas)), &ScalePrecondition{-1, ""}, retry, wait, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.rcClient.ReplicationControllers(rc.Namespace).Get(rc.Name, metav1.GetOptions{})
|
|
}
|
|
|
|
// readyPods returns the old and new ready counts for their pods.
|
|
// If a pod is observed as being ready, it's considered ready even
|
|
// if it later becomes notReady.
|
|
func (r *RollingUpdater) readyPods(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error) {
|
|
controllers := []*corev1.ReplicationController{oldRc, newRc}
|
|
oldReady := int32(0)
|
|
newReady := int32(0)
|
|
if r.nowFn == nil {
|
|
r.nowFn = func() metav1.Time { return metav1.Now() }
|
|
}
|
|
|
|
for i := range controllers {
|
|
controller := controllers[i]
|
|
selector := labels.Set(controller.Spec.Selector).AsSelector()
|
|
options := metav1.ListOptions{LabelSelector: selector.String()}
|
|
pods, err := r.podClient.Pods(controller.Namespace).List(options)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
for _, v1Pod := range pods.Items {
|
|
// Do not count deleted pods as ready
|
|
if v1Pod.DeletionTimestamp != nil {
|
|
continue
|
|
}
|
|
if !podutils.IsPodAvailable(&v1Pod, minReadySeconds, r.nowFn()) {
|
|
continue
|
|
}
|
|
switch controller.Name {
|
|
case oldRc.Name:
|
|
oldReady++
|
|
case newRc.Name:
|
|
newReady++
|
|
}
|
|
}
|
|
}
|
|
return oldReady, newReady, nil
|
|
}
|
|
|
|
// getOrCreateTargetControllerWithClient looks for an existing controller with
|
|
// sourceID. If found, the existing controller is returned with true
|
|
// indicating that the controller already exists. If the controller isn't
|
|
// found, a new one is created and returned along with false indicating the
|
|
// controller was created.
|
|
//
|
|
// Existing controllers are validated to ensure their sourceIDAnnotation
|
|
// matches sourceID; if there's a mismatch, an error is returned.
|
|
func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *corev1.ReplicationController, sourceID string) (*corev1.ReplicationController, bool, error) {
|
|
existingRc, err := r.existingController(controller)
|
|
if err != nil {
|
|
if !errors.IsNotFound(err) {
|
|
// There was an error trying to find the controller; don't assume we
|
|
// should create it.
|
|
return nil, false, err
|
|
}
|
|
if valOrZero(controller.Spec.Replicas) <= 0 {
|
|
return nil, false, fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d", controller.Name, valOrZero(controller.Spec.Replicas))
|
|
}
|
|
// The controller wasn't found, so create it.
|
|
if controller.Annotations == nil {
|
|
controller.Annotations = map[string]string{}
|
|
}
|
|
controller.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", valOrZero(controller.Spec.Replicas))
|
|
controller.Annotations[sourceIDAnnotation] = sourceID
|
|
controller.Spec.Replicas = newInt32Ptr(0)
|
|
newRc, err := r.rcClient.ReplicationControllers(r.ns).Create(controller)
|
|
return newRc, false, err
|
|
}
|
|
// Validate and use the existing controller.
|
|
annotations := existingRc.Annotations
|
|
source := annotations[sourceIDAnnotation]
|
|
_, ok := annotations[desiredReplicasAnnotation]
|
|
if source != sourceID || !ok {
|
|
return nil, false, fmt.Errorf("Missing/unexpected annotations for controller %s, expected %s : %s", controller.Name, sourceID, annotations)
|
|
}
|
|
return existingRc, true, nil
|
|
}
|
|
|
|
// existingController verifies if the controller already exists
|
|
func (r *RollingUpdater) existingController(controller *corev1.ReplicationController) (*corev1.ReplicationController, error) {
|
|
// without rc name but generate name, there's no existing rc
|
|
if len(controller.Name) == 0 && len(controller.GenerateName) > 0 {
|
|
return nil, errors.NewNotFound(corev1.Resource("replicationcontrollers"), controller.Name)
|
|
}
|
|
// controller name is required to get rc back
|
|
return r.rcClient.ReplicationControllers(controller.Namespace).Get(controller.Name, metav1.GetOptions{})
|
|
}
|
|
|
|
// cleanupWithClients performs cleanup tasks after the rolling update. Update
|
|
// process related annotations are removed from oldRc and newRc. The
|
|
// CleanupPolicy on config is executed.
|
|
func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error {
|
|
// Clean up annotations
|
|
var err error
|
|
newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(newRc.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
applyUpdate := func(rc *corev1.ReplicationController) {
|
|
delete(rc.Annotations, sourceIDAnnotation)
|
|
delete(rc.Annotations, desiredReplicasAnnotation)
|
|
}
|
|
if newRc, err = updateRcWithRetries(r.rcClient, r.ns, newRc, applyUpdate); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = wait.Poll(config.Interval, config.Timeout, ControllerHasDesiredReplicas(r.rcClient, newRc)); err != nil {
|
|
return err
|
|
}
|
|
newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(newRc.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch config.CleanupPolicy {
|
|
case DeleteRollingUpdateCleanupPolicy:
|
|
// delete old rc
|
|
fmt.Fprintf(config.Out, "Update succeeded. Deleting %s\n", oldRc.Name)
|
|
return r.rcClient.ReplicationControllers(r.ns).Delete(oldRc.Name, nil)
|
|
case RenameRollingUpdateCleanupPolicy:
|
|
// delete old rc
|
|
fmt.Fprintf(config.Out, "Update succeeded. Deleting old controller: %s\n", oldRc.Name)
|
|
if err := r.rcClient.ReplicationControllers(r.ns).Delete(oldRc.Name, nil); err != nil {
|
|
return err
|
|
}
|
|
fmt.Fprintf(config.Out, "Renaming %s to %s\n", newRc.Name, oldRc.Name)
|
|
return Rename(r.rcClient, newRc, oldRc.Name)
|
|
case PreserveRollingUpdateCleanupPolicy:
|
|
return nil
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func Rename(c corev1client.ReplicationControllersGetter, rc *corev1.ReplicationController, newName string) error {
|
|
oldName := rc.Name
|
|
rc.Name = newName
|
|
rc.ResourceVersion = ""
|
|
// First delete the oldName RC and orphan its pods.
|
|
policy := metav1.DeletePropagationOrphan
|
|
err := c.ReplicationControllers(rc.Namespace).Delete(oldName, &metav1.DeleteOptions{PropagationPolicy: &policy})
|
|
if err != nil && !errors.IsNotFound(err) {
|
|
return err
|
|
}
|
|
err = wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
|
|
_, err := c.ReplicationControllers(rc.Namespace).Get(oldName, metav1.GetOptions{})
|
|
if err == nil {
|
|
return false, nil
|
|
} else if errors.IsNotFound(err) {
|
|
return true, nil
|
|
} else {
|
|
return false, err
|
|
}
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Then create the same RC with the new name.
|
|
_, err = c.ReplicationControllers(rc.Namespace).Create(rc)
|
|
return err
|
|
}
|
|
|
|
func LoadExistingNextReplicationController(c corev1client.ReplicationControllersGetter, namespace, newName string) (*corev1.ReplicationController, error) {
|
|
if len(newName) == 0 {
|
|
return nil, nil
|
|
}
|
|
newRc, err := c.ReplicationControllers(namespace).Get(newName, metav1.GetOptions{})
|
|
if err != nil && errors.IsNotFound(err) {
|
|
return nil, nil
|
|
}
|
|
return newRc, err
|
|
}
|
|
|
|
type NewControllerConfig struct {
|
|
Namespace string
|
|
OldName, NewName string
|
|
Image string
|
|
Container string
|
|
DeploymentKey string
|
|
PullPolicy corev1.PullPolicy
|
|
}
|
|
|
|
func CreateNewControllerFromCurrentController(rcClient corev1client.ReplicationControllersGetter, codec runtime.Codec, cfg *NewControllerConfig) (*corev1.ReplicationController, error) {
|
|
containerIndex := 0
|
|
// load the old RC into the "new" RC
|
|
newRc, err := rcClient.ReplicationControllers(cfg.Namespace).Get(cfg.OldName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(cfg.Container) != 0 {
|
|
containerFound := false
|
|
|
|
for i, c := range newRc.Spec.Template.Spec.Containers {
|
|
if c.Name == cfg.Container {
|
|
containerIndex = i
|
|
containerFound = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !containerFound {
|
|
return nil, fmt.Errorf("container %s not found in pod", cfg.Container)
|
|
}
|
|
}
|
|
|
|
if len(newRc.Spec.Template.Spec.Containers) > 1 && len(cfg.Container) == 0 {
|
|
return nil, fmt.Errorf("must specify container to update when updating a multi-container pod")
|
|
}
|
|
|
|
if len(newRc.Spec.Template.Spec.Containers) == 0 {
|
|
return nil, fmt.Errorf("pod has no containers! (%v)", newRc)
|
|
}
|
|
newRc.Spec.Template.Spec.Containers[containerIndex].Image = cfg.Image
|
|
if len(cfg.PullPolicy) != 0 {
|
|
newRc.Spec.Template.Spec.Containers[containerIndex].ImagePullPolicy = cfg.PullPolicy
|
|
}
|
|
|
|
newHash, err := util.HashObject(newRc, codec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(cfg.NewName) == 0 {
|
|
cfg.NewName = fmt.Sprintf("%s-%s", newRc.Name, newHash)
|
|
}
|
|
newRc.Name = cfg.NewName
|
|
|
|
newRc.Spec.Selector[cfg.DeploymentKey] = newHash
|
|
newRc.Spec.Template.Labels[cfg.DeploymentKey] = newHash
|
|
// Clear resource version after hashing so that identical updates get different hashes.
|
|
newRc.ResourceVersion = ""
|
|
return newRc, nil
|
|
}
|
|
|
|
func AbortRollingUpdate(c *RollingUpdaterConfig) error {
|
|
// Swap the controllers
|
|
tmp := c.OldRc
|
|
c.OldRc = c.NewRc
|
|
c.NewRc = tmp
|
|
|
|
if c.NewRc.Annotations == nil {
|
|
c.NewRc.Annotations = map[string]string{}
|
|
}
|
|
c.NewRc.Annotations[sourceIDAnnotation] = fmt.Sprintf("%s:%s", c.OldRc.Name, c.OldRc.UID)
|
|
|
|
// Use the original value since the replica count change from old to new
|
|
// could be asymmetric. If we don't know the original count, we can't safely
|
|
// roll back to a known good size.
|
|
originalSize, foundOriginal := tmp.Annotations[originalReplicasAnnotation]
|
|
if !foundOriginal {
|
|
return fmt.Errorf("couldn't find original replica count of %q", tmp.Name)
|
|
}
|
|
fmt.Fprintf(c.Out, "Setting %q replicas to %s\n", c.NewRc.Name, originalSize)
|
|
c.NewRc.Annotations[desiredReplicasAnnotation] = originalSize
|
|
c.CleanupPolicy = DeleteRollingUpdateCleanupPolicy
|
|
return nil
|
|
}
|
|
|
|
func GetNextControllerAnnotation(rc *corev1.ReplicationController) (string, bool) {
|
|
res, found := rc.Annotations[nextControllerAnnotation]
|
|
return res, found
|
|
}
|
|
|
|
func SetNextControllerAnnotation(rc *corev1.ReplicationController, name string) {
|
|
if rc.Annotations == nil {
|
|
rc.Annotations = map[string]string{}
|
|
}
|
|
rc.Annotations[nextControllerAnnotation] = name
|
|
}
|
|
|
|
func UpdateExistingReplicationController(rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, oldRc *corev1.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*corev1.ReplicationController, error) {
|
|
if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
|
|
SetNextControllerAnnotation(oldRc, newName)
|
|
return AddDeploymentKeyToReplicationController(oldRc, rcClient, podClient, deploymentKey, deploymentValue, namespace, out)
|
|
}
|
|
|
|
// If we didn't need to update the controller for the deployment key, we still need to write
|
|
// the "next" controller.
|
|
applyUpdate := func(rc *corev1.ReplicationController) {
|
|
SetNextControllerAnnotation(rc, newName)
|
|
}
|
|
return updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate)
|
|
}
|
|
|
|
func AddDeploymentKeyToReplicationController(oldRc *corev1.ReplicationController, rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, deploymentKey, deploymentValue, namespace string, out io.Writer) (*corev1.ReplicationController, error) {
|
|
var err error
|
|
// First, update the template label. This ensures that any newly created pods will have the new label
|
|
applyUpdate := func(rc *corev1.ReplicationController) {
|
|
if rc.Spec.Template.Labels == nil {
|
|
rc.Spec.Template.Labels = map[string]string{}
|
|
}
|
|
rc.Spec.Template.Labels[deploymentKey] = deploymentValue
|
|
}
|
|
if oldRc, err = updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Update all pods managed by the rc to have the new hash label, so they are correctly adopted
|
|
// TODO: extract the code from the label command and re-use it here.
|
|
selector := labels.SelectorFromSet(oldRc.Spec.Selector)
|
|
options := metav1.ListOptions{LabelSelector: selector.String()}
|
|
podList, err := podClient.Pods(namespace).List(options)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for ix := range podList.Items {
|
|
pod := &podList.Items[ix]
|
|
applyUpdate := func(p *corev1.Pod) {
|
|
if p.Labels == nil {
|
|
p.Labels = map[string]string{
|
|
deploymentKey: deploymentValue,
|
|
}
|
|
} else {
|
|
p.Labels[deploymentKey] = deploymentValue
|
|
}
|
|
}
|
|
if pod, err = updatePodWithRetries(podClient, namespace, pod, applyUpdate); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if oldRc.Spec.Selector == nil {
|
|
oldRc.Spec.Selector = map[string]string{}
|
|
}
|
|
// Copy the old selector, so that we can scrub out any orphaned pods
|
|
selectorCopy := map[string]string{}
|
|
for k, v := range oldRc.Spec.Selector {
|
|
selectorCopy[k] = v
|
|
}
|
|
applyUpdate = func(rc *corev1.ReplicationController) {
|
|
rc.Spec.Selector[deploymentKey] = deploymentValue
|
|
}
|
|
// Update the selector of the rc so it manages all the pods we updated above
|
|
if oldRc, err = updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Clean up any orphaned pods that don't have the new label, this can happen if the rc manager
|
|
// doesn't see the update to its pod template and creates a new pod with the old labels after
|
|
// we've finished re-adopting existing pods to the rc.
|
|
selector = labels.SelectorFromSet(selectorCopy)
|
|
options = metav1.ListOptions{LabelSelector: selector.String()}
|
|
if podList, err = podClient.Pods(namespace).List(options); err != nil {
|
|
return nil, err
|
|
}
|
|
for ix := range podList.Items {
|
|
pod := &podList.Items[ix]
|
|
if value, found := pod.Labels[deploymentKey]; !found || value != deploymentValue {
|
|
if err := podClient.Pods(namespace).Delete(pod.Name, nil); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
return oldRc, nil
|
|
}
|
|
|
|
type updateRcFunc func(controller *corev1.ReplicationController)
|
|
|
|
// updateRcWithRetries retries updating the given rc on conflict with the following steps:
|
|
// 1. Get latest resource
|
|
// 2. applyUpdate
|
|
// 3. Update the resource
|
|
func updateRcWithRetries(rcClient corev1client.ReplicationControllersGetter, namespace string, rc *corev1.ReplicationController, applyUpdate updateRcFunc) (*corev1.ReplicationController, error) {
|
|
// Deep copy the rc in case we failed on Get during retry loop
|
|
oldRc := rc.DeepCopy()
|
|
err := retry.RetryOnConflict(retry.DefaultBackoff, func() (e error) {
|
|
// Apply the update, then attempt to push it to the apiserver.
|
|
applyUpdate(rc)
|
|
if rc, e = rcClient.ReplicationControllers(namespace).Update(rc); e == nil {
|
|
// rc contains the latest controller post update
|
|
return
|
|
}
|
|
updateErr := e
|
|
// Update the controller with the latest resource version, if the update failed we
|
|
// can't trust rc so use oldRc.Name.
|
|
if rc, e = rcClient.ReplicationControllers(namespace).Get(oldRc.Name, metav1.GetOptions{}); e != nil {
|
|
// The Get failed: Value in rc cannot be trusted.
|
|
rc = oldRc
|
|
}
|
|
// Only return the error from update
|
|
return updateErr
|
|
})
|
|
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
|
|
// controller contains the applied update.
|
|
return rc, err
|
|
}
|
|
|
|
type updatePodFunc func(controller *corev1.Pod)
|
|
|
|
// updatePodWithRetries retries updating the given pod on conflict with the following steps:
|
|
// 1. Get latest resource
|
|
// 2. applyUpdate
|
|
// 3. Update the resource
|
|
func updatePodWithRetries(podClient corev1client.PodsGetter, namespace string, pod *corev1.Pod, applyUpdate updatePodFunc) (*corev1.Pod, error) {
|
|
// Deep copy the pod in case we failed on Get during retry loop
|
|
oldPod := pod.DeepCopy()
|
|
err := retry.RetryOnConflict(retry.DefaultBackoff, func() (e error) {
|
|
// Apply the update, then attempt to push it to the apiserver.
|
|
applyUpdate(pod)
|
|
if pod, e = podClient.Pods(namespace).Update(pod); e == nil {
|
|
return
|
|
}
|
|
updateErr := e
|
|
if pod, e = podClient.Pods(namespace).Get(oldPod.Name, metav1.GetOptions{}); e != nil {
|
|
pod = oldPod
|
|
}
|
|
// Only return the error from update
|
|
return updateErr
|
|
})
|
|
// If the error is non-nil the returned pod cannot be trusted, if it is nil, the returned
|
|
// controller contains the applied update.
|
|
return pod, err
|
|
}
|
|
|
|
func FindSourceController(r corev1client.ReplicationControllersGetter, namespace, name string) (*corev1.ReplicationController, error) {
|
|
list, err := r.ReplicationControllers(namespace).List(metav1.ListOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for ix := range list.Items {
|
|
rc := &list.Items[ix]
|
|
if rc.Annotations != nil && strings.HasPrefix(rc.Annotations[sourceIDAnnotation], name) {
|
|
return rc, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name)
|
|
}
|