Mirror of https://github.com/ceph/ceph-csi.git, synced 2024-11-18 04:10:22 +00:00
Merge pull request #232 from gman0/cephfs-concurrency-fix
cephfs concurrency fix
This commit is contained in: commit dfcd1c33c3
@@ -24,6 +24,7 @@ import (

     "github.com/container-storage-interface/spec/lib/go/csi"
     "github.com/kubernetes-csi/drivers/pkg/csi-common"
+    "k8s.io/kubernetes/pkg/util/keymutex"

     "github.com/ceph/ceph-csi/pkg/util"
 )
@@ -40,6 +41,10 @@ type controllerCacheEntry struct {
     VolumeID volumeID
 }

+var (
+    mtxControllerVolumeID = keymutex.NewHashed(0)
+)
+
 // CreateVolume creates the volume in backend and store the volume metadata
 func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
     if err := cs.validateCreateVolumeRequest(req); err != nil {
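The fix serializes operations per volume ID instead of taking one global lock. keymutex.NewHashed(n) returns a KeyMutex backed by a fixed pool of mutexes (a non-positive n picks a default based on the CPU count), and LockKey hashes the key onto one of them. The self-contained Go sketch below approximates that behaviour; it illustrates the idea and is not the keymutex implementation itself:

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// hashedKeyMutex approximates what keymutex.NewHashed provides: a
// fixed pool of mutexes, with each key hashed onto one of them. Two
// operations on the same volume ID always contend on the same mutex;
// operations on distinct IDs usually run in parallel and only collide
// on a hash collision, which is safe, just slower.
type hashedKeyMutex struct {
	mutexes []sync.Mutex
}

func newHashed(n int) *hashedKeyMutex {
	return &hashedKeyMutex{mutexes: make([]sync.Mutex, n)}
}

func (km *hashedKeyMutex) index(key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // fnv's Write never returns an error
	return h.Sum32() % uint32(len(km.mutexes))
}

func (km *hashedKeyMutex) LockKey(key string)   { km.mutexes[km.index(key)].Lock() }
func (km *hashedKeyMutex) UnlockKey(key string) { km.mutexes[km.index(key)].Unlock() }

func main() {
	km := newHashed(8)
	var wg sync.WaitGroup
	for _, id := range []string{"csi-cephfs-a", "csi-cephfs-a", "csi-cephfs-b"} {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			km.LockKey(id)
			defer km.UnlockKey(id)
			fmt.Println("exclusive section for", id)
		}(id)
	}
	wg.Wait()
}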
@@ -58,6 +63,9 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
     volID := makeVolumeID(req.GetName())

+    mtxControllerVolumeID.LockKey(string(volID))
+    defer mustUnlock(mtxControllerVolumeID, string(volID))
+
     // Create a volume in case the user didn't provide one
     if volOptions.ProvisionVolume {
@@ -143,6 +151,9 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
         return nil, status.Error(codes.InvalidArgument, err.Error())
     }

+    mtxControllerVolumeID.LockKey(string(volID))
+    defer mustUnlock(mtxControllerVolumeID, string(volID))
+
     if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil {
         klog.Errorf("failed to delete volume %s: %v", volID, err)
         return nil, status.Error(codes.Internal, err.Error())

@@ -24,6 +24,7 @@ import (
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
     "k8s.io/klog"
+    "k8s.io/kubernetes/pkg/util/keymutex"

     "github.com/container-storage-interface/spec/lib/go/csi"
     "github.com/kubernetes-csi/drivers/pkg/csi-common"
@@ -35,6 +36,10 @@ type NodeServer struct {
     *csicommon.DefaultNodeServer
 }

+var (
+    mtxNodeVolumeID = keymutex.NewHashed(0)
+)
+
 func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
     var (
         cr *credentials
@@ -44,7 +49,7 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
     if volOptions.ProvisionVolume {
         // The volume is provisioned dynamically, get the credentials directly from Ceph

-        // First, store admin credentials - those are needed for retrieving the user credentials
+        // First, get admin credentials - those are needed for retrieving the user credentials

         adminCr, err := getAdminCredentials(secrets)
         if err != nil {
@@ -100,6 +105,9 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
         return nil, status.Error(codes.Internal, err.Error())
     }

+    mtxNodeVolumeID.LockKey(string(volID))
+    defer mustUnlock(mtxNodeVolumeID, string(volID))
+
     // Check if the volume is already mounted

     isMnt, err := isMountPoint(stagingTargetPath)

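With this hunk, the node server takes the same per-volume lock before its "is it already mounted" check. That check-then-act sequence is exactly where concurrent NodeStageVolume calls could previously interleave. A toy reproduction of the race follows; names such as stage and mounted are illustrative stand-ins, not driver code:

package main

import (
	"fmt"
	"sync"
)

// The lock plays the role of mtxNodeVolumeID.LockKey(volID); the map
// stands in for the real isMountPoint(stagingTargetPath) check.
// Without holding the lock across check and mount, both goroutines
// could pass the check before either records the mount, and the mount
// step would run twice for the same volume.
var (
	volLock sync.Mutex
	mounted = make(map[string]bool)
)

func stage(volID string) {
	volLock.Lock()
	defer volLock.Unlock()

	if !mounted[volID] {
		fmt.Println("mounting", volID) // must happen exactly once per volume
		mounted[volID] = true
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			stage("csi-cephfs-a")
		}()
	}
	wg.Wait()
}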
@@ -17,11 +17,11 @@ limitations under the License.
 package cephfs

 import (
+    "bytes"
     "encoding/json"
     "errors"
     "fmt"
-    "io"
-    "io/ioutil"
+    "os"
     "os/exec"

     "google.golang.org/grpc/codes"
@@ -30,61 +30,41 @@ import (

     "github.com/ceph/ceph-csi/pkg/util"
     "github.com/container-storage-interface/spec/lib/go/csi"
+    "k8s.io/kubernetes/pkg/util/keymutex"
     "k8s.io/kubernetes/pkg/util/mount"
 )

 type volumeID string

+func mustUnlock(m keymutex.KeyMutex, key string) {
+    if err := m.UnlockKey(key); err != nil {
+        klog.Fatalf("failed to unlock mutex for %s: %v", key, err)
+    }
+}
+
 func makeVolumeID(volName string) volumeID {
     return volumeID("csi-cephfs-" + volName)
 }

-func closePipeOnError(pipe io.Closer, err error) {
-    if err != nil {
-        if err = pipe.Close(); err != nil {
-            klog.Warningf("failed to close pipe: %v", err)
-        }
-    }
-}
-
 func execCommand(program string, args ...string) (stdout, stderr []byte, err error) {
-    cmd := exec.Command(program, args...) // nolint: gosec
-    stripArgs := util.StripSecretInArgs(args)
-    klog.V(4).Infof("cephfs: EXEC %s %s", program, stripArgs)
-
-    stdoutPipe, err := cmd.StdoutPipe()
-    if err != nil {
-        return nil, nil, fmt.Errorf("cannot open stdout pipe for %s %v: %v", program, stripArgs, err)
-    }
-
-    defer closePipeOnError(stdoutPipe, err)
-
-    stderrPipe, err := cmd.StderrPipe()
-    if err != nil {
-        return nil, nil, fmt.Errorf("cannot open stdout pipe for %s %v: %v", program, stripArgs, err)
-    }
-
-    defer closePipeOnError(stderrPipe, err)
-
-    if err = cmd.Start(); err != nil {
-        return nil, nil, fmt.Errorf("failed to run %s %v: %v", program, stripArgs, err)
-    }
-
-    stdout, err = ioutil.ReadAll(stdoutPipe)
-    if err != nil {
-        return nil, nil, fmt.Errorf("failed to read from stdout for %s %v: %v", program, stripArgs, err)
-    }
-
-    stderr, err = ioutil.ReadAll(stderrPipe)
-    if err != nil {
-        return nil, nil, fmt.Errorf("failed to read from stderr for %s %v: %v", program, stripArgs, err)
-    }
-
-    if waitErr := cmd.Wait(); waitErr != nil {
-        return nil, nil, fmt.Errorf("an error occurred while running %s %v: %v: %s", program, stripArgs, waitErr, stderr)
-    }
-
-    return
+    var (
+        cmd           = exec.Command(program, args...) // nolint: gosec
+        sanitizedArgs = util.StripSecretInArgs(args)
+        stdoutBuf     bytes.Buffer
+        stderrBuf     bytes.Buffer
+    )
+
+    cmd.Stdout = &stdoutBuf
+    cmd.Stderr = &stderrBuf
+
+    klog.V(4).Infof("cephfs: EXEC %s %s", program, sanitizedArgs)
+
+    if err := cmd.Run(); err != nil {
+        return nil, nil, fmt.Errorf("an error occurred while running (%d) %s %v: %v: %s",
+            cmd.Process.Pid, program, sanitizedArgs, err, stderrBuf.Bytes())
+    }
+
+    return stdoutBuf.Bytes(), stderrBuf.Bytes(), nil
 }

 func execCommandErr(program string, args ...string) error {
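The execCommand rewrite drops the pipe-based capture in favour of two bytes.Buffer sinks and a single cmd.Run(). The removed version had at least two pitfalls: draining stdout to EOF before touching stderr can deadlock once the child fills the stderr pipe buffer (note that the stderr branch even reused the "stdout pipe" error message), and `defer closePipeOnError(pipe, err)` never fired as intended, because a deferred call's arguments are evaluated when the defer statement runs, not when the function returns. A minimal, runnable demonstration of that defer semantics:

package main

import "fmt"

// demo shows why the removed closePipeOnError pattern was fragile:
// the deferred call's argument is evaluated at the defer statement,
// so it sees err == nil even though err is non-nil at return.
func demo() (err error) {
	defer fmt.Println("deferred saw err =", err) // evaluated now: <nil>
	err = fmt.Errorf("boom")
	return err
}

func main() {
	fmt.Println("demo returned:", demo())
}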
@@ -117,6 +97,11 @@ func isMountPoint(p string) (bool, error) {
     return !notMnt, nil
 }

+func pathExists(p string) bool {
+    _, err := os.Stat(p)
+    return err == nil
+}
+
 // Controller service request validation
 func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
     if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {

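The new pathExists helper reports false for any Stat error, permission problems included. That is acceptable here, where the driver owns the marker paths, but a caller that must distinguish the cases could use a stricter variant. pathExistsStrict below is a hypothetical name and not part of this commit:

package main

import (
	"fmt"
	"os"
)

// pathExistsStrict (hypothetical) separates "definitely absent" from
// "Stat failed for another reason", e.g. a permission error, which
// pathExists would also report as false.
func pathExistsStrict(p string) (bool, error) {
	_, err := os.Stat(p)
	if err == nil {
		return true, nil
	}
	if os.IsNotExist(err) {
		return false, nil
	}
	return false, err
}

func main() {
	ok, err := pathExistsStrict("/definitely/not/here")
	fmt.Println(ok, err) // false <nil>
}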
@@ -52,80 +52,67 @@ func setVolumeAttribute(root, attrName, attrValue string) error {
 }

 func createVolume(volOptions *volumeOptions, adminCr *credentials, volID volumeID, bytesQuota int64) error {
-    cephRoot := getCephRootPathLocal(volID)
-
-    if err := createMountPoint(cephRoot); err != nil {
+    if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
         return err
     }
+    defer unmountCephRoot(volID)

-    // RootPath is not set for a dynamically provisioned volume
-    // Access to cephfs's / is required
-    volOptions.RootPath = "/"
+    var (
+        volRoot         = getCephRootVolumePathLocal(volID)
+        volRootCreating = volRoot + "-creating"
+    )

-    m, err := newMounter(volOptions)
-    if err != nil {
-        return fmt.Errorf("failed to create mounter: %v", err)
+    if pathExists(volRoot) {
+        klog.V(4).Infof("cephfs: volume %s already exists, skipping creation", volID)
+        return nil
     }

-    if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
-        return fmt.Errorf("error mounting ceph root: %v", err)
-    }
-
-    defer unmountAndRemove(cephRoot)
-
-    volOptions.RootPath = getVolumeRootPathCeph(volID)
-    localVolRoot := getCephRootVolumePathLocal(volID)
-
-    if err := createMountPoint(localVolRoot); err != nil {
+    if err := createMountPoint(volRootCreating); err != nil {
         return err
     }

     if bytesQuota > 0 {
-        if err := setVolumeAttribute(localVolRoot, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
+        if err := setVolumeAttribute(volRootCreating, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
             return err
         }
     }

-    if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
+    if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
         return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
     }

-    if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
+    if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
         return err
     }

+    if err := os.Rename(volRootCreating, volRoot); err != nil {
+        return fmt.Errorf("couldn't mark volume %s as created: %v", volID, err)
+    }
+
     return nil
 }

 func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions) error {
+    if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
+        return err
+    }
+    defer unmountCephRoot(volID)
+
     var (
-        cephRoot        = getCephRootPathLocal(volID)
         volRoot         = getCephRootVolumePathLocal(volID)
         volRootDeleting = volRoot + "-deleting"
     )

-    if err := createMountPoint(cephRoot); err != nil {
-        return err
-    }
-
-    // Root path is not set for dynamically provisioned volumes
-    // Access to cephfs's / is required
-    volOptions.RootPath = "/"
-
-    m, err := newMounter(volOptions)
-    if err != nil {
-        return fmt.Errorf("failed to create mounter: %v", err)
-    }
-
-    if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
-        return fmt.Errorf("error mounting ceph root: %v", err)
-    }
-
-    defer unmountAndRemove(cephRoot)
-
-    if err := os.Rename(volRoot, volRootDeleting); err != nil {
-        return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
+    if pathExists(volRoot) {
+        if err := os.Rename(volRoot, volRootDeleting); err != nil {
+            return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
+        }
+    } else {
+        if !pathExists(volRootDeleting) {
+            klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID)
+            return nil
+        }
     }

     if err := os.RemoveAll(volRootDeleting); err != nil {
         return fmt.Errorf("failed to delete volume %s: %v", volID, err)
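This hunk is the core of the idempotency fix. createVolume now does all of its work under a "<volRoot>-creating" staging directory and publishes the result with a single os.Rename; a repeated or resumed call either finds the finished volRoot and returns early, or redoes the staging work. A minimal sketch of the pattern on a plain local filesystem (createIdempotent and its layout are illustrative only, not the driver's code):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// createIdempotent mirrors the createVolume flow: do the work under
// "<name>-creating", then publish it with one rename. Re-running
// after a crash either finds the finished directory and returns
// early, or redoes the work on the staging directory before renaming
// again.
func createIdempotent(root, name string) error {
	final := filepath.Join(root, name)
	staging := final + "-creating"

	if _, err := os.Stat(final); err == nil {
		fmt.Println("already exists, skipping:", name)
		return nil
	}

	if err := os.MkdirAll(staging, 0755); err != nil {
		return err
	}
	// ... in the driver: set quota and layout attributes on staging here ...

	if err := os.Rename(staging, final); err != nil {
		return fmt.Errorf("couldn't mark %s as created: %v", name, err)
	}
	return nil
}

func main() {
	root, err := os.MkdirTemp("", "volumes")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(root)

	for i := 0; i < 2; i++ { // the second call is a no-op
		if err := createIdempotent(root, "csi-cephfs-a"); err != nil {
			panic(err)
		}
	}
}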
@@ -134,13 +121,37 @@ func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions
     return nil
 }

-func unmountAndRemove(mountPoint string) {
-    var err error
-    if err = unmountVolume(mountPoint); err != nil {
-        klog.Errorf("failed to unmount %s with error %s", mountPoint, err)
+func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *credentials) error {
+    cephRoot := getCephRootPathLocal(volID)
+
+    // Root path is not set for dynamically provisioned volumes
+    // Access to cephfs's / is required
+    volOptions.RootPath = "/"
+
+    if err := createMountPoint(cephRoot); err != nil {
+        return err
     }

-    if err = os.Remove(mountPoint); err != nil {
-        klog.Errorf("failed to remove %s with error %s", mountPoint, err)
+    m, err := newMounter(volOptions)
+    if err != nil {
+        return fmt.Errorf("failed to create mounter: %v", err)
+    }
+
+    if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
+        return fmt.Errorf("error mounting ceph root: %v", err)
+    }
+
+    return nil
+}
+
+func unmountCephRoot(volID volumeID) {
+    cephRoot := getCephRootPathLocal(volID)
+
+    if err := unmountVolume(cephRoot); err != nil {
+        klog.Errorf("failed to unmount %s with error %s", cephRoot, err)
+    } else {
+        if err := os.Remove(cephRoot); err != nil {
+            klog.Errorf("failed to remove %s with error %s", cephRoot, err)
+        }
     }
 }
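mountCephRoot centralises the root-mount logic that createVolume and purgeVolume previously duplicated, and unmountCephRoot only removes the mount-point directory after a successful unmount. purgeVolume's side of the marker pattern can be sketched the same way as creation: rename to "<volRoot>-deleting" as a tombstone, then remove, so a retry after a crash can finish the job. purgeIdempotent below is illustrative only, not the driver's code:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// purgeIdempotent mirrors the purgeVolume flow: rename to
// "<name>-deleting" as a tombstone, then remove. A retry after a
// crash finds either the tombstone (and finishes the removal) or
// nothing at all (and treats the volume as already deleted).
func purgeIdempotent(root, name string) error {
	final := filepath.Join(root, name)
	deleting := final + "-deleting"

	if _, err := os.Stat(final); err == nil {
		if err := os.Rename(final, deleting); err != nil {
			return fmt.Errorf("couldn't mark %s for deletion: %v", name, err)
		}
	} else if _, err := os.Stat(deleting); err != nil {
		fmt.Println("not found, assuming already deleted:", name)
		return nil
	}

	return os.RemoveAll(deleting)
}

func main() {
	root, err := os.MkdirTemp("", "volumes")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(root)

	if err := os.Mkdir(filepath.Join(root, "csi-cephfs-a"), 0755); err != nil {
		panic(err)
	}

	for i := 0; i < 2; i++ { // the second call is a no-op
		if err := purgeIdempotent(root, "csi-cephfs-a"); err != nil {
			panic(err)
		}
	}
}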