mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-03-21 06:39:28 +00:00
Bumps the github-dependencies group with 8 updates in the / directory: | Package | From | To | | --- | --- | --- | | [github.com/IBM/keyprotect-go-client](https://github.com/IBM/keyprotect-go-client) | `0.12.2` | `0.14.1` | | [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) | `1.53.14` | `1.54.6` | | [github.com/aws/aws-sdk-go-v2/service/sts](https://github.com/aws/aws-sdk-go-v2) | `1.28.1` | `1.29.1` | | [github.com/hashicorp/vault/api](https://github.com/hashicorp/vault) | `1.12.0` | `1.14.0` | | [github.com/kubernetes-csi/csi-lib-utils](https://github.com/kubernetes-csi/csi-lib-utils) | `0.17.0` | `0.18.1` | | [github.com/onsi/ginkgo/v2](https://github.com/onsi/ginkgo) | `2.17.1` | `2.19.0` | | [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) | `1.18.0` | `1.19.1` | | [github.com/Azure/azure-sdk-for-go/sdk/azidentity](https://github.com/Azure/azure-sdk-for-go) | `1.6.0` | `1.7.0` | Updates `github.com/IBM/keyprotect-go-client` from 0.12.2 to 0.14.1 - [Release notes](https://github.com/IBM/keyprotect-go-client/releases) - [Changelog](https://github.com/IBM/keyprotect-go-client/blob/master/CHANGELOG.md) - [Commits](https://github.com/IBM/keyprotect-go-client/compare/v0.12.2...v0.14.1) Updates `github.com/aws/aws-sdk-go` from 1.53.14 to 1.54.6 - [Release notes](https://github.com/aws/aws-sdk-go/releases) - [Commits](https://github.com/aws/aws-sdk-go/compare/v1.53.14...v1.54.6) Updates `github.com/aws/aws-sdk-go-v2/service/sts` from 1.28.1 to 1.29.1 - [Release notes](https://github.com/aws/aws-sdk-go-v2/releases) - [Commits](https://github.com/aws/aws-sdk-go-v2/compare/service/ecr/v1.28.1...service/s3/v1.29.1) Updates `github.com/hashicorp/vault/api` from 1.12.0 to 1.14.0 - [Release notes](https://github.com/hashicorp/vault/releases) - [Changelog](https://github.com/hashicorp/vault/blob/main/CHANGELOG.md) - [Commits](https://github.com/hashicorp/vault/compare/v1.12.0...v1.14.0) Updates `github.com/kubernetes-csi/csi-lib-utils` from 0.17.0 to 0.18.1 - [Release notes](https://github.com/kubernetes-csi/csi-lib-utils/releases) - [Commits](https://github.com/kubernetes-csi/csi-lib-utils/compare/v0.17.0...v0.18.1) Updates `github.com/onsi/ginkgo/v2` from 2.17.1 to 2.19.0 - [Release notes](https://github.com/onsi/ginkgo/releases) - [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/ginkgo/compare/v2.17.1...v2.19.0) Updates `github.com/onsi/gomega` from 1.32.0 to 1.33.1 - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.32.0...v1.33.1) Updates `github.com/prometheus/client_golang` from 1.18.0 to 1.19.1 - [Release notes](https://github.com/prometheus/client_golang/releases) - [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md) - [Commits](https://github.com/prometheus/client_golang/compare/v1.18.0...v1.19.1) Updates `github.com/Azure/azure-sdk-for-go/sdk/azidentity` from 1.6.0 to 1.7.0 - [Release notes](https://github.com/Azure/azure-sdk-for-go/releases) - [Changelog](https://github.com/Azure/azure-sdk-for-go/blob/main/documentation/release.md) - [Commits](https://github.com/Azure/azure-sdk-for-go/compare/sdk/azcore/v1.6.0...sdk/azcore/v1.7.0) --- updated-dependencies: - dependency-name: github.com/IBM/keyprotect-go-client dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-dependencies - dependency-name: github.com/aws/aws-sdk-go dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-dependencies - dependency-name: github.com/aws/aws-sdk-go-v2/service/sts dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-dependencies - dependency-name: github.com/hashicorp/vault/api dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-dependencies - dependency-name: github.com/kubernetes-csi/csi-lib-utils dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-dependencies - dependency-name: github.com/onsi/ginkgo/v2 dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-dependencies - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-dependencies - dependency-name: github.com/prometheus/client_golang dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-dependencies - dependency-name: github.com/Azure/azure-sdk-for-go/sdk/azidentity dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-dependencies ... Signed-off-by: dependabot[bot] <support@github.com>
384 lines
12 KiB
Go
384 lines
12 KiB
Go
package retry
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws/middleware/private/metrics"
|
|
internalcontext "github.com/aws/aws-sdk-go-v2/internal/context"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
awsmiddle "github.com/aws/aws-sdk-go-v2/aws/middleware"
|
|
"github.com/aws/aws-sdk-go-v2/internal/sdk"
|
|
"github.com/aws/smithy-go/logging"
|
|
smithymiddle "github.com/aws/smithy-go/middleware"
|
|
"github.com/aws/smithy-go/transport/http"
|
|
)
|
|
|
|
// RequestCloner is a function that can take an input request type and clone
|
|
// the request for use in a subsequent retry attempt.
|
|
type RequestCloner func(interface{}) interface{}
|
|
|
|
type retryMetadata struct {
|
|
AttemptNum int
|
|
AttemptTime time.Time
|
|
MaxAttempts int
|
|
AttemptClockSkew time.Duration
|
|
}
|
|
|
|
// Attempt is a Smithy Finalize middleware that handles retry attempts using
|
|
// the provided Retryer implementation.
|
|
type Attempt struct {
|
|
// Enable the logging of retry attempts performed by the SDK. This will
|
|
// include logging retry attempts, unretryable errors, and when max
|
|
// attempts are reached.
|
|
LogAttempts bool
|
|
|
|
retryer aws.RetryerV2
|
|
requestCloner RequestCloner
|
|
}
|
|
|
|
// define the threshold at which we will consider certain kind of errors to be probably
|
|
// caused by clock skew
|
|
const skewThreshold = 4 * time.Minute
|
|
|
|
// NewAttemptMiddleware returns a new Attempt retry middleware.
|
|
func NewAttemptMiddleware(retryer aws.Retryer, requestCloner RequestCloner, optFns ...func(*Attempt)) *Attempt {
|
|
m := &Attempt{
|
|
retryer: wrapAsRetryerV2(retryer),
|
|
requestCloner: requestCloner,
|
|
}
|
|
for _, fn := range optFns {
|
|
fn(m)
|
|
}
|
|
return m
|
|
}
|
|
|
|
// ID returns the middleware identifier
|
|
func (r *Attempt) ID() string { return "Retry" }
|
|
|
|
func (r Attempt) logf(logger logging.Logger, classification logging.Classification, format string, v ...interface{}) {
|
|
if !r.LogAttempts {
|
|
return
|
|
}
|
|
logger.Logf(classification, format, v...)
|
|
}
|
|
|
|
// HandleFinalize utilizes the provider Retryer implementation to attempt
|
|
// retries over the next handler
|
|
func (r *Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
|
|
out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error,
|
|
) {
|
|
var attemptNum int
|
|
var attemptClockSkew time.Duration
|
|
var attemptResults AttemptResults
|
|
|
|
maxAttempts := r.retryer.MaxAttempts()
|
|
releaseRetryToken := nopRelease
|
|
|
|
for {
|
|
attemptNum++
|
|
attemptInput := in
|
|
attemptInput.Request = r.requestCloner(attemptInput.Request)
|
|
|
|
// Record the metadata for the for attempt being started.
|
|
attemptCtx := setRetryMetadata(ctx, retryMetadata{
|
|
AttemptNum: attemptNum,
|
|
AttemptTime: sdk.NowTime().UTC(),
|
|
MaxAttempts: maxAttempts,
|
|
AttemptClockSkew: attemptClockSkew,
|
|
})
|
|
|
|
// Setting clock skew to be used on other context (like signing)
|
|
ctx = internalcontext.SetAttemptSkewContext(ctx, attemptClockSkew)
|
|
|
|
var attemptResult AttemptResult
|
|
out, attemptResult, releaseRetryToken, err = r.handleAttempt(attemptCtx, attemptInput, releaseRetryToken, next)
|
|
attemptClockSkew, _ = awsmiddle.GetAttemptSkew(attemptResult.ResponseMetadata)
|
|
|
|
// AttemptResult Retried states that the attempt was not successful, and
|
|
// should be retried.
|
|
shouldRetry := attemptResult.Retried
|
|
|
|
// Add attempt metadata to list of all attempt metadata
|
|
attemptResults.Results = append(attemptResults.Results, attemptResult)
|
|
|
|
if !shouldRetry {
|
|
// Ensure the last response's metadata is used as the bases for result
|
|
// metadata returned by the stack. The Slice of attempt results
|
|
// will be added to this cloned metadata.
|
|
metadata = attemptResult.ResponseMetadata.Clone()
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
addAttemptResults(&metadata, attemptResults)
|
|
return out, metadata, err
|
|
}
|
|
|
|
// handleAttempt handles an individual request attempt.
|
|
func (r *Attempt) handleAttempt(
|
|
ctx context.Context, in smithymiddle.FinalizeInput, releaseRetryToken func(error) error, next smithymiddle.FinalizeHandler,
|
|
) (
|
|
out smithymiddle.FinalizeOutput, attemptResult AttemptResult, _ func(error) error, err error,
|
|
) {
|
|
defer func() {
|
|
attemptResult.Err = err
|
|
}()
|
|
|
|
// Short circuit if this attempt never can succeed because the context is
|
|
// canceled. This reduces the chance of token pools being modified for
|
|
// attempts that will not be made
|
|
select {
|
|
case <-ctx.Done():
|
|
return out, attemptResult, nopRelease, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
//------------------------------
|
|
// Get Attempt Token
|
|
//------------------------------
|
|
releaseAttemptToken, err := r.retryer.GetAttemptToken(ctx)
|
|
if err != nil {
|
|
return out, attemptResult, nopRelease, fmt.Errorf(
|
|
"failed to get retry Send token, %w", err)
|
|
}
|
|
|
|
//------------------------------
|
|
// Send Attempt
|
|
//------------------------------
|
|
logger := smithymiddle.GetLogger(ctx)
|
|
service, operation := awsmiddle.GetServiceID(ctx), awsmiddle.GetOperationName(ctx)
|
|
retryMetadata, _ := getRetryMetadata(ctx)
|
|
attemptNum := retryMetadata.AttemptNum
|
|
maxAttempts := retryMetadata.MaxAttempts
|
|
|
|
// Following attempts must ensure the request payload stream starts in a
|
|
// rewound state.
|
|
if attemptNum > 1 {
|
|
if rewindable, ok := in.Request.(interface{ RewindStream() error }); ok {
|
|
if rewindErr := rewindable.RewindStream(); rewindErr != nil {
|
|
return out, attemptResult, nopRelease, fmt.Errorf(
|
|
"failed to rewind transport stream for retry, %w", rewindErr)
|
|
}
|
|
}
|
|
|
|
r.logf(logger, logging.Debug, "retrying request %s/%s, attempt %d",
|
|
service, operation, attemptNum)
|
|
}
|
|
|
|
var metadata smithymiddle.Metadata
|
|
out, metadata, err = next.HandleFinalize(ctx, in)
|
|
attemptResult.ResponseMetadata = metadata
|
|
|
|
//------------------------------
|
|
// Bookkeeping
|
|
//------------------------------
|
|
// Release the retry token based on the state of the attempt's error (if any).
|
|
if releaseError := releaseRetryToken(err); releaseError != nil && err != nil {
|
|
return out, attemptResult, nopRelease, fmt.Errorf(
|
|
"failed to release retry token after request error, %w", err)
|
|
}
|
|
// Release the attempt token based on the state of the attempt's error (if any).
|
|
if releaseError := releaseAttemptToken(err); releaseError != nil && err != nil {
|
|
return out, attemptResult, nopRelease, fmt.Errorf(
|
|
"failed to release initial token after request error, %w", err)
|
|
}
|
|
// If there was no error making the attempt, nothing further to do. There
|
|
// will be nothing to retry.
|
|
if err == nil {
|
|
return out, attemptResult, nopRelease, err
|
|
}
|
|
|
|
err = wrapAsClockSkew(ctx, err)
|
|
|
|
//------------------------------
|
|
// Is Retryable and Should Retry
|
|
//------------------------------
|
|
// If the attempt failed with an unretryable error, nothing further to do
|
|
// but return, and inform the caller about the terminal failure.
|
|
retryable := r.retryer.IsErrorRetryable(err)
|
|
if !retryable {
|
|
r.logf(logger, logging.Debug, "request failed with unretryable error %v", err)
|
|
return out, attemptResult, nopRelease, err
|
|
}
|
|
|
|
// set retryable to true
|
|
attemptResult.Retryable = true
|
|
|
|
// Once the maximum number of attempts have been exhausted there is nothing
|
|
// further to do other than inform the caller about the terminal failure.
|
|
if maxAttempts > 0 && attemptNum >= maxAttempts {
|
|
r.logf(logger, logging.Debug, "max retry attempts exhausted, max %d", maxAttempts)
|
|
err = &MaxAttemptsError{
|
|
Attempt: attemptNum,
|
|
Err: err,
|
|
}
|
|
return out, attemptResult, nopRelease, err
|
|
}
|
|
|
|
//------------------------------
|
|
// Get Retry (aka Retry Quota) Token
|
|
//------------------------------
|
|
// Get a retry token that will be released after the
|
|
releaseRetryToken, retryTokenErr := r.retryer.GetRetryToken(ctx, err)
|
|
if retryTokenErr != nil {
|
|
return out, attemptResult, nopRelease, retryTokenErr
|
|
}
|
|
|
|
//------------------------------
|
|
// Retry Delay and Sleep
|
|
//------------------------------
|
|
// Get the retry delay before another attempt can be made, and sleep for
|
|
// that time. Potentially early exist if the sleep is canceled via the
|
|
// context.
|
|
retryDelay, reqErr := r.retryer.RetryDelay(attemptNum, err)
|
|
mctx := metrics.Context(ctx)
|
|
if mctx != nil {
|
|
attempt, err := mctx.Data().LatestAttempt()
|
|
if err != nil {
|
|
attempt.RetryDelay = retryDelay
|
|
}
|
|
}
|
|
if reqErr != nil {
|
|
return out, attemptResult, releaseRetryToken, reqErr
|
|
}
|
|
if reqErr = sdk.SleepWithContext(ctx, retryDelay); reqErr != nil {
|
|
err = &aws.RequestCanceledError{Err: reqErr}
|
|
return out, attemptResult, releaseRetryToken, err
|
|
}
|
|
|
|
// The request should be re-attempted.
|
|
attemptResult.Retried = true
|
|
|
|
return out, attemptResult, releaseRetryToken, err
|
|
}
|
|
|
|
// errors that, if detected when we know there's a clock skew,
|
|
// can be retried and have a high chance of success
|
|
var possibleSkewCodes = map[string]struct{}{
|
|
"InvalidSignatureException": {},
|
|
"SignatureDoesNotMatch": {},
|
|
"AuthFailure": {},
|
|
}
|
|
|
|
var definiteSkewCodes = map[string]struct{}{
|
|
"RequestExpired": {},
|
|
"RequestInTheFuture": {},
|
|
"RequestTimeTooSkewed": {},
|
|
}
|
|
|
|
// wrapAsClockSkew checks if this error could be related to a clock skew
|
|
// error and if so, wrap the error.
|
|
func wrapAsClockSkew(ctx context.Context, err error) error {
|
|
var v interface{ ErrorCode() string }
|
|
if !errors.As(err, &v) {
|
|
return err
|
|
}
|
|
if _, ok := definiteSkewCodes[v.ErrorCode()]; ok {
|
|
return &retryableClockSkewError{Err: err}
|
|
}
|
|
_, isPossibleSkewCode := possibleSkewCodes[v.ErrorCode()]
|
|
if skew := internalcontext.GetAttemptSkewContext(ctx); skew > skewThreshold && isPossibleSkewCode {
|
|
return &retryableClockSkewError{Err: err}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// MetricsHeader attaches SDK request metric header for retries to the transport
|
|
type MetricsHeader struct{}
|
|
|
|
// ID returns the middleware identifier
|
|
func (r *MetricsHeader) ID() string {
|
|
return "RetryMetricsHeader"
|
|
}
|
|
|
|
// HandleFinalize attaches the SDK request metric header to the transport layer
|
|
func (r MetricsHeader) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
|
|
out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error,
|
|
) {
|
|
retryMetadata, _ := getRetryMetadata(ctx)
|
|
|
|
const retryMetricHeader = "Amz-Sdk-Request"
|
|
var parts []string
|
|
|
|
parts = append(parts, "attempt="+strconv.Itoa(retryMetadata.AttemptNum))
|
|
if retryMetadata.MaxAttempts != 0 {
|
|
parts = append(parts, "max="+strconv.Itoa(retryMetadata.MaxAttempts))
|
|
}
|
|
|
|
var ttl time.Time
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
ttl = deadline
|
|
}
|
|
|
|
// Only append the TTL if it can be determined.
|
|
if !ttl.IsZero() && retryMetadata.AttemptClockSkew > 0 {
|
|
const unixTimeFormat = "20060102T150405Z"
|
|
ttl = ttl.Add(retryMetadata.AttemptClockSkew)
|
|
parts = append(parts, "ttl="+ttl.Format(unixTimeFormat))
|
|
}
|
|
|
|
switch req := in.Request.(type) {
|
|
case *http.Request:
|
|
req.Header[retryMetricHeader] = append(req.Header[retryMetricHeader][:0], strings.Join(parts, "; "))
|
|
default:
|
|
return out, metadata, fmt.Errorf("unknown transport type %T", req)
|
|
}
|
|
|
|
return next.HandleFinalize(ctx, in)
|
|
}
|
|
|
|
type retryMetadataKey struct{}
|
|
|
|
// getRetryMetadata retrieves retryMetadata from the context and a bool
|
|
// indicating if it was set.
|
|
//
|
|
// Scoped to stack values. Use github.com/aws/smithy-go/middleware#ClearStackValues
|
|
// to clear all stack values.
|
|
func getRetryMetadata(ctx context.Context) (metadata retryMetadata, ok bool) {
|
|
metadata, ok = smithymiddle.GetStackValue(ctx, retryMetadataKey{}).(retryMetadata)
|
|
return metadata, ok
|
|
}
|
|
|
|
// setRetryMetadata sets the retryMetadata on the context.
|
|
//
|
|
// Scoped to stack values. Use github.com/aws/smithy-go/middleware#ClearStackValues
|
|
// to clear all stack values.
|
|
func setRetryMetadata(ctx context.Context, metadata retryMetadata) context.Context {
|
|
return smithymiddle.WithStackValue(ctx, retryMetadataKey{}, metadata)
|
|
}
|
|
|
|
// AddRetryMiddlewaresOptions is the set of options that can be passed to
|
|
// AddRetryMiddlewares for configuring retry associated middleware.
|
|
type AddRetryMiddlewaresOptions struct {
|
|
Retryer aws.Retryer
|
|
|
|
// Enable the logging of retry attempts performed by the SDK. This will
|
|
// include logging retry attempts, unretryable errors, and when max
|
|
// attempts are reached.
|
|
LogRetryAttempts bool
|
|
}
|
|
|
|
// AddRetryMiddlewares adds retry middleware to operation middleware stack
|
|
func AddRetryMiddlewares(stack *smithymiddle.Stack, options AddRetryMiddlewaresOptions) error {
|
|
attempt := NewAttemptMiddleware(options.Retryer, http.RequestCloner, func(middleware *Attempt) {
|
|
middleware.LogAttempts = options.LogRetryAttempts
|
|
})
|
|
|
|
// index retry to before signing, if signing exists
|
|
if err := stack.Finalize.Insert(attempt, "Signing", smithymiddle.Before); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := stack.Finalize.Insert(&MetricsHeader{}, attempt.ID(), smithymiddle.After); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|