2019-05-31 09:45:11 +00:00
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
2020-04-14 07:04:33 +00:00
"context"
2019-05-31 09:45:11 +00:00
"fmt"
"time"
"github.com/davecgh/go-spew/spew"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
)
// LogfFn is a printf-style logging callback: it takes a format string followed
// by the arguments to be formatted. The helpers in this file use it to emit
// diagnostics.
type LogfFn func ( format string , args ... interface { } )
// LogReplicaSetsOfDeployment dumps the new ReplicaSet and every old ReplicaSet
// of the given deployment through logf.
//
// A nil newRS is reported as such. When allOldRSs is empty nothing is logged
// for the old ReplicaSets; nil entries in allOldRSs are skipped.
func LogReplicaSetsOfDeployment(deployment *apps.Deployment, allOldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, logf LogfFn) {
	if newRS != nil {
		// Pass the spew output as an argument, never as the format string:
		// a '%' inside the dumped object must be printed verbatim.
		logf("%s", spew.Sprintf("New ReplicaSet %q of Deployment %q:\n%+v", newRS.Name, deployment.Name, *newRS))
	} else {
		logf("New ReplicaSet of Deployment %q is nil.", deployment.Name)
	}
	if len(allOldRSs) > 0 {
		logf("All old ReplicaSets of Deployment %q:", deployment.Name)
	}
	for _, rs := range allOldRSs {
		if rs == nil {
			// Nothing to dump; dereferencing would panic.
			continue
		}
		logf("%s", spew.Sprintf("%+v", *rs))
	}
}
func LogPodsOfDeployment ( c clientset . Interface , deployment * apps . Deployment , rsList [ ] * apps . ReplicaSet , logf LogfFn ) {
minReadySeconds := deployment . Spec . MinReadySeconds
podListFunc := func ( namespace string , options metav1 . ListOptions ) ( * v1 . PodList , error ) {
2020-04-14 07:04:33 +00:00
return c . CoreV1 ( ) . Pods ( namespace ) . List ( context . TODO ( ) , options )
2019-05-31 09:45:11 +00:00
}
podList , err := deploymentutil . ListPods ( deployment , rsList , podListFunc )
if err != nil {
logf ( "Failed to list Pods of Deployment %q: %v" , deployment . Name , err )
return
}
for _ , pod := range podList . Items {
availability := "not available"
if podutil . IsPodAvailable ( & pod , minReadySeconds , metav1 . Now ( ) ) {
availability = "available"
}
logf ( spew . Sprintf ( "Pod %q is %s:\n%+v" , pod . Name , availability , pod ) )
}
}
// Waits for the deployment to complete.
// If during a rolling update (rolling == true), returns an error if the deployment's
// rolling update strategy (max unavailable or max surge) is broken at any times.
// It's not seen as a rolling update if shortly after a scaling event or the deployment is just created.
func waitForDeploymentCompleteMaybeCheckRolling ( c clientset . Interface , d * apps . Deployment , rolling bool , logf LogfFn , pollInterval , pollTimeout time . Duration ) error {
var (
deployment * apps . Deployment
reason string
)
err := wait . PollImmediate ( pollInterval , pollTimeout , func ( ) ( bool , error ) {
var err error
2020-04-14 07:04:33 +00:00
deployment , err = c . AppsV1 ( ) . Deployments ( d . Namespace ) . Get ( context . TODO ( ) , d . Name , metav1 . GetOptions { } )
2019-05-31 09:45:11 +00:00
if err != nil {
return false , err
}
// If during a rolling update, make sure rolling update strategy isn't broken at any times.
if rolling {
reason , err = checkRollingUpdateStatus ( c , deployment , logf )
if err != nil {
return false , err
}
logf ( reason )
}
// When the deployment status and its underlying resources reach the desired state, we're done
if deploymentutil . DeploymentComplete ( d , & deployment . Status ) {
return true , nil
}
reason = fmt . Sprintf ( "deployment status: %#v" , deployment . Status )
logf ( reason )
return false , nil
} )
if err == wait . ErrWaitTimeout {
err = fmt . Errorf ( "%s" , reason )
}
if err != nil {
return fmt . Errorf ( "error waiting for deployment %q status to match expectation: %v" , d . Name , err )
}
return nil
}
// checkRollingUpdateStatus inspects the ReplicaSets of the deployment and
// reports whether its rolling update constraints currently hold.
//
// It returns a non-empty reason with a nil error when the check cannot be
// performed yet (the new ReplicaSet does not exist, or a ReplicaSet selector
// lacks the pod-template-hash label). It returns an error when more Pods were
// created than replicas plus max surge allow, or when fewer replicas are
// available than the minimum required; in both cases the ReplicaSets and Pods
// are logged first. Otherwise it returns an empty reason and a nil error.
func checkRollingUpdateStatus(c clientset.Interface, deployment *apps.Deployment, logf LogfFn) (string, error) {
	oldRSs, allOldRSs, newRS, err := deploymentutil.GetAllReplicaSets(deployment, c.AppsV1())
	if err != nil {
		return "", err
	}
	// The new ReplicaSet hasn't been created yet.
	if newRS == nil {
		return "new replica set hasn't been created yet", nil
	}

	// Every old/new ReplicaSet selector must carry the pod-template-hash label.
	allRSs := append(oldRSs, newRS)
	for _, rs := range allRSs {
		if !labelsutil.SelectorHasLabel(rs.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey) {
			return "all replica sets need to contain the pod-template-hash label", nil
		}
	}

	// dumpState logs the ReplicaSets and Pods before a violation is reported.
	dumpState := func() {
		LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
		LogPodsOfDeployment(c, deployment, allRSs, logf)
	}

	// Max surge: the ReplicaSets together must not exceed replicas + surge.
	totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	maxCreated := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
	if totalCreated > maxCreated {
		dumpState()
		return "", fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
	}

	// Min available: the deployment must keep enough replicas available.
	if minAvailable := deploymentutil.MinAvailable(deployment); deployment.Status.AvailableReplicas < minAvailable {
		dumpState()
		return "", fmt.Errorf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
	}

	return "", nil
}
// WaitForDeploymentCompleteAndCheckRolling waits for the deployment to
// complete while also checking, on every poll, that the rolling update
// strategy isn't broken. The strategy should not be broken at any time during
// a rolling update.
func WaitForDeploymentCompleteAndCheckRolling(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
	return waitForDeploymentCompleteMaybeCheckRolling(c, d, true, logf, pollInterval, pollTimeout)
}
// WaitForDeploymentComplete waits for the deployment to complete without
// checking whether the rolling update strategy is broken. The strategy only
// applies during a rolling update and can be violated in other situations,
// such as shortly after a scaling event or when the deployment was just
// created.
func WaitForDeploymentComplete(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
	return waitForDeploymentCompleteMaybeCheckRolling(c, d, false, logf, pollInterval, pollTimeout)
}
// WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
// Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
func WaitForDeploymentRevisionAndImage ( c clientset . Interface , ns , deploymentName string , revision , image string , logf LogfFn , pollInterval , pollTimeout time . Duration ) error {
var deployment * apps . Deployment
var newRS * apps . ReplicaSet
var reason string
err := wait . PollImmediate ( pollInterval , pollTimeout , func ( ) ( bool , error ) {
var err error
2020-04-14 07:04:33 +00:00
deployment , err = c . AppsV1 ( ) . Deployments ( ns ) . Get ( context . TODO ( ) , deploymentName , metav1 . GetOptions { } )
2019-05-31 09:45:11 +00:00
if err != nil {
return false , err
}
// The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
newRS , err = deploymentutil . GetNewReplicaSet ( deployment , c . AppsV1 ( ) )
if err != nil {
return false , err
}
if err := checkRevisionAndImage ( deployment , newRS , revision , image ) ; err != nil {
reason = err . Error ( )
logf ( reason )
return false , nil
}
return true , nil
} )
if err == wait . ErrWaitTimeout {
LogReplicaSetsOfDeployment ( deployment , nil , newRS , logf )
err = fmt . Errorf ( reason )
}
if newRS == nil {
return fmt . Errorf ( "deployment %q failed to create new replica set" , deploymentName )
}
if err != nil {
return fmt . Errorf ( "error waiting for deployment %q (got %s / %s) and new replica set %q (got %s / %s) revision and image to match expectation (expected %s / %s): %v" , deploymentName , deployment . Annotations [ deploymentutil . RevisionAnnotation ] , deployment . Spec . Template . Spec . Containers [ 0 ] . Image , newRS . Name , newRS . Annotations [ deploymentutil . RevisionAnnotation ] , newRS . Spec . Template . Spec . Containers [ 0 ] . Image , revision , image , err )
}
return nil
}
// CheckDeploymentRevisionAndImage checks if the input deployment's and its new replica set's revision and image are as expected.
func CheckDeploymentRevisionAndImage ( c clientset . Interface , ns , deploymentName , revision , image string ) error {
2020-04-14 07:04:33 +00:00
deployment , err := c . AppsV1 ( ) . Deployments ( ns ) . Get ( context . TODO ( ) , deploymentName , metav1 . GetOptions { } )
2019-05-31 09:45:11 +00:00
if err != nil {
return fmt . Errorf ( "unable to get deployment %s during revision check: %v" , deploymentName , err )
}
// Check revision of the new replica set of this deployment
newRS , err := deploymentutil . GetNewReplicaSet ( deployment , c . AppsV1 ( ) )
if err != nil {
return fmt . Errorf ( "unable to get new replicaset of deployment %s during revision check: %v" , deploymentName , err )
}
return checkRevisionAndImage ( deployment , newRS , revision , image )
}
// checkRevisionAndImage returns nil when the new ReplicaSet exists, its
// selector has the pod-template-hash label, both the deployment and the new
// ReplicaSet are annotated with the expected revision, and both pod templates
// contain a container running the expected image. Otherwise it returns an
// error describing the first check that failed.
func checkRevisionAndImage(deployment *apps.Deployment, newRS *apps.ReplicaSet, revision, image string) error {
	// Cases are evaluated top to bottom, so newRS is known to be non-nil by
	// the time any later case dereferences it.
	switch {
	case newRS == nil:
		return fmt.Errorf("new replicaset for deployment %q is yet to be created", deployment.Name)

	case !labelsutil.SelectorHasLabel(newRS.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey):
		return fmt.Errorf("new replica set %q doesn't have %q label selector", newRS.Name, apps.DefaultDeploymentUniqueLabelKey)

	// Revision of the deployment, then of its new replica set. The explicit
	// nil checks matter when the expected revision is the empty string.
	case deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != revision:
		return fmt.Errorf("deployment %q doesn't have the required revision set", deployment.Name)

	case newRS.Annotations == nil || newRS.Annotations[deploymentutil.RevisionAnnotation] != revision:
		return fmt.Errorf("new replicaset %q doesn't have the required revision set", newRS.Name)

	// Image of the deployment, then of its new replica set.
	case !containsImage(deployment.Spec.Template.Spec.Containers, image):
		return fmt.Errorf("deployment %q doesn't have the required image %s set", deployment.Name, image)

	case !containsImage(newRS.Spec.Template.Spec.Containers, image):
		return fmt.Errorf("new replica set %q doesn't have the required image %s.", newRS.Name, image)
	}
	return nil
}
// containsImage reports whether any of the given containers has an Image
// field exactly equal to imageName.
func containsImage(containers []v1.Container, imageName string) bool {
	for i := range containers {
		if containers[i].Image == imageName {
			return true
		}
	}
	return false
}
// UpdateDeploymentFunc is a callback that receives a Deployment pointer.
// UpdateDeploymentWithRetries calls it on a freshly fetched Deployment before
// pushing that object to the apiserver.
type UpdateDeploymentFunc func ( d * apps . Deployment )
func UpdateDeploymentWithRetries ( c clientset . Interface , namespace , name string , applyUpdate UpdateDeploymentFunc , logf LogfFn , pollInterval , pollTimeout time . Duration ) ( * apps . Deployment , error ) {
var deployment * apps . Deployment
var updateErr error
pollErr := wait . PollImmediate ( pollInterval , pollTimeout , func ( ) ( bool , error ) {
var err error
2020-04-14 07:04:33 +00:00
if deployment , err = c . AppsV1 ( ) . Deployments ( namespace ) . Get ( context . TODO ( ) , name , metav1 . GetOptions { } ) ; err != nil {
2019-05-31 09:45:11 +00:00
return false , err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate ( deployment )
2020-04-14 07:04:33 +00:00
if deployment , err = c . AppsV1 ( ) . Deployments ( namespace ) . Update ( context . TODO ( ) , deployment , metav1 . UpdateOptions { } ) ; err == nil {
2019-05-31 09:45:11 +00:00
logf ( "Updating deployment %s" , name )
return true , nil
}
updateErr = err
return false , nil
} )
if pollErr == wait . ErrWaitTimeout {
pollErr = fmt . Errorf ( "couldn't apply the provided updated to deployment %q: %v" , name , updateErr )
}
return deployment , pollErr
}
func WaitForObservedDeployment ( c clientset . Interface , ns , deploymentName string , desiredGeneration int64 ) error {
return deploymentutil . WaitForObservedDeployment ( func ( ) ( * apps . Deployment , error ) {
2020-04-14 07:04:33 +00:00
return c . AppsV1 ( ) . Deployments ( ns ) . Get ( context . TODO ( ) , deploymentName , metav1 . GetOptions { } )
2019-05-31 09:45:11 +00:00
} , desiredGeneration , 2 * time . Second , 1 * time . Minute )
}
// WaitForDeploymentRollbackCleared waits for given deployment either started rolling back or doesn't need to rollback.
func WaitForDeploymentRollbackCleared ( c clientset . Interface , ns , deploymentName string , pollInterval , pollTimeout time . Duration ) error {
err := wait . PollImmediate ( pollInterval , pollTimeout , func ( ) ( bool , error ) {
2020-04-14 07:04:33 +00:00
deployment , err := c . AppsV1 ( ) . Deployments ( ns ) . Get ( context . TODO ( ) , deploymentName , metav1 . GetOptions { } )
2019-05-31 09:45:11 +00:00
if err != nil {
return false , err
}
// Rollback not set or is kicked off
if deployment . Annotations [ apps . DeprecatedRollbackTo ] == "" {
return true , nil
}
return false , nil
} )
if err != nil {
return fmt . Errorf ( "error waiting for deployment %s rollbackTo to be cleared: %v" , deploymentName , err )
}
return nil
}
// WaitForDeploymentUpdatedReplicasGTE waits for given deployment to be observed by the controller and has at least a number of updatedReplicas
func WaitForDeploymentUpdatedReplicasGTE ( c clientset . Interface , ns , deploymentName string , minUpdatedReplicas int32 , desiredGeneration int64 , pollInterval , pollTimeout time . Duration ) error {
var deployment * apps . Deployment
err := wait . PollImmediate ( pollInterval , pollTimeout , func ( ) ( bool , error ) {
2020-04-14 07:04:33 +00:00
d , err := c . AppsV1 ( ) . Deployments ( ns ) . Get ( context . TODO ( ) , deploymentName , metav1 . GetOptions { } )
2019-05-31 09:45:11 +00:00
if err != nil {
return false , err
}
deployment = d
return deployment . Status . ObservedGeneration >= desiredGeneration && deployment . Status . UpdatedReplicas >= minUpdatedReplicas , nil
} )
if err != nil {
return fmt . Errorf ( "error waiting for deployment %q to have at least %d updatedReplicas: %v; latest .status.updatedReplicas: %d" , deploymentName , minUpdatedReplicas , err , deployment . Status . UpdatedReplicas )
}
return nil
}
func WaitForDeploymentWithCondition ( c clientset . Interface , ns , deploymentName , reason string , condType apps . DeploymentConditionType , logf LogfFn , pollInterval , pollTimeout time . Duration ) error {
var deployment * apps . Deployment
pollErr := wait . PollImmediate ( pollInterval , pollTimeout , func ( ) ( bool , error ) {
2020-04-14 07:04:33 +00:00
d , err := c . AppsV1 ( ) . Deployments ( ns ) . Get ( context . TODO ( ) , deploymentName , metav1 . GetOptions { } )
2019-05-31 09:45:11 +00:00
if err != nil {
return false , err
}
deployment = d
cond := deploymentutil . GetDeploymentCondition ( deployment . Status , condType )
return cond != nil && cond . Reason == reason , nil
} )
if pollErr == wait . ErrWaitTimeout {
pollErr = fmt . Errorf ( "deployment %q never updated with the desired condition and reason, latest deployment conditions: %+v" , deployment . Name , deployment . Status . Conditions )
_ , allOldRSs , newRS , err := deploymentutil . GetAllReplicaSets ( deployment , c . AppsV1 ( ) )
if err == nil {
LogReplicaSetsOfDeployment ( deployment , allOldRSs , newRS , logf )
LogPodsOfDeployment ( c , deployment , append ( allOldRSs , newRS ) , logf )
}
}
return pollErr
}