WIP cephfs CSI plugin

This commit is contained in:
gman
2018-03-05 12:59:47 +01:00
parent c4d775953b
commit 1c1b0eab1e
30 changed files with 1117 additions and 9 deletions

104
pkg/cephfs/cephfs.go Normal file
View File

@ -0,0 +1,104 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"github.com/golang/glog"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
const (
PluginFolder = "/var/lib/kubelet/plugins/cephfsplugin"
)
type cephfsDriver struct {
driver *csicommon.CSIDriver
ids *identityServer
ns *nodeServer
cs *controllerServer
caps []*csi.VolumeCapability_AccessMode
cscaps []*csi.ControllerServiceCapability
}
var (
provisionRoot = "/cephfs"
driver *cephfsDriver
version = csi.Version{
Minor: 1,
}
)
func GetSupportedVersions() []*csi.Version {
return []*csi.Version{&version}
}
func NewCephFSDriver() *cephfsDriver {
return &cephfsDriver{}
}
func NewIdentityServer(d *csicommon.CSIDriver) *identityServer {
return &identityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
}
}
func NewControllerServer(d *csicommon.CSIDriver) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
}
}
func NewNodeServer(d *csicommon.CSIDriver) *nodeServer {
return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
}
}
func (fs *cephfsDriver) Run(driverName, nodeId, endpoint string) {
glog.Infof("Driver: %v version: %v", driverName, GetVersionString(&version))
// Initialize default library driver
fs.driver = csicommon.NewCSIDriver(driverName, &version, GetSupportedVersions(), nodeId)
if fs.driver == nil {
glog.Fatalln("Failed to initialize CSI driver")
}
fs.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
})
fs.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
})
// Create gRPC servers
fs.ids = NewIdentityServer(fs.driver)
fs.ns = NewNodeServer(fs.driver)
fs.cs = NewControllerServer(fs.driver)
server := csicommon.NewNonBlockingGRPCServer()
server.Start(endpoint, fs.ids, fs.cs, fs.ns)
server.Wait()
}

View File

@ -0,0 +1,158 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"os"
"path"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type controllerServer struct {
*csicommon.DefaultControllerServer
}
const (
oneGB = 1073741824
)
func GetVersionString(v *csi.Version) string {
return fmt.Sprintf("%d.%d.%d", v.GetMajor(), v.GetMinor(), v.GetPatch())
}
func (cs *controllerServer) validateRequest(v *csi.Version) error {
if v == nil {
return status.Error(codes.InvalidArgument, "Version missing in request")
}
return cs.Driver.ValidateControllerServiceRequest(v, csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME)
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateRequest(req.Version); err != nil {
glog.Warningf("invalid create volume request: %v", req)
return nil, err
}
// Configuration
volOptions, err := newVolumeOptions(req.GetParameters())
if err != nil {
return nil, err
}
volId := newVolumeIdentifier(volOptions, req)
volSz := int64(oneGB)
if req.GetCapacityRange() != nil {
volSz = int64(req.GetCapacityRange().GetRequiredBytes())
}
if err := createMountPoint(provisionRoot); err != nil {
glog.Errorf("failed to create provision root at %s: %v", provisionRoot, err)
return nil, status.Error(codes.Internal, err.Error())
}
// Exec ceph-fuse only if cephfs has not been not mounted yet
isMnt, err := isMountPoint(provisionRoot)
if err != nil {
glog.Errorf("stat failed: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
if !isMnt {
if err = mountFuse(provisionRoot); err != nil {
glog.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}
}
// Create a new directory inside the provision root for bind-mounting done by NodePublishVolume
volPath := path.Join(provisionRoot, volId.id)
if err := os.Mkdir(volPath, 0750); err != nil {
glog.Errorf("failed to create volume %s: %v", volPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
// Set attributes & quotas
if err = setVolAttributes(volPath, volSz); err != nil {
glog.Errorf("failed to set attributes for volume %s: %v", volPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("cephfs: created volume %s", volPath)
return &csi.CreateVolumeResponse{
VolumeInfo: &csi.VolumeInfo{
Id: volId.id,
CapacityBytes: uint64(volSz),
Attributes: req.GetParameters(),
},
}, nil
}
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := cs.validateRequest(req.Version); err != nil {
glog.Warningf("invalid delete volume request: %v", req)
return nil, err
}
volId := req.GetVolumeId()
volPath := path.Join(provisionRoot, volId)
glog.V(4).Infof("deleting volume %s", volPath)
if err := deleteVolumePath(volPath); err != nil {
glog.Errorf("failed to delete volume %s: %v", volPath, err)
return nil, err
}
return &csi.DeleteVolumeResponse{}, nil
}
func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
res := &csi.ValidateVolumeCapabilitiesResponse{}
for _, capability := range req.VolumeCapabilities {
if capability.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
return res, nil
}
}
res.Supported = true
return res, nil
}
func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return &csi.ControllerPublishVolumeResponse{}, nil
}
func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type identityServer struct {
*csicommon.DefaultIdentityServer
}

144
pkg/cephfs/nodeserver.go Normal file
View File

@ -0,0 +1,144 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"context"
"path"
"github.com/golang/glog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
"k8s.io/kubernetes/pkg/util/keymutex"
"k8s.io/kubernetes/pkg/util/mount"
)
type nodeServer struct {
*csicommon.DefaultNodeServer
}
var nsMtx = keymutex.NewKeyMutex()
func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
if req.GetVersion() == nil {
return status.Error(codes.InvalidArgument, "Version missing in request")
}
if req.GetVolumeCapability() == nil {
return status.Error(codes.InvalidArgument, "Volume capability missing in request")
}
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.GetTargetPath() == "" {
return status.Error(codes.InvalidArgument, "Target path missing in request")
}
return nil
}
func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error {
if req.GetVersion() == nil {
return status.Error(codes.InvalidArgument, "Version missing in request")
}
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.GetTargetPath() == "" {
return status.Error(codes.InvalidArgument, "Target path missing in request")
}
return nil
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if err := validateNodePublishVolumeRequest(req); err != nil {
return nil, err
}
// Configuration
volId := req.GetVolumeId()
targetPath := req.GetTargetPath()
if err := tryLock(volId, nsMtx, "NodeServer"); err != nil {
return nil, err
}
defer nsMtx.UnlockKey(volId)
if err := createMountPoint(targetPath); err != nil {
glog.Errorf("failed to create mount point at %s: %v", targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
// Check if the volume is already mounted
isMnt, err := isMountPoint(targetPath)
if err != nil {
glog.Errorf("stat failed: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
if isMnt {
return &csi.NodePublishVolumeResponse{}, nil
}
// It's not, do the bind-mount now
options := []string{"bind"}
if req.GetReadonly() {
options = append(options, "ro")
}
volPath := path.Join(provisionRoot, req.GetVolumeId())
if err := mount.New("").Mount(volPath, targetPath, "", options); err != nil {
glog.Errorf("bind-mounting %s to %s failed: %v", volPath, targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("cephfs: volume %s successfuly mounted to %s", volPath, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if err := validateNodeUnpublishVolumeRequest(req); err != nil {
return nil, err
}
volId := req.GetVolumeId()
targetPath := req.GetTargetPath()
if err := tryLock(volId, nsMtx, "NodeServer"); err != nil {
return nil, err
}
defer nsMtx.UnlockKey(volId)
if err := mount.New("").Unmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.NodeUnpublishVolumeResponse{}, nil
}

56
pkg/cephfs/util.go Normal file
View File

@ -0,0 +1,56 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
// "fmt"
"os/exec"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/util/keymutex"
"k8s.io/kubernetes/pkg/util/mount"
)
func execCommand(command string, args ...string) ([]byte, error) {
cmd := exec.Command(command, args...)
return cmd.CombinedOutput()
}
func isMountPoint(p string) (bool, error) {
notMnt, err := mount.New("").IsLikelyNotMountPoint(p)
if err != nil {
return false, status.Error(codes.Internal, err.Error())
}
return !notMnt, nil
}
func tryLock(id string, mtx keymutex.KeyMutex, name string) error {
// TODO uncomment this once TryLockKey gets into Kubernetes
/*
if !mtx.TryLockKey(id) {
msg := fmt.Sprintf("%s has a pending operation on %s", name, req.GetVolumeId())
glog.Infoln(msg)
return status.Error(codes.Aborted, msg)
}
*/
return nil
}

58
pkg/cephfs/volume.go Normal file
View File

@ -0,0 +1,58 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"os"
)
func createMountPoint(root string) error {
return os.MkdirAll(root, 0750)
}
func deleteVolumePath(volPath string) error {
return os.RemoveAll(volPath)
}
func mountFuse(root string) error {
out, err := execCommand("ceph-fuse", root)
if err != nil {
return fmt.Errorf("cephfs: ceph-fuse failed with following error: %v\ncephfs: ceph-fuse output: %s", err, out)
}
return nil
}
func unmountFuse(root string) error {
out, err := execCommand("fusermount", "-u", root)
if err != nil {
return fmt.Errorf("cephfs: fusermount failed with following error: %v\ncephfs: fusermount output: %s", err, out)
}
return nil
}
func setVolAttributes(volPath string /*opts *fsVolumeOptions*/, maxBytes int64) error {
out, err := execCommand("setfattr", "-n", "ceph.quota.max_bytes",
"-v", fmt.Sprintf("%d", maxBytes), volPath)
if err != nil {
return fmt.Errorf("cephfs: setfattr failed with following error: %v\ncephfs: setfattr output: %s", err, out)
}
return nil
}

View File

@ -0,0 +1,41 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/pborman/uuid"
)
type volumeIdentifier struct {
name, uuid, id string
}
func newVolumeIdentifier(volOptions *volumeOptions, req *csi.CreateVolumeRequest) *volumeIdentifier {
volId := volumeIdentifier{
name: req.GetName(),
uuid: uuid.NewUUID().String(),
}
volId.id = "csi-rbd-" + volId.uuid
if volId.name == "" {
volId.name = volOptions.Pool + "-dynamic-pvc-" + volId.uuid
}
return &volId
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import "errors"
type volumeOptions struct {
VolName string `json:"volName"`
Monitor string `json:"monitor"`
Pool string `json:"pool"`
AdminId string `json:"adminID"`
AdminSecret string `json:"adminSecret"`
}
func extractOption(dest *string, optionLabel string, options map[string]string) error {
if opt, ok := options[optionLabel]; !ok {
return errors.New("Missing required parameter " + optionLabel)
} else {
*dest = opt
return nil
}
}
func newVolumeOptions(volOptions map[string]string) (*volumeOptions, error) {
var opts volumeOptions
// XXX early return - we're not reading credentials from volOptions for now...
// i'll finish this once ceph-fuse accepts passing credentials through cmd args
return &opts, nil
/*
if err := extractOption(&opts.AdminId, "adminID", volOptions); err != nil {
return nil, err
}
if err := extractOption(&opts.AdminSecret, "adminSecret", volOptions); err != nil {
return nil, err
}
if err := extractOption(&opts.Monitors, "monitors", volOptions); err != nil {
return nil, err
}
if err := extractOption(&opts.Pool, "pool", volOptions); err != nil {
return nil, err
}
return &opts, nil
*/
}