nfs: make DeleteVolume (more) idempotent

Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2022-07-19 17:09:37 +02:00 committed by mergify[bot]
parent a6cd56ae7e
commit 0a173a8a9e
3 changed files with 68 additions and 12 deletions

View File

@ -18,7 +18,7 @@ package controller
import (
"context"
"strings"
"errors"
"github.com/ceph/ceph-csi/internal/cephfs"
"github.com/ceph/ceph-csi/internal/cephfs/store"
@ -147,11 +147,11 @@ func (cs *Server) DeleteVolume(
err = nfsVolume.DeleteExport()
// if the export does not exist, continue with deleting the backend volume
if err != nil && !strings.Contains(err.Error(), "Export does not exist") {
if err != nil && !errors.Is(err, ErrNotFound) {
return nil, status.Errorf(codes.InvalidArgument, "failed to delete export: %v", err)
}
log.DebugLog(ctx, "deleted NFS-export: %s", nfsVolume)
log.DebugLog(ctx, "NFS-export %q has been deleted", nfsVolume)
return cs.backendServer.DeleteVolume(ctx, req)
}

View File

@ -0,0 +1,43 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"errors"
"fmt"
)
var (
// ErrNotConnected is returned when components from the NFS
// ControllerServer can not connect to the Ceph cluster or NFS-Ganesha
// service.
ErrNotConnected = errors.New("not connected")
// ErrNotFound is a generic error that is the parent of other "not
// found" failures. Callers can check if something was "not found",
// even if the actual error is more specific.
ErrNotFound = errors.New("not found")
// ErrExportNotFound is returned by components that communicate with the
// NFS-Ganesha service, and have identified that the NFS-Export does
// not exist (anymore). This error is also a ErrNotFound.
ErrExportNotFound = fmt.Errorf("NFS-export %w", ErrNotFound)
// ErrFilesystemNotFound is returned in case the filesystem
// does not exist.
ErrFilesystemNotFound = fmt.Errorf("filesystem %w", ErrNotFound)
)

View File

@ -18,6 +18,7 @@ package controller
import (
"context"
"errors"
"fmt"
"strings"
@ -124,7 +125,7 @@ func (nv *NFSVolume) GetExportPath() string {
// a new NFS-export for the volume on the Ceph managed NFS-server.
func (nv *NFSVolume) CreateExport(backend *csi.Volume) error {
if !nv.connected {
return fmt.Errorf("can not created export for %q: not connected", nv)
return fmt.Errorf("can not created export for %q: %w", nv, ErrNotConnected)
}
fs := backend.VolumeContext["fsName"]
@ -213,6 +214,8 @@ func (nv *NFSVolume) DeleteExport() error {
return nil
case strings.Contains(err.Error(), "API call not implemented"): // try with the old command
break
case strings.Contains(err.Error(), "Export does not exist"):
return ErrExportNotFound
default: // any other error
return fmt.Errorf("failed to remove %q from NFS-cluster %q: "+
"%w", nv, nfsCluster, err)
@ -251,17 +254,21 @@ func (nv *NFSVolume) deleteExportCommand(cmd, nfsCluster string) []string {
// getNFSCluster fetches the NFS-cluster name from the CephFS journal.
func (nv *NFSVolume) getNFSCluster() (string, error) {
if !nv.connected {
return "", fmt.Errorf("can not get NFS-cluster for %q: not connected", nv)
return "", fmt.Errorf("can not get NFS-cluster for %q: %w", nv, ErrNotConnected)
}
fs := fscore.NewFileSystem(nv.conn)
fsName, err := fs.GetFsName(nv.ctx, nv.fscID)
if err != nil {
if err != nil && errors.Is(err, util.ErrPoolNotFound) {
return "", fmt.Errorf("%w for ID %x: %v", ErrFilesystemNotFound, nv.fscID, err)
} else if err != nil {
return "", fmt.Errorf("failed to get filesystem name for ID %x: %w", nv.fscID, err)
}
mdPool, err := fs.GetMetadataPool(nv.ctx, fsName)
if err != nil {
if err != nil && errors.Is(err, util.ErrPoolNotFound) {
return "", fmt.Errorf("metadata pool for %q %w: %v", fsName, ErrNotFound, err)
} else if err != nil {
return "", fmt.Errorf("failed to get metadata pool for %q: %w", fsName, err)
}
@ -273,8 +280,10 @@ func (nv *NFSVolume) getNFSCluster() (string, error) {
defer j.Destroy()
clusterName, err := j.FetchAttribute(nv.ctx, mdPool, nv.objectUUID, clusterNameKey)
if err != nil {
return "", fmt.Errorf("failed to get cluster name: %w", err)
if err != nil && errors.Is(err, util.ErrPoolNotFound) || errors.Is(err, util.ErrKeyNotFound) {
return "", fmt.Errorf("cluster name for %q %w: %v", nv.objectUUID, ErrNotFound, err)
} else if err != nil {
return "", fmt.Errorf("failed to get cluster name for %q: %w", nv.objectUUID, err)
}
return clusterName, nil
@ -283,17 +292,21 @@ func (nv *NFSVolume) getNFSCluster() (string, error) {
// setNFSCluster stores the NFS-cluster name in the CephFS journal.
func (nv *NFSVolume) setNFSCluster(clusterName string) error {
if !nv.connected {
return fmt.Errorf("can not set NFS-cluster for %q: not connected", nv)
return fmt.Errorf("can not set NFS-cluster for %q: %w", nv, ErrNotConnected)
}
fs := fscore.NewFileSystem(nv.conn)
fsName, err := fs.GetFsName(nv.ctx, nv.fscID)
if err != nil {
if err != nil && errors.Is(err, util.ErrPoolNotFound) {
return fmt.Errorf("%w for ID %x: %v", ErrFilesystemNotFound, nv.fscID, err)
} else if err != nil {
return fmt.Errorf("failed to get filesystem name for ID %x: %w", nv.fscID, err)
}
mdPool, err := fs.GetMetadataPool(nv.ctx, fsName)
if err != nil {
if err != nil && errors.Is(err, util.ErrPoolNotFound) {
return fmt.Errorf("metadata pool for %q %w: %v", fsName, ErrNotFound, err)
} else if err != nil {
return fmt.Errorf("failed to get metadata pool for %q: %w", fsName, err)
}