cephfs: code cleaning

This commit is contained in:
gman 2018-03-26 15:00:28 +02:00
parent a585f083ab
commit cb36c9e4c8
3 changed files with 49 additions and 75 deletions

View File

@ -18,7 +18,6 @@ package cephfs
import ( import (
"context" "context"
"fmt"
"github.com/golang/glog" "github.com/golang/glog"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -26,15 +25,12 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi/v0" "github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-csi/drivers/pkg/csi-common" "github.com/kubernetes-csi/drivers/pkg/csi-common"
"k8s.io/kubernetes/pkg/util/keymutex"
) )
type nodeServer struct { type nodeServer struct {
*csicommon.DefaultNodeServer *csicommon.DefaultNodeServer
} }
var nsMtx = keymutex.NewKeyMutex()
func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
if req.GetVolumeCapability() == nil { if req.GetVolumeCapability() == nil {
return status.Error(codes.InvalidArgument, "Volume capability missing in request") return status.Error(codes.InvalidArgument, "Volume capability missing in request")
@ -63,54 +59,11 @@ func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) err
return nil return nil
} }
func newMounter(volOptions *volumeOptions, key string, readOnly bool) (volumeMounter, error) {
var m volumeMounter
if volOptions.Mounter == volumeMounter_fuse {
keyring := cephKeyringData{
User: volOptions.User,
Key: key,
RootPath: volOptions.RootPath,
ReadOnly: readOnly,
}
if err := keyring.writeToFile(); err != nil {
msg := fmt.Sprintf("couldn't write ceph keyring for user %s: %v", volOptions.User, err)
glog.Error(msg)
return nil, status.Error(codes.Internal, msg)
}
m = &fuseMounter{}
} else if volOptions.Mounter == volumeMounter_kernel {
secret := cephSecretData{
User: volOptions.User,
Key: key,
}
if err := secret.writeToFile(); err != nil {
msg := fmt.Sprintf("couldn't write ceph secret for user %s: %v", volOptions.User, err)
glog.Error(msg)
return nil, status.Error(codes.Internal, msg)
}
m = &kernelMounter{}
}
return m, nil
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if err := validateNodePublishVolumeRequest(req); err != nil { if err := validateNodePublishVolumeRequest(req); err != nil {
return nil, err return nil, err
} }
/*
if err = tryLock(volId, nsMtx, "NodeServer"); err != nil {
return nil, err
}
defer nsMtx.UnlockKey(volId)
*/
// Configuration // Configuration
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
@ -178,13 +131,6 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
volId := req.GetVolumeId() volId := req.GetVolumeId()
/*
if err := tryLock(volId, nsMtx, "NodeServer"); err != nil {
return nil, err
}
defer nsMtx.UnlockKey(volId)
*/
if err := unmountVolume(req.GetTargetPath()); err != nil { if err := unmountVolume(req.GetTargetPath()); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"os/exec" "os/exec"
"github.com/golang/glog"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -62,3 +63,39 @@ func getKeyFromCredentials(creds map[string]string) (string, error) {
return "", fmt.Errorf("missing key in credentials") return "", fmt.Errorf("missing key in credentials")
} }
} }
func newMounter(volOptions *volumeOptions, key string, readOnly bool) (volumeMounter, error) {
var m volumeMounter
if volOptions.Mounter == volumeMounter_fuse {
keyring := cephKeyringData{
User: volOptions.User,
Key: key,
RootPath: volOptions.RootPath,
ReadOnly: readOnly,
}
if err := keyring.writeToFile(); err != nil {
msg := fmt.Sprintf("couldn't write ceph keyring for user %s: %v", volOptions.User, err)
glog.Error(msg)
return nil, status.Error(codes.Internal, msg)
}
m = &fuseMounter{}
} else if volOptions.Mounter == volumeMounter_kernel {
secret := cephSecretData{
User: volOptions.User,
Key: key,
}
if err := secret.writeToFile(); err != nil {
msg := fmt.Sprintf("couldn't write ceph secret for user %s: %v", volOptions.User, err)
glog.Error(msg)
return nil, status.Error(codes.Internal, msg)
}
m = &kernelMounter{}
}
return m, nil
}

View File

@ -32,46 +32,37 @@ type volumeMounter interface {
type fuseMounter struct{} type fuseMounter struct{}
func (m *fuseMounter) mount(mountPoint string, volOptions *volumeOptions) error { func execCommandAndValidate(program string, args ...string) error {
out, err := execCommand("ceph-fuse", mountPoint, "-n", "client."+volOptions.User, "-r", volOptions.RootPath) out, err := execCommand(program, args...)
if err != nil { if err != nil {
return fmt.Errorf("cephfs: ceph-fuse failed with following error: %s\ncephfs: cephf-fuse output: %s", err, out) return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out)
} }
return nil return nil
} }
func (m *fuseMounter) mount(mountPoint string, volOptions *volumeOptions) error {
return execCommandAndValidate("ceph-fuse", mountPoint, "-n", "client."+volOptions.User, "-r", volOptions.RootPath)
}
type kernelMounter struct{} type kernelMounter struct{}
func (m *kernelMounter) mount(mountPoint string, volOptions *volumeOptions) error { func (m *kernelMounter) mount(mountPoint string, volOptions *volumeOptions) error {
out, err := execCommand("modprobe", "ceph") if err := execCommandAndValidate("modprobe", "ceph"); err != nil {
if err != nil { return err
return fmt.Errorf("cephfs: modprobe failed with following error, %s\ncephfs: modprobe output: %s", err, out)
} }
args := [...]string{ return execCommandAndValidate("mount",
"-t", "ceph", "-t", "ceph",
fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath), fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
mountPoint, mountPoint,
"-o", "-o",
fmt.Sprintf("name=%s,secretfile=%s", volOptions.User, getCephSecretPath(volOptions.User)), fmt.Sprintf("name=%s,secretfile=%s", volOptions.User, getCephSecretPath(volOptions.User)),
} )
out, err = execCommand("mount", args[:]...)
if err != nil {
return fmt.Errorf("cephfs: mount.ceph failed with following error: %s\ncephfs: mount.ceph output: %s", err, out)
}
return nil
} }
func unmountVolume(mountPoint string) error { func unmountVolume(mountPoint string) error {
out, err := execCommand("umount", mountPoint) return execCommandAndValidate("umount", mountPoint)
if err != nil {
return fmt.Errorf("cephfs: umount failed with following error: %v\ncephfs: umount output: %s", err, out)
}
return nil
} }
func createMountPoint(root string) error { func createMountPoint(root string) error {