Merge pull request #228 from gman0/metadata-delete-idempotency

Fixed DeleteVolume/snapshot metadata idempotency
This commit is contained in:
Huamin Chen 2019-02-25 13:12:50 -05:00 committed by GitHub
commit 5c6bf5fa1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 9 deletions

View File

@ -97,8 +97,9 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}, nil }, nil
} }
// DeleteVolume deletes the volume in backend and removes the volume metadata // DeleteVolume deletes the volume in backend
// from store // and removes the volume metadata from store
// nolint: gocyclo
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := cs.validateDeleteVolumeRequest(); err != nil { if err := cs.validateDeleteVolumeRequest(); err != nil {
klog.Errorf("DeleteVolumeRequest validation failed: %v", err) klog.Errorf("DeleteVolumeRequest validation failed: %v", err)
@ -108,11 +109,15 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
var ( var (
volID = volumeID(req.GetVolumeId()) volID = volumeID(req.GetVolumeId())
secrets = req.GetSecrets() secrets = req.GetSecrets()
err error
) )
ce := &controllerCacheEntry{} ce := &controllerCacheEntry{}
if err = cs.MetadataStore.Get(string(volID), ce); err != nil { if err := cs.MetadataStore.Get(string(volID), ce); err != nil {
if err, ok := err.(*util.CacheEntryNotFound); ok {
klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err)
return &csi.DeleteVolumeResponse{}, nil
}
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }

View File

@ -18,7 +18,6 @@ package rbd
import ( import (
"fmt" "fmt"
"os"
"os/exec" "os/exec"
"syscall" "syscall"
@ -29,7 +28,6 @@ import (
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"github.com/kubernetes-csi/drivers/pkg/csi-common" "github.com/kubernetes-csi/drivers/pkg/csi-common"
"github.com/pborman/uuid" "github.com/pborman/uuid"
"github.com/pkg/errors"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -252,9 +250,11 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
rbdVol := &rbdVolume{} rbdVol := &rbdVolume{}
if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil { if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
if os.IsNotExist(errors.Cause(err)) { if err, ok := err.(*util.CacheEntryNotFound); ok {
klog.V(3).Infof("metadata for volume %s not found, assuming the volume to be already deleted (%v)", volumeID, err)
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil
} }
return nil, err return nil, err
} }
@ -471,6 +471,11 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
rbdSnap := &rbdSnapshot{} rbdSnap := &rbdSnapshot{}
if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil { if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil {
if err, ok := err.(*util.CacheEntryNotFound); ok {
klog.V(3).Infof("metadata for snapshot %s not found, assuming the snapshot to be already deleted (%v)", snapshotID, err)
return &csi.DeleteSnapshotResponse{}, nil
}
return nil, err return nil, err
} }

View File

@ -23,13 +23,19 @@ import (
) )
const ( const (
//PluginFolder defines location of plugins // PluginFolder defines location of plugins
PluginFolder = "/var/lib/kubelet/plugins" PluginFolder = "/var/lib/kubelet/plugins"
) )
// ForAllFunc stores metadata with identifier // ForAllFunc is a unary predicate for visiting all cache entries
// matching the `pattern' in CachePersister's ForAll function.
type ForAllFunc func(identifier string) error type ForAllFunc func(identifier string) error
// CacheEntryNotFound is an error type for "Not Found" cache errors
type CacheEntryNotFound struct {
error
}
// CachePersister interface implemented for store // CachePersister interface implemented for store
type CachePersister interface { type CachePersister interface {
Create(identifier string, data interface{}) error Create(identifier string, data interface{}) error

View File

@ -159,6 +159,10 @@ func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error { func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
cm, err := k8scm.getMetadataCM(identifier) cm, err := k8scm.getMetadataCM(identifier)
if err != nil { if err != nil {
if apierrs.IsNotFound(err) {
return &CacheEntryNotFound{err}
}
return err return err
} }
err = json.Unmarshal([]byte(cm.Data[cmDataKey]), data) err = json.Unmarshal([]byte(cm.Data[cmDataKey]), data)

View File

@ -130,6 +130,10 @@ func (nc *NodeCache) Get(identifier string, data interface{}) error {
// #nosec // #nosec
fp, err := os.Open(file) fp, err := os.Open(file)
if err != nil { if err != nil {
if os.IsNotExist(errors.Cause(err)) {
return &CacheEntryNotFound{err}
}
return errors.Wrapf(err, "node-cache: open error for %s", file) return errors.Wrapf(err, "node-cache: open error for %s", file)
} }