cephfs/volume: create/delete-volume idempotency checks

This commit is contained in:
gman 2019-02-26 11:06:16 +01:00
parent 5c6bf5fa1a
commit 60588d8968
2 changed files with 90 additions and 94 deletions

View File

@ -17,11 +17,11 @@ limitations under the License.
package cephfs package cephfs
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "os"
"io/ioutil"
"os/exec" "os/exec"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -30,61 +30,41 @@ import (
"github.com/ceph/ceph-csi/pkg/util" "github.com/ceph/ceph-csi/pkg/util"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/kubernetes/pkg/util/keymutex"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
) )
type volumeID string type volumeID string
func mustUnlock(m keymutex.KeyMutex, key string) {
if err := m.UnlockKey(key); err != nil {
klog.Fatalf("failed to unlock mutex for %s: %v", key, err)
}
}
func makeVolumeID(volName string) volumeID { func makeVolumeID(volName string) volumeID {
return volumeID("csi-cephfs-" + volName) return volumeID("csi-cephfs-" + volName)
} }
func closePipeOnError(pipe io.Closer, err error) {
if err != nil {
if err = pipe.Close(); err != nil {
klog.Warningf("failed to close pipe: %v", err)
}
}
}
func execCommand(program string, args ...string) (stdout, stderr []byte, err error) { func execCommand(program string, args ...string) (stdout, stderr []byte, err error) {
cmd := exec.Command(program, args...) // nolint: gosec var (
stripArgs := util.StripSecretInArgs(args) cmd = exec.Command(program, args...) // nolint: gosec
klog.V(4).Infof("cephfs: EXEC %s %s", program, stripArgs) sanitizedArgs = util.StripSecretInArgs(args)
stdoutBuf bytes.Buffer
stderrBuf bytes.Buffer
)
stdoutPipe, err := cmd.StdoutPipe() cmd.Stdout = &stdoutBuf
if err != nil { cmd.Stderr = &stderrBuf
return nil, nil, fmt.Errorf("cannot open stdout pipe for %s %v: %v", program, stripArgs, err)
klog.V(4).Infof("cephfs: EXEC %s %s", program, sanitizedArgs)
if err := cmd.Run(); err != nil {
return nil, nil, fmt.Errorf("an error occurred while running (%d) %s %v: %v: %s",
cmd.Process.Pid, program, sanitizedArgs, err, stderrBuf.Bytes())
} }
defer closePipeOnError(stdoutPipe, err) return stdoutBuf.Bytes(), stderrBuf.Bytes(), nil
stderrPipe, err := cmd.StderrPipe()
if err != nil {
return nil, nil, fmt.Errorf("cannot open stdout pipe for %s %v: %v", program, stripArgs, err)
}
defer closePipeOnError(stderrPipe, err)
if err = cmd.Start(); err != nil {
return nil, nil, fmt.Errorf("failed to run %s %v: %v", program, stripArgs, err)
}
stdout, err = ioutil.ReadAll(stdoutPipe)
if err != nil {
return nil, nil, fmt.Errorf("failed to read from stdout for %s %v: %v", program, stripArgs, err)
}
stderr, err = ioutil.ReadAll(stderrPipe)
if err != nil {
return nil, nil, fmt.Errorf("failed to read from stderr for %s %v: %v", program, stripArgs, err)
}
if waitErr := cmd.Wait(); waitErr != nil {
return nil, nil, fmt.Errorf("an error occurred while running %s %v: %v: %s", program, stripArgs, waitErr, stderr)
}
return
} }
func execCommandErr(program string, args ...string) error { func execCommandErr(program string, args ...string) error {
@ -117,6 +97,11 @@ func isMountPoint(p string) (bool, error) {
return !notMnt, nil return !notMnt, nil
} }
func pathExists(p string) bool {
_, err := os.Stat(p)
return err == nil
}
// Controller service request validation // Controller service request validation
func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {

View File

@ -52,79 +52,66 @@ func setVolumeAttribute(root, attrName, attrValue string) error {
} }
func createVolume(volOptions *volumeOptions, adminCr *credentials, volID volumeID, bytesQuota int64) error { func createVolume(volOptions *volumeOptions, adminCr *credentials, volID volumeID, bytesQuota int64) error {
cephRoot := getCephRootPathLocal(volID) if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
if err := createMountPoint(cephRoot); err != nil {
return err return err
} }
defer unmountCephRoot(volID)
// RootPath is not set for a dynamically provisioned volume var (
// Access to cephfs's / is required volRoot = getCephRootVolumePathLocal(volID)
volOptions.RootPath = "/" volRootCreating = volRoot + "-creating"
)
m, err := newMounter(volOptions) if pathExists(volRoot) {
if err != nil { klog.V(4).Infof("cephfs: volume %s already exists, skipping creation", volID)
return fmt.Errorf("failed to create mounter: %v", err) return nil
} }
if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil { if err := createMountPoint(volRootCreating); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err)
}
defer unmountAndRemove(cephRoot)
volOptions.RootPath = getVolumeRootPathCeph(volID)
localVolRoot := getCephRootVolumePathLocal(volID)
if err := createMountPoint(localVolRoot); err != nil {
return err return err
} }
if bytesQuota > 0 { if bytesQuota > 0 {
if err := setVolumeAttribute(localVolRoot, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil { if err := setVolumeAttribute(volRootCreating, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
return err return err
} }
} }
if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool", volOptions.Pool); err != nil { if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool) return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
} }
if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil { if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
return err return err
} }
if err := os.Rename(volRootCreating, volRoot); err != nil {
return fmt.Errorf("couldn't mark volume %s as created: %v", volID, err)
}
return nil return nil
} }
func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions) error { func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions) error {
if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
return err
}
defer unmountCephRoot(volID)
var ( var (
cephRoot = getCephRootPathLocal(volID)
volRoot = getCephRootVolumePathLocal(volID) volRoot = getCephRootVolumePathLocal(volID)
volRootDeleting = volRoot + "-deleting" volRootDeleting = volRoot + "-deleting"
) )
if err := createMountPoint(cephRoot); err != nil { if pathExists(volRoot) {
return err if err := os.Rename(volRoot, volRootDeleting); err != nil {
} return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
}
// Root path is not set for dynamically provisioned volumes } else {
// Access to cephfs's / is required if !pathExists(volRootDeleting) {
volOptions.RootPath = "/" klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID)
return nil
m, err := newMounter(volOptions) }
if err != nil {
return fmt.Errorf("failed to create mounter: %v", err)
}
if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err)
}
defer unmountAndRemove(cephRoot)
if err := os.Rename(volRoot, volRootDeleting); err != nil {
return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
} }
if err := os.RemoveAll(volRootDeleting); err != nil { if err := os.RemoveAll(volRootDeleting); err != nil {
@ -134,13 +121,37 @@ func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions
return nil return nil
} }
func unmountAndRemove(mountPoint string) { func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *credentials) error {
var err error cephRoot := getCephRootPathLocal(volID)
if err = unmountVolume(mountPoint); err != nil {
klog.Errorf("failed to unmount %s with error %s", mountPoint, err) // Root path is not set for dynamically provisioned volumes
// Access to cephfs's / is required
volOptions.RootPath = "/"
if err := createMountPoint(cephRoot); err != nil {
return err
} }
if err = os.Remove(mountPoint); err != nil { m, err := newMounter(volOptions)
klog.Errorf("failed to remove %s with error %s", mountPoint, err) if err != nil {
return fmt.Errorf("failed to create mounter: %v", err)
}
if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err)
}
return nil
}
func unmountCephRoot(volID volumeID) {
cephRoot := getCephRootPathLocal(volID)
if err := unmountVolume(cephRoot); err != nil {
klog.Errorf("failed to unmount %s with error %s", cephRoot, err)
}
if err := os.Remove(cephRoot); err != nil {
klog.Errorf("failed to remove %s with error %s", cephRoot, err)
} }
} }