diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 1ca7cacd1..175a1f729 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -18,12 +18,13 @@ package cephfs import ( "context" + "fmt" "github.com/golang/glog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/container-storage-interface/spec/lib/go/csi/v0" "github.com/kubernetes-csi/drivers/pkg/csi-common" "k8s.io/kubernetes/pkg/util/keymutex" ) @@ -35,10 +36,6 @@ type nodeServer struct { var nsMtx = keymutex.NewKeyMutex() func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { - if req.GetVersion() == nil { - return status.Error(codes.InvalidArgument, "Version missing in request") - } - if req.GetVolumeCapability() == nil { return status.Error(codes.InvalidArgument, "Volume capability missing in request") } @@ -51,24 +48,10 @@ func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { return status.Error(codes.InvalidArgument, "Target path missing in request") } - attrs := req.GetVolumeAttributes() - - if _, ok := attrs["path"]; !ok { - return status.Error(codes.InvalidArgument, "Missing path attribute") - } - - if _, ok := attrs["user"]; !ok { - return status.Error(codes.InvalidArgument, "Missing user attribute") - } - return nil } func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error { - if req.GetVersion() == nil { - return status.Error(codes.InvalidArgument, "Version missing in request") - } - if req.GetVolumeId() == "" { return status.Error(codes.InvalidArgument, "Volume ID missing in request") } @@ -87,19 +70,46 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // Configuration - volId := req.GetVolumeId() targetPath := req.GetTargetPath() - if err := tryLock(volId, nsMtx, "NodeServer"); err != nil { - return nil, err + volOptions, err := newVolumeOptions(req.GetVolumeAttributes()) + if err != nil { + glog.Errorf("error reading volume options: %v", err) + return nil, status.Error(codes.InvalidArgument, err.Error()) } - defer nsMtx.UnlockKey(volId) - if err := createMountPoint(targetPath); err != nil { + /* + volId := req.GetVolumeId() + if err = tryLock(volId, nsMtx, "NodeServer"); err != nil { + return nil, err + } + defer nsMtx.UnlockKey(volId) + */ + + if err = createMountPoint(targetPath); err != nil { glog.Errorf("failed to create mount point at %s: %v", targetPath, err) return nil, status.Error(codes.Internal, err.Error()) } + key, err := getKeyFromCredentials(req.GetNodePublishSecrets()) + if err != nil { + glog.Error(err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + keyring := cephKeyringData{ + User: volOptions.User, + Key: key, + RootPath: volOptions.RootPath, + ReadOnly: req.GetReadonly(), + } + + if err = keyring.writeToFile(); err != nil { + msg := fmt.Sprintf("couldn't write ceph keyring for user %s: %v", volOptions.User, err) + glog.Error(msg) + return nil, status.Error(codes.Internal, msg) + } + // Check if the volume is already mounted isMnt, err := isMountPoint(targetPath) @@ -115,17 +125,14 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // It's not, exec ceph-fuse now - // TODO honor req.GetReadOnly() - - attrs := req.GetVolumeAttributes() - vol := volume{Root: attrs["path"], User: attrs["user"]} + vol := volume{RootPath: volOptions.RootPath, User: volOptions.User} if err := vol.mount(targetPath); err != nil { - glog.Errorf("mounting volume %s to %s failed: %v", vol.Root, targetPath, err) + glog.Errorf("mounting volume %s to %s failed: %v", vol.RootPath, targetPath, err) return nil, status.Error(codes.Internal, err.Error()) } - glog.V(4).Infof("cephfs: volume %s successfuly mounted to %s", vol.Root, targetPath) + glog.V(4).Infof("cephfs: volume %s successfuly mounted to %s", vol.RootPath, targetPath) return &csi.NodePublishVolumeResponse{}, nil } @@ -135,12 +142,13 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, err } - volId := req.GetVolumeId() - - if err := tryLock(volId, nsMtx, "NodeServer"); err != nil { - return nil, err - } - defer nsMtx.UnlockKey(volId) + /* + volId := req.GetVolumeId() + if err := tryLock(volId, nsMtx, "NodeServer"); err != nil { + return nil, err + } + defer nsMtx.UnlockKey(volId) + */ if err := unmountVolume(req.GetTargetPath()); err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -148,3 +156,17 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return &csi.NodeUnpublishVolumeResponse{}, nil } + +func (ns *nodeServer) NodeStageVolume( + ctx context.Context, + req *csi.NodeStageVolumeRequest) ( + *csi.NodeStageVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (ns *nodeServer) NodeUnstageVolume( + ctx context.Context, + req *csi.NodeUnstageVolumeRequest) ( + *csi.NodeUnstageVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go index 6c53a1a89..1f9a3ab1d 100644 --- a/pkg/cephfs/util.go +++ b/pkg/cephfs/util.go @@ -17,7 +17,7 @@ limitations under the License. package cephfs import ( - // "fmt" + "fmt" "os/exec" "google.golang.org/grpc/codes" @@ -54,3 +54,11 @@ func tryLock(id string, mtx keymutex.KeyMutex, name string) error { return nil } + +func getKeyFromCredentials(creds map[string]string) (string, error) { + if key, ok := creds["key"]; ok { + return key, nil + } else { + return "", fmt.Errorf("missing key in credentials") + } +}