From 62d65ad0cbd6ed5ee50711eaec91c8fc1803f788 Mon Sep 17 00:00:00 2001 From: mickymiek Date: Wed, 19 Dec 2018 15:26:16 +0100 Subject: [PATCH] cm metadata persist for rbd and cephfs --- Gopkg.toml | 4 + cephfs/main.go | 18 ++- deploy/rbd/kubernetes/csi-rbdplugin.yaml | 7 +- docs/deploy-cephfs.md | 3 + docs/deploy-rbd.md | 4 +- pkg/cephfs/controllercache.go | 128 ----------------- pkg/cephfs/controllerserver.go | 37 +++-- pkg/cephfs/driver.go | 22 ++- pkg/rbd/controllerserver.go | 61 +++++--- pkg/rbd/rbd.go | 105 ++------------ pkg/rbd/rbd_util.go | 92 ------------ pkg/util/cachepersister.go | 52 +++++++ pkg/util/k8scmcache.go | 172 +++++++++++++++++++++++ pkg/util/nodecache.go | 124 ++++++++++++++++ rbd/main.go | 23 +-- 15 files changed, 467 insertions(+), 385 deletions(-) delete mode 100644 pkg/cephfs/controllercache.go create mode 100644 pkg/util/cachepersister.go create mode 100644 pkg/util/k8scmcache.go create mode 100644 pkg/util/nodecache.go diff --git a/Gopkg.toml b/Gopkg.toml index db2b74538..e8eddf380 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -33,3 +33,7 @@ [[override]] name = "github.com/golang/protobuf" version = "1.1.0" + +[[constraint]] + name = "k8s.io/client-go" + version = "kubernetes-1.10.0-beta.1" diff --git a/cephfs/main.go b/cephfs/main.go index fcebaa73f..7d53f5515 100644 --- a/cephfs/main.go +++ b/cephfs/main.go @@ -22,6 +22,7 @@ import ( "path" "github.com/ceph/ceph-csi/pkg/cephfs" + "github.com/ceph/ceph-csi/pkg/util" "github.com/golang/glog" ) @@ -30,10 +31,11 @@ func init() { } var ( - endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint") - driverName = flag.String("drivername", "csi-cephfsplugin", "name of the driver") - nodeId = flag.String("nodeid", "", "node id") - volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')") + endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint") + driverName = flag.String("drivername", "csi-cephfsplugin", 
"name of the driver") + nodeId = flag.String("nodeid", "", "node id") + volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')") + metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]") ) func main() { @@ -49,8 +51,14 @@ func main() { os.Exit(1) } + cp, err := util.NewCachePersister(*metadataStorage, *driverName) + if err != nil { + glog.Errorf("failed to define cache persistence method: %v", err) + os.Exit(1) + } + driver := cephfs.NewCephFSDriver() - driver.Run(*driverName, *nodeId, *endpoint, *volumeMounter) + driver.Run(*driverName, *nodeId, *endpoint, *volumeMounter, cp) os.Exit(0) } diff --git a/deploy/rbd/kubernetes/csi-rbdplugin.yaml b/deploy/rbd/kubernetes/csi-rbdplugin.yaml index d641a7832..4f99fd4ef 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin.yaml @@ -50,7 +50,8 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=csi-rbdplugin" - - "--containerized=true" + - "--containerized=true" + - "--metadatastorage=node" env: - name: HOST_ROOTFS value: "/rootfs" @@ -58,6 +59,10 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace - name: CSI_ENDPOINT value: unix://var/lib/kubelet/plugins/csi-rbdplugin/csi.sock imagePullPolicy: "IfNotPresent" diff --git a/docs/deploy-cephfs.md b/docs/deploy-cephfs.md index 4e5b68c77..952e9f2c1 100644 --- a/docs/deploy-cephfs.md +++ b/docs/deploy-cephfs.md @@ -27,6 +27,9 @@ Option | Default value | Description `--nodeid` | _empty_ | This node's ID `--volumemounter` | _empty_ | default volume mounter. Available options are `kernel` and `fuse`. This is the mount method used if volume parameters don't specify otherwise. If left unspecified, the driver will first probe for `ceph-fuse` in system's path and will choose Ceph kernel client if probing failed. 
+**Available environmental variables:** +`KUBERNETES_CONFIG_PATH`: if you use `k8s_configmap` as metadata store, specify the path of your k8s config file (if not specified, the plugin will assume you're running it inside a k8s cluster and find the config itself). + **Available volume parameters:** Parameter | Required | Description diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index 4f876655f..c71dad51f 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -26,11 +26,13 @@ Option | Default value | Description `--drivername` | `csi-cephfsplugin` | name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) `--nodeid` | _empty_ | This node's ID `--containerized` | true | Whether running in containerized mode - +`--metadatastorage` | _empty_ | Whether metadata should be kept on the node as a file or in a k8s configmap (`node` or `k8s_configmap`) **Available environmental variables:** `HOST_ROOTFS`: rbdplugin searches `/proc` directory under the directory set by `HOST_ROOTFS`. +`KUBERNETES_CONFIG_PATH`: if you use `k8s_configmap` as metadata store, specify the path of your k8s config file (if not specified, the plugin will assume you're running it inside a k8s cluster and find the config itself). + **Available volume parameters:** Parameter | Required | Description diff --git a/pkg/cephfs/controllercache.go b/pkg/cephfs/controllercache.go deleted file mode 100644 index c757028dc..000000000 --- a/pkg/cephfs/controllercache.go +++ /dev/null @@ -1,128 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cephfs - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "os" - "path" - "strings" - "sync" - - "github.com/golang/glog" -) - -const ( - controllerCacheRoot = PluginFolder + "/controller/plugin-cache" -) - -type controllerCacheEntry struct { - VolOptions volumeOptions - VolumeID volumeID -} - -type controllerCacheMap map[volumeID]*controllerCacheEntry - -var ( - ctrCache = make(controllerCacheMap) - ctrCacheMtx sync.Mutex -) - -// Load all .json files from controllerCacheRoot into ctrCache -// Called from driver.go's Run() -func loadControllerCache() error { - cacheDir, err := ioutil.ReadDir(controllerCacheRoot) - if err != nil { - return fmt.Errorf("cannot read controller cache from %s: %v", controllerCacheRoot, err) - } - - ctrCacheMtx.Lock() - defer ctrCacheMtx.Unlock() - - for _, fi := range cacheDir { - if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() { - continue - } - - f, err := os.Open(path.Join(controllerCacheRoot, fi.Name())) - if err != nil { - glog.Errorf("cephfs: cloudn't read '%s' from controller cache: %v", fi.Name(), err) - continue - } - - d := json.NewDecoder(f) - ent := &controllerCacheEntry{} - - if err = d.Decode(ent); err != nil { - glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err) - } else { - ctrCache[ent.VolumeID] = ent - } - - f.Close() - } - - return nil -} - -func getControllerCacheEntryPath(volId volumeID) string { - return path.Join(controllerCacheRoot, string(volId)+".json") -} - -func (m controllerCacheMap) insert(ent *controllerCacheEntry) error { - filePath := getControllerCacheEntryPath(ent.VolumeID) - - ctrCacheMtx.Lock() - defer ctrCacheMtx.Unlock() - - f, err := os.Create(filePath) - if err != nil { - return fmt.Errorf("couldn't create cache entry file '%s': %v", filePath, err) - } - defer f.Close() - - enc := json.NewEncoder(f) - if err = enc.Encode(ent); err != nil { - 
return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.VolumeID, err) - } - - m[ent.VolumeID] = ent - - return nil -} - -func (m controllerCacheMap) pop(volId volumeID) (*controllerCacheEntry, error) { - ctrCacheMtx.Lock() - defer ctrCacheMtx.Unlock() - - ent, ok := m[volId] - if !ok { - return nil, fmt.Errorf("cache entry for volume %s does not exist", volId) - } - - filePath := getControllerCacheEntryPath(volId) - - if err := os.Remove(filePath); err != nil { - return nil, fmt.Errorf("failed to remove cache entry file '%s': %v", filePath, err) - } - - delete(m, volId) - - return ent, nil -} diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go index 10ee1084a..7def64eb7 100644 --- a/pkg/cephfs/controllerserver.go +++ b/pkg/cephfs/controllerserver.go @@ -24,10 +24,18 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi/v0" "github.com/kubernetes-csi/drivers/pkg/csi-common" + + "github.com/ceph/ceph-csi/pkg/util" ) type controllerServer struct { *csicommon.DefaultControllerServer + MetadataStore util.CachePersister +} + +type controllerCacheEntry struct { + VolOptions volumeOptions + VolumeID volumeID } func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { @@ -35,7 +43,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol glog.Errorf("CreateVolumeRequest validation failed: %v", err) return nil, err } - // Configuration volOptions, err := newVolumeOptions(req.GetParameters()) @@ -56,7 +63,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol if volOptions.ProvisionVolume { // Admin credentials are required - cr, err := getAdminCredentials(req.GetControllerCreateSecrets()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) @@ -82,7 +88,8 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol glog.Infof("cephfs: volume %s 
is provisioned statically", volId) } - if err = ctrCache.insert(&controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId}); err != nil { + ce := &controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId} + if err := cs.MetadataStore.Create(string(volId), ce); err != nil { glog.Errorf("failed to store a cache entry for volume %s: %v", volId, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -107,30 +114,18 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol err error ) - // Load volume info from cache - - ent, err := ctrCache.pop(volId) - if err != nil { - glog.Error(err) + ce := &controllerCacheEntry{} + if err := cs.MetadataStore.Get(string(volId), ce); err != nil { return nil, status.Error(codes.Internal, err.Error()) } - if !ent.VolOptions.ProvisionVolume { + if !ce.VolOptions.ProvisionVolume { // DeleteVolume() is forbidden for statically provisioned volumes! glog.Warningf("volume %s is provisioned statically, aborting delete", volId) return &csi.DeleteVolumeResponse{}, nil } - defer func() { - if err != nil { - // Reinsert cache entry for retry - if insErr := ctrCache.insert(ent); insErr != nil { - glog.Errorf("failed to reinsert volume cache entry in rollback procedure for volume %s: %v", volId, err) - } - } - }() - // Deleting a volume requires admin credentials cr, err := getAdminCredentials(req.GetControllerDeleteSecrets()) @@ -139,7 +134,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, status.Error(codes.InvalidArgument, err.Error()) } - if err = purgeVolume(volId, cr, &ent.VolOptions); err != nil { + if err = purgeVolume(volId, cr, &ce.VolOptions); err != nil { glog.Errorf("failed to delete volume %s: %v", volId, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -149,6 +144,10 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, status.Error(codes.Internal, err.Error()) } + if err := 
cs.MetadataStore.Delete(string(volId)); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + glog.Infof("cephfs: successfully deleted volume %s", volId) return &csi.DeleteVolumeResponse{}, nil diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go index b73f602b0..57bc9cd1e 100644 --- a/pkg/cephfs/driver.go +++ b/pkg/cephfs/driver.go @@ -17,12 +17,12 @@ limitations under the License. package cephfs import ( - "os" - "github.com/golang/glog" "github.com/container-storage-interface/spec/lib/go/csi/v0" "github.com/kubernetes-csi/drivers/pkg/csi-common" + + "github.com/ceph/ceph-csi/pkg/util" ) const ( @@ -56,9 +56,10 @@ func NewIdentityServer(d *csicommon.CSIDriver) *identityServer { } } -func NewControllerServer(d *csicommon.CSIDriver) *controllerServer { +func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *controllerServer { return &controllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(d), + MetadataStore: cachePersister, } } @@ -68,20 +69,11 @@ func NewNodeServer(d *csicommon.CSIDriver) *nodeServer { } } -func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string) { +func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string, cachePersister util.CachePersister) { glog.Infof("Driver: %v version: %v", driverName, Version) // Configuration - if err := os.MkdirAll(controllerCacheRoot, 0755); err != nil { - glog.Fatalf("cephfs: failed to create %s: %v", controllerCacheRoot, err) - return - } - - if err := loadControllerCache(); err != nil { - glog.Errorf("cephfs: failed to read volume cache: %v", err) - } - if err := loadAvailableMounters(); err != nil { glog.Fatalf("cephfs: failed to load ceph mounters: %v", err) } @@ -120,7 +112,9 @@ func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string) fs.is = NewIdentityServer(fs.driver) fs.ns = NewNodeServer(fs.driver) - fs.cs = NewControllerServer(fs.driver) + + fs.cs = 
NewControllerServer(fs.driver, cachePersister) + //fs.cs.LoadExDataFromMetadataStore() server := csicommon.NewNonBlockingGRPCServer() server.Start(endpoint, fs.is, fs.cs, fs.ns) diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go index 0fded6853..efbd7586d 100644 --- a/pkg/rbd/controllerserver.go +++ b/pkg/rbd/controllerserver.go @@ -20,10 +20,10 @@ import ( "fmt" "os" "os/exec" - "path" "syscall" "time" + "github.com/ceph/ceph-csi/pkg/util" "github.com/container-storage-interface/spec/lib/go/csi/v0" "github.com/golang/glog" "github.com/kubernetes-csi/drivers/pkg/csi-common" @@ -40,6 +40,28 @@ const ( type controllerServer struct { *csicommon.DefaultControllerServer + MetadataStore util.CachePersister +} + +var ( + rbdVolumes = map[string]*rbdVolume{} + rbdSnapshots = map[string]*rbdSnapshot{} +) + +func (cs *controllerServer) LoadExDataFromMetadataStore() error { + vol := &rbdVolume{} + cs.MetadataStore.ForAll("csi-rbd-vol-", vol, func(identifier string) error { + rbdVolumes[identifier] = vol + return nil + }) + + snap := &rbdSnapshot{} + cs.MetadataStore.ForAll("csi-rbd-(.*)-snap-", snap, func(identifier string) error { + rbdSnapshots[identifier] = snap + return nil + }) + glog.Infof("Loaded %d volumes and %d snapshots from metadata store", len(rbdVolumes), len(rbdSnapshots)) + return nil } func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { @@ -97,7 +119,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol volName = rbdVol.Pool + "-dynamic-pvc-" + uniqueID } rbdVol.VolName = volName - volumeID := "csi-rbd-" + uniqueID + volumeID := "csi-rbd-vol-" + uniqueID rbdVol.VolID = volumeID // Volume Size - Default is 1 GiB volSizeBytes := int64(oneGB) @@ -123,7 +145,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } rbdSnap := &rbdSnapshot{} - if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, 
"controller-snap"), rbdSnap); err != nil { + if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil { return nil, err } @@ -142,11 +164,15 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol glog.V(4).Infof("create volume %s", volName) } } - - // Storing volInfo into a persistent file. - if err := persistVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil { - glog.Warningf("rbd: failed to store volInfo with error: %v", err) + if err := cs.MetadataStore.Create(volumeID, rbdVol); err != nil { + glog.Warningf("failed to store volume metadata with error: %v", err) + if err := deleteRBDImage(rbdVol, rbdVol.AdminId, req.GetControllerCreateSecrets()); err != nil { + glog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err) + return nil, err + } + return nil, err } + rbdVolumes[volumeID] = rbdVol return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -166,14 +192,15 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol volumeID := req.GetVolumeId() volumeIDMutex.LockKey(volumeID) defer volumeIDMutex.UnlockKey(volumeID) + rbdVol := &rbdVolume{} - if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil { + if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil { if os.IsNotExist(errors.Cause(err)) { - // Must have been deleted already. This is not an error (idempotency!). 
return &csi.DeleteVolumeResponse{}, nil } return nil, err } + volName := rbdVol.VolName // Deleting rbd image glog.V(4).Infof("deleting volume %s", volName) @@ -182,8 +209,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol glog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, volName, err) return nil, err } - // Removing persistent storage file for the unmapped volume - if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil { + + if err := cs.MetadataStore.Delete(volumeID); err != nil { return nil, err } @@ -301,24 +328,21 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS rbdSnap.CreatedAt = time.Now().UnixNano() - // Storing snapInfo into a persistent file. - if err := persistSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil { + if err := cs.MetadataStore.Create(snapshotID, rbdSnap); err != nil { glog.Warningf("rbd: failed to store snapInfo with error: %v", err) - // Unprotect snapshot err := unprotectSnapshot(rbdSnap, rbdSnap.AdminId, req.GetCreateSnapshotSecrets()) if err != nil { return nil, status.Error(codes.Unknown, fmt.Sprintf("This Snapshot should be removed but failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)) } - // Deleting snapshot glog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName) if err := deleteSnapshot(rbdSnap, rbdSnap.AdminId, req.GetCreateSnapshotSecrets()); err != nil { return nil, status.Error(codes.Unknown, fmt.Sprintf("This Snapshot should be removed but failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)) } - return nil, err } + rbdSnapshots[snapshotID] = rbdSnap return &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ @@ -347,7 +371,7 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS defer snapshotIDMutex.UnlockKey(snapshotID) rbdSnap := &rbdSnapshot{} - if err := 
loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil { + if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil { return nil, err } @@ -363,8 +387,7 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)) } - // Removing persistent storage file for the unmapped snapshot - if err := deleteSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap")); err != nil { + if err := cs.MetadataStore.Delete(snapshotID); err != nil { return nil, err } diff --git a/pkg/rbd/rbd.go b/pkg/rbd/rbd.go index 7ef552338..f83e1ccc5 100644 --- a/pkg/rbd/rbd.go +++ b/pkg/rbd/rbd.go @@ -17,14 +17,9 @@ limitations under the License. package rbd import ( - "encoding/json" - "io/ioutil" - "os" - "path" - "strings" - "github.com/golang/glog" + "github.com/ceph/ceph-csi/pkg/util" "github.com/container-storage-interface/spec/lib/go/csi/v0" "github.com/kubernetes-csi/drivers/pkg/csi-common" @@ -56,94 +51,6 @@ var ( version = "0.3.0" ) -var rbdVolumes map[string]*rbdVolume -var rbdSnapshots map[string]*rbdSnapshot - -// Init checks for the persistent volume file and loads all found volumes -// into a memory structure -func init() { - rbdVolumes = map[string]*rbdVolume{} - rbdSnapshots = map[string]*rbdSnapshot{} - if _, err := os.Stat(path.Join(PluginFolder, "controller")); os.IsNotExist(err) { - glog.Infof("rbd: folder %s not found. Creating... 
\n", path.Join(PluginFolder, "controller")) - if err := os.Mkdir(path.Join(PluginFolder, "controller"), 0755); err != nil { - glog.Fatalf("Failed to create a controller's volumes folder with error: %v\n", err) - } - } else { - // Since "controller" folder exists, it means the rbdplugin has already been running, it means - // there might be some volumes left, they must be re-inserted into rbdVolumes map - loadExVolumes() - } - if _, err := os.Stat(path.Join(PluginFolder, "controller-snap")); os.IsNotExist(err) { - glog.Infof("rbd: folder %s not found. Creating... \n", path.Join(PluginFolder, "controller-snap")) - if err := os.Mkdir(path.Join(PluginFolder, "controller-snap"), 0755); err != nil { - glog.Fatalf("Failed to create a controller's snapshots folder with error: %v\n", err) - } - } else { - // Since "controller-snap" folder exists, it means the rbdplugin has already been running, it means - // there might be some snapshots left, they must be re-inserted into rbdSnapshots map - loadExSnapshots() - } -} - -// loadExSnapshots check for any *.json files in the PluginFolder/controller-snap folder -// and loads then into rbdSnapshots map -func loadExSnapshots() { - rbdSnap := rbdSnapshot{} - files, err := ioutil.ReadDir(path.Join(PluginFolder, "controller-snap")) - if err != nil { - glog.Infof("rbd: failed to read controller's snapshots folder: %s error:%v", path.Join(PluginFolder, "controller-snap"), err) - return - } - for _, f := range files { - if !strings.HasSuffix(f.Name(), ".json") { - continue - } - fp, err := os.Open(path.Join(PluginFolder, "controller-snap", f.Name())) - if err != nil { - glog.Infof("rbd: open file: %s err %%v", f.Name(), err) - continue - } - decoder := json.NewDecoder(fp) - if err = decoder.Decode(&rbdSnap); err != nil { - glog.Infof("rbd: decode file: %s err: %v", f.Name(), err) - fp.Close() - continue - } - rbdSnapshots[rbdSnap.SnapID] = &rbdSnap - } - glog.Infof("rbd: Loaded %d snapshots from %s", len(rbdSnapshots), 
path.Join(PluginFolder, "controller-snap")) -} - -// loadExVolumes check for any *.json files in the PluginFolder/controller folder -// and loads then into rbdVolumes map -func loadExVolumes() { - rbdVol := rbdVolume{} - files, err := ioutil.ReadDir(path.Join(PluginFolder, "controller")) - if err != nil { - glog.Infof("rbd: failed to read controller's volumes folder: %s error:%v", path.Join(PluginFolder, "controller"), err) - return - } - for _, f := range files { - if !strings.HasSuffix(f.Name(), ".json") { - continue - } - fp, err := os.Open(path.Join(PluginFolder, "controller", f.Name())) - if err != nil { - glog.Infof("rbd: open file: %s err %%v", f.Name(), err) - continue - } - decoder := json.NewDecoder(fp) - if err = decoder.Decode(&rbdVol); err != nil { - glog.Infof("rbd: decode file: %s err: %v", f.Name(), err) - fp.Close() - continue - } - rbdVolumes[rbdVol.VolID] = &rbdVol - } - glog.Infof("rbd: Loaded %d volumes from %s", len(rbdVolumes), path.Join(PluginFolder, "controller")) -} - func GetRBDDriver() *rbd { return &rbd{} } @@ -154,9 +61,10 @@ func NewIdentityServer(d *csicommon.CSIDriver) *identityServer { } } -func NewControllerServer(d *csicommon.CSIDriver) *controllerServer { +func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *controllerServer { return &controllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(d), + MetadataStore: cachePersister, } } @@ -175,7 +83,7 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*nodeServer, err }, nil } -func (rbd *rbd) Run(driverName, nodeID, endpoint string, containerized bool) { +func (rbd *rbd) Run(driverName, nodeID, endpoint string, containerized bool, cachePersister util.CachePersister) { var err error glog.Infof("Driver: %v version: %v", driverName, version) @@ -198,7 +106,10 @@ func (rbd *rbd) Run(driverName, nodeID, endpoint string, containerized bool) { if err != nil { glog.Fatalln("failed to start node server, err %v", err) } - 
rbd.cs = NewControllerServer(rbd.driver) + + rbd.cs = NewControllerServer(rbd.driver, cachePersister) + rbd.cs.LoadExDataFromMetadataStore() + s := csicommon.NewNonBlockingGRPCServer() s.Start(endpoint, rbd.ids, rbd.cs, rbd.ns) s.Wait() diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go index accde1091..b2e4e88ef 100644 --- a/pkg/rbd/rbd_util.go +++ b/pkg/rbd/rbd_util.go @@ -17,11 +17,8 @@ limitations under the License. package rbd import ( - "encoding/json" "fmt" - "os" "os/exec" - "path" "strings" "time" @@ -316,95 +313,6 @@ func hasSnapshotFeature(imageFeatures string) bool { return false } -func persistVolInfo(image string, persistentStoragePath string, volInfo *rbdVolume) error { - file := path.Join(persistentStoragePath, image+".json") - fp, err := os.Create(file) - if err != nil { - glog.Errorf("rbd: failed to create persistent storage file %s with error: %v\n", file, err) - return errors.Wrapf(err, "rbd: create error for %s", file) - } - defer fp.Close() - encoder := json.NewEncoder(fp) - if err = encoder.Encode(volInfo); err != nil { - glog.Errorf("rbd: failed to encode volInfo: %+v for file: %s with error: %v\n", volInfo, file, err) - return errors.Wrap(err, "rbd: encode error") - } - glog.Infof("rbd: successfully saved volInfo: %+v into file: %s\n", volInfo, file) - return nil -} - -func loadVolInfo(image string, persistentStoragePath string, volInfo *rbdVolume) error { - file := path.Join(persistentStoragePath, image+".json") - fp, err := os.Open(file) - if err != nil { - return errors.Wrapf(err, "rbd: open error for %s", file) - } - defer fp.Close() - - decoder := json.NewDecoder(fp) - if err = decoder.Decode(volInfo); err != nil { - return errors.Wrap(err, "rbd: decode error") - } - - return nil -} - -func deleteVolInfo(image string, persistentStoragePath string) error { - file := path.Join(persistentStoragePath, image+".json") - glog.Infof("rbd: Deleting file for Volume: %s at: %s resulting path: %+v\n", image, persistentStoragePath, file) - err 
:= os.Remove(file) - if err != nil { - if err != os.ErrNotExist { - return errors.Wrapf(err, "rbd: error removing file %s", file) - } - } - return nil -} - -func persistSnapInfo(snapshot string, persistentStoragePath string, snapInfo *rbdSnapshot) error { - file := path.Join(persistentStoragePath, snapshot+".json") - fp, err := os.Create(file) - if err != nil { - glog.Errorf("rbd: failed to create persistent storage file %s with error: %v\n", file, err) - return errors.Wrapf(err, "rbd: create error for %s", file) - } - defer fp.Close() - encoder := json.NewEncoder(fp) - if err = encoder.Encode(snapInfo); err != nil { - glog.Errorf("rbd: failed to encode snapInfo: %+v for file: %s with error: %v\n", snapInfo, file, err) - return errors.Wrap(err, "rbd: encode error") - } - glog.Infof("rbd: successfully saved snapInfo: %+v into file: %s\n", snapInfo, file) - return nil -} - -func loadSnapInfo(snapshot string, persistentStoragePath string, snapInfo *rbdSnapshot) error { - file := path.Join(persistentStoragePath, snapshot+".json") - fp, err := os.Open(file) - if err != nil { - return errors.Wrapf(err, "rbd: open error for %s", file) - } - defer fp.Close() - - decoder := json.NewDecoder(fp) - if err = decoder.Decode(snapInfo); err != nil { - return errors.Wrap(err, "rbd: decode error") - } - return nil -} - -func deleteSnapInfo(snapshot string, persistentStoragePath string) error { - file := path.Join(persistentStoragePath, snapshot+".json") - glog.Infof("rbd: Deleting file for Snapshot: %s at: %s resulting path: %+v\n", snapshot, persistentStoragePath, file) - err := os.Remove(file) - if err != nil { - if err != os.ErrNotExist { - return errors.Wrapf(err, "rbd: error removing file %s", file) - } - } - return nil -} - func getRBDVolumeByID(volumeID string) (*rbdVolume, error) { if rbdVol, ok := rbdVolumes[volumeID]; ok { return rbdVol, nil diff --git a/pkg/util/cachepersister.go b/pkg/util/cachepersister.go new file mode 100644 index 000000000..f6ae13f9d --- /dev/null 
+++ b/pkg/util/cachepersister.go
@@ -0,0 +1,63 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"errors"
+
+	"github.com/golang/glog"
+)
+
+const (
+	// PluginFolder is the directory under which the node cache keeps its
+	// per-driver metadata directory.
+	PluginFolder = "/var/lib/kubelet/plugins"
+)
+
+// ForAllFunc is the callback CachePersister.ForAll invokes with the
+// identifier of each matching entry.
+type ForAllFunc func(identifier string) error
+
+// CachePersister stores metadata entries keyed by identifier. Create saves an
+// entry, Get loads one into data, ForAll loads every entry whose identifier
+// matches pattern into destObj and calls f for it, and Delete removes one.
+type CachePersister interface {
+	Create(identifier string, data interface{}) error
+	Get(identifier string, data interface{}) error
+	ForAll(pattern string, destObj interface{}, f ForAllFunc) error
+	Delete(identifier string) error
+}
+
+// NewCachePersister returns the CachePersister selected by metadataStore:
+// "k8s_configmap" yields a K8sCMCache using the pod's namespace, "node" yields
+// a NodeCache rooted at PluginFolder/driverName. Any other value, including
+// the empty string, is an error.
+func NewCachePersister(metadataStore string, driverName string) (CachePersister, error) {
+	switch metadataStore {
+	case "k8s_configmap":
+		glog.Infof("cache-persister: using kubernetes configmap as metadata cache persister")
+		return &K8sCMCache{
+			Client:    NewK8sClient(),
+			Namespace: GetK8sNamespace(),
+		}, nil
+	case "node":
+		glog.Infof("cache-persister: using node as metadata cache persister")
+		return &NodeCache{BasePath: PluginFolder + "/" + driverName}, nil
+	default:
+		return nil, errors.New("cache-persister: couldn't parse metadatastorage flag")
+	}
+}
diff --git a/pkg/util/k8scmcache.go b/pkg/util/k8scmcache.go
new file mode 100644
index 000000000..c27447f14
--- /dev/null
+++ b/pkg/util/k8scmcache.go
@@ -0,0 +1,186 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"regexp"
+
+	"github.com/golang/glog"
+	"github.com/pkg/errors"
+
+	"k8s.io/api/core/v1"
+	apierrs "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8s "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+// K8sCMCache is a CachePersister that keeps every metadata entry in its own
+// ConfigMap inside Namespace.
+type K8sCMCache struct {
+	Client    *k8s.Clientset
+	Namespace string
+}
+
+const (
+	defaultNamespace = "default"
+
+	cmLabel   = "csi-metadata"
+	cmDataKey = "content"
+
+	csiMetadataLabelAttr = "com.ceph.ceph-csi/metadata"
+)
+
+// GetK8sNamespace returns the POD_NAMESPACE environment variable, or
+// "default" when it is unset or empty.
+func GetK8sNamespace() string {
+	if namespace := os.Getenv("POD_NAMESPACE"); namespace != "" {
+		return namespace
+	}
+	return defaultNamespace
+}
+
+// NewK8sClient builds a clientset from the kubeconfig file named by
+// KUBERNETES_CONFIG_PATH or, when that variable is empty, from the in-cluster
+// configuration. It logs the failure and exits the process if either the
+// configuration or the client cannot be created.
+func NewK8sClient() *k8s.Clientset {
+	var cfg *rest.Config
+	var err error
+	if cPath := os.Getenv("KUBERNETES_CONFIG_PATH"); cPath != "" {
+		cfg, err = clientcmd.BuildConfigFromFlags("", cPath)
+	} else {
+		cfg, err = rest.InClusterConfig()
+	}
+	if err != nil {
+		glog.Errorf("Failed to get cluster config with error: %v\n", err)
+		os.Exit(1)
+	}
+	client, err := k8s.NewForConfig(cfg)
+	if err != nil {
+		glog.Errorf("Failed to create client with error: %v\n", err)
+		os.Exit(1)
+	}
+	return client
+}
+
+// getMetadataCM fetches the ConfigMap named resourceID; on failure it returns
+// a nil ConfigMap together with the error.
+func (k8scm *K8sCMCache) getMetadataCM(resourceID string) (*v1.ConfigMap, error) {
+	cm, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Get(resourceID, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+	return cm, nil
+}
+
+// ForAll decodes each labelled metadata ConfigMap whose name matches the
+// regular expression pattern into destObj and then calls f with that name.
+// It returns the first pattern, listing, decoding or callback error.
+func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
+	// Compile once up front so an invalid pattern is reported to the caller
+	// instead of silently matching nothing.
+	re, err := regexp.Compile(pattern)
+	if err != nil {
+		return errors.Wrapf(err, "k8s-cm-cache: invalid pattern %q", pattern)
+	}
+	listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", csiMetadataLabelAttr, cmLabel)}
+	cms, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).List(listOpts)
+	if err != nil {
+		return errors.Wrap(err, "k8s-cm-cache: failed to list metadata configmaps")
+	}
+
+	for _, cm := range cms.Items {
+		if !re.MatchString(cm.ObjectMeta.Name) {
+			continue
+		}
+		if err := json.Unmarshal([]byte(cm.Data[cmDataKey]), destObj); err != nil {
+			return errors.Wrap(err, "k8s-cm-cache: unmarshal error")
+		}
+		if err := f(cm.ObjectMeta.Name); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Create stores data, JSON-encoded, in a new labelled ConfigMap named
+// identifier. A ConfigMap that already exists is left untouched and is not
+// an error.
+func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
+	if cm, err := k8scm.getMetadataCM(identifier); cm != nil && err == nil {
+		glog.V(4).Infof("k8s-cm-cache: configmap already exists, skipping configmap creation")
+		return nil
+	}
+	dataJSON, err := json.Marshal(data)
+	if err != nil {
+		return errors.Wrap(err, "k8s-cm-cache: marshal error")
+	}
+	cm := &v1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      identifier,
+			Namespace: k8scm.Namespace,
+			Labels: map[string]string{
+				csiMetadataLabelAttr: cmLabel,
+			},
+		},
+		Data: map[string]string{
+			cmDataKey: string(dataJSON),
+		},
+	}
+
+	if _, err = k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Create(cm); err != nil {
+		if apierrs.IsAlreadyExists(err) {
+			glog.V(4).Infof("k8s-cm-cache: configmap already exists")
+			return nil
+		}
+		return errors.Wrapf(err, "k8s-cm-cache: couldn't persist %s metadata as configmap", identifier)
+	}
+	glog.V(4).Infof("k8s-cm-cache: configmap %s successfully created\n", identifier)
+	return nil
+}
+
+// Get decodes the JSON content of the ConfigMap named identifier into data.
+func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
+	cm, err := k8scm.getMetadataCM(identifier)
+	if err != nil {
+		return err
+	}
+	if err = json.Unmarshal([]byte(cm.Data[cmDataKey]), data); err != nil {
+		return errors.Wrap(err, "k8s-cm-cache: unmarshal error")
+	}
+	return nil
+}
+
+// Delete removes the ConfigMap named identifier. A ConfigMap that is already
+// gone counts as deleted, mirroring how NodeCache.Delete treats a missing file.
+func (k8scm *K8sCMCache) Delete(identifier string) error {
+	err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Delete(identifier, nil)
+	if err != nil {
+		if apierrs.IsNotFound(err) {
+			glog.V(4).Infof("k8s-cm-cache: metadata configmap %s not found, assuming it is already deleted", identifier)
+			return nil
+		}
+		return errors.Wrapf(err, "k8s-cm-cache: couldn't delete metadata configmap %s", identifier)
+	}
+	glog.V(4).Infof("k8s-cm-cache: successfully deleted metadata configmap %s", identifier)
+	return nil
+}
diff --git a/pkg/util/nodecache.go b/pkg/util/nodecache.go
new file mode 100644
index 000000000..3e9411e7d
--- /dev/null
+++ b/pkg/util/nodecache.go
@@ -0,0 +1,147 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"path"
+	"path/filepath"
+	"regexp"
+	"strings"
+
+	"github.com/golang/glog"
+	"github.com/pkg/errors"
+)
+
+// NodeCache is a CachePersister that keeps every metadata entry as a JSON
+// file named <identifier>.json inside the cache directory under BasePath.
+type NodeCache struct {
+	BasePath string
+}
+
+// cacheDir is the subdirectory of BasePath that holds the metadata files.
+var cacheDir = "controller"
+
+// EnsureCacheDirectory creates the cacheDir directory under BasePath,
+// together with any missing parents. An existing directory is not an error.
+func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
+	fullPath := path.Join(nc.BasePath, cacheDir)
+	// MkdirAll also covers a BasePath that has not been created yet.
+	if err := os.MkdirAll(fullPath, 0755); err != nil {
+		return errors.Wrapf(err, "node-cache: failed to create %s folder", fullPath)
+	}
+	return nil
+}
+
+// ForAll decodes each .json file in the cache directory whose file name
+// matches the regular expression pattern into destObj and then calls f with
+// the name stripped of its extension. Files that cannot be opened are logged
+// and skipped; the first pattern, decoding or callback error is returned.
+func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
+	// Compile once up front so an invalid pattern is reported to the caller
+	// instead of silently matching nothing.
+	re, err := regexp.Compile(pattern)
+	if err != nil {
+		return errors.Wrapf(err, "node-cache: invalid pattern %q", pattern)
+	}
+	if err = nc.EnsureCacheDirectory(cacheDir); err != nil {
+		return errors.Wrap(err, "node-cache: couldn't ensure cache directory exists")
+	}
+	dir := path.Join(nc.BasePath, cacheDir)
+	files, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return errors.Wrapf(err, "node-cache: failed to read %s folder", dir)
+	}
+
+	for _, file := range files {
+		name := file.Name()
+		if !strings.HasSuffix(name, ".json") || !re.MatchString(name) {
+			continue
+		}
+		fp, err := os.Open(path.Join(dir, name))
+		if err != nil {
+			glog.Infof("node-cache: open file: %s err %v", name, err)
+			continue
+		}
+		err = json.NewDecoder(fp).Decode(destObj)
+		// Close right after decoding, on success as well as on failure, so
+		// the descriptor is never left open while f runs or the loop goes on.
+		fp.Close()
+		if err != nil {
+			return errors.Wrapf(err, "node-cache: couldn't decode file %s", name)
+		}
+		if err := f(strings.TrimSuffix(name, filepath.Ext(name))); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Create writes data, JSON-encoded, to <identifier>.json in the cache
+// directory, creating the directory first when needed and truncating any
+// existing file.
+func (nc *NodeCache) Create(identifier string, data interface{}) error {
+	if err := nc.EnsureCacheDirectory(cacheDir); err != nil {
+		return errors.Wrap(err, "node-cache: couldn't ensure cache directory exists")
+	}
+	file := path.Join(nc.BasePath, cacheDir, identifier+".json")
+	fp, err := os.Create(file)
+	if err != nil {
+		return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s", file)
+	}
+	if err = json.NewEncoder(fp).Encode(data); err != nil {
+		fp.Close()
+		return errors.Wrapf(err, "node-cache: failed to encode metadata for file: %s", file)
+	}
+	// A failed Close can mean the data never reached the disk, so report it.
+	if err = fp.Close(); err != nil {
+		return errors.Wrapf(err, "node-cache: failed to close metadata storage file %s", file)
+	}
+	glog.V(4).Infof("node-cache: successfully saved metadata into file: %s\n", file)
+	return nil
+}
+
+// Get decodes the JSON content of <identifier>.json in the cache directory
+// into data.
+func (nc *NodeCache) Get(identifier string, data interface{}) error {
+	file := path.Join(nc.BasePath, cacheDir, identifier+".json")
+	fp, err := os.Open(file)
+	if err != nil {
+		return errors.Wrapf(err, "node-cache: open error for %s", file)
+	}
+	defer fp.Close()
+
+	if err = json.NewDecoder(fp).Decode(data); err != nil {
+		return errors.Wrapf(err, "node-cache: decode error for %s", file)
+	}
+	return nil
+}
+
+// Delete removes <identifier>.json from the cache directory. A file that does
+// not exist counts as already deleted.
+func (nc *NodeCache) Delete(identifier string) error {
+	file := path.Join(nc.BasePath, cacheDir, identifier+".json")
+	// os.Remove reports a missing file as an *os.PathError, which never
+	// compares equal to os.ErrNotExist; os.IsNotExist unwraps it.
+	if err := os.Remove(file); err != nil && !os.IsNotExist(err) {
+		return errors.Wrapf(err, "node-cache: error removing file %s", file)
+	}
+	glog.V(4).Infof("node-cache: successfully deleted metadata storage file at: %+v\n", file)
+	return nil
+}
diff --git a/rbd/main.go b/rbd/main.go
index 649589441..2031245e0 100644
--- a/rbd/main.go
+++ b/rbd/main.go
@@ -22,6 +22,7 @@ import (
 	"path"
 
 	"github.com/ceph/ceph-csi/pkg/rbd"
+	"github.com/ceph/ceph-csi/pkg/util"
 	"github.com/golang/glog"
 )
 
@@ -30,10 +31,11 @@ func init() {
 }
 
 var (
-	endpoint      = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
-	driverName    = flag.String("drivername", "csi-rbdplugin", "name of the driver")
-	nodeID        = flag.String("nodeid", "", "node id")
-	containerized = flag.Bool("containerized", true, "whether run as containerized")
+	endpoint        = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
+	driverName      = flag.String("drivername", "csi-rbdplugin", "name of the driver")
+	nodeID          = flag.String("nodeid", "", "node id")
+	containerized   = flag.Bool("containerized", true, "whether run as containerized")
+	metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
 )
 
 func main() {
@@ -48,13 +50,16 @@ func main() {
 		os.Exit(1)
 	}
 
-	handle()
-	os.Exit(0)
-}
+	cp, err := util.NewCachePersister(*metadataStorage, *driverName)
+	if err != nil {
+		glog.Errorf("failed to define cache persistence method: %v", err)
+		os.Exit(1)
+	}
 
-func handle() {
 	driver := rbd.GetRBDDriver()
-	driver.Run(*driverName, *nodeID, *endpoint, *containerized)
+	driver.Run(*driverName, *nodeID, *endpoint, *containerized, cp)
+
+	os.Exit(0)
 }
 
 func createPersistentStorage(persistentStoragePath string) error {