cm metadata persist for rbd and cephfs

This commit is contained in:
mickymiek 2018-12-19 15:26:16 +01:00
parent 27fa8a28b8
commit 12e6881669
15 changed files with 467 additions and 385 deletions

View File

@ -33,3 +33,7 @@
[[override]]
name = "github.com/golang/protobuf"
version = "1.1.0"
[[constraint]]
name = "k8s.io/client-go"
version = "kubernetes-1.10.0-beta.1"

View File

@ -22,6 +22,7 @@ import (
"path"
"github.com/ceph/ceph-csi/pkg/cephfs"
"github.com/ceph/ceph-csi/pkg/util"
"github.com/golang/glog"
)
@ -30,10 +31,11 @@ func init() {
}
var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "csi-cephfsplugin", "name of the driver")
nodeId = flag.String("nodeid", "", "node id")
volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "csi-cephfsplugin", "name of the driver")
nodeId = flag.String("nodeid", "", "node id")
volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
)
func main() {
@ -49,8 +51,14 @@ func main() {
os.Exit(1)
}
cp, err := util.NewCachePersister(*metadataStorage, *driverName)
if err != nil {
glog.Errorf("failed to define cache persistence method: %v", err)
os.Exit(1)
}
driver := cephfs.NewCephFSDriver()
driver.Run(*driverName, *nodeId, *endpoint, *volumeMounter)
driver.Run(*driverName, *nodeId, *endpoint, *volumeMounter, cp)
os.Exit(0)
}

View File

@ -50,7 +50,8 @@ spec:
- "--endpoint=$(CSI_ENDPOINT)"
- "--v=5"
- "--drivername=csi-rbdplugin"
- "--containerized=true"
- "--containerized=true"
- "--metadatastorage=node"
env:
- name: HOST_ROOTFS
value: "/rootfs"
@ -58,6 +59,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: CSI_ENDPOINT
value: unix://var/lib/kubelet/plugins/csi-rbdplugin/csi.sock
imagePullPolicy: "IfNotPresent"

View File

@ -27,6 +27,9 @@ Option | Default value | Description
`--nodeid` | _empty_ | This node's ID
`--volumemounter` | _empty_ | default volume mounter. Available options are `kernel` and `fuse`. This is the mount method used if volume parameters don't specify otherwise. If left unspecified, the driver will first probe for `ceph-fuse` in system's path and will choose Ceph kernel client if probing failed.
**Available environmental variables:**
`KUBERNETES_CONFIG_PATH`: if you use `k8s_configmap` as metadata store, specify the path of your k8s config file (if not specified, the plugin will assume you're running it inside a k8s cluster and find the config itself).
**Available volume parameters:**
Parameter | Required | Description

View File

@ -26,11 +26,13 @@ Option | Default value | Description
`--drivername` | `csi-cephfsplugin` | name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value)
`--nodeid` | _empty_ | This node's ID
`--containerized` | true | Whether running in containerized mode
`--metadatastorage` | _empty_ | Whether should metadata be kept on node as file or in a k8s configmap (`node` or `k8s_configmap`)
**Available environmental variables:**
`HOST_ROOTFS`: rbdplugin searches `/proc` directory under the directory set by `HOST_ROOTFS`.
`KUBERNETES_CONFIG_PATH`: if you use `k8s_configmap` as metadata store, specify the path of your k8s config file (if not specified, the plugin will assume you're running it inside a k8s cluster and find the config itself).
**Available volume parameters:**
Parameter | Required | Description

View File

@ -1,128 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"github.com/golang/glog"
)
const (
controllerCacheRoot = PluginFolder + "/controller/plugin-cache"
)
type controllerCacheEntry struct {
VolOptions volumeOptions
VolumeID volumeID
}
type controllerCacheMap map[volumeID]*controllerCacheEntry
var (
ctrCache = make(controllerCacheMap)
ctrCacheMtx sync.Mutex
)
// Load all .json files from controllerCacheRoot into ctrCache
// Called from driver.go's Run()
func loadControllerCache() error {
cacheDir, err := ioutil.ReadDir(controllerCacheRoot)
if err != nil {
return fmt.Errorf("cannot read controller cache from %s: %v", controllerCacheRoot, err)
}
ctrCacheMtx.Lock()
defer ctrCacheMtx.Unlock()
for _, fi := range cacheDir {
if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() {
continue
}
f, err := os.Open(path.Join(controllerCacheRoot, fi.Name()))
if err != nil {
glog.Errorf("cephfs: cloudn't read '%s' from controller cache: %v", fi.Name(), err)
continue
}
d := json.NewDecoder(f)
ent := &controllerCacheEntry{}
if err = d.Decode(ent); err != nil {
glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err)
} else {
ctrCache[ent.VolumeID] = ent
}
f.Close()
}
return nil
}
func getControllerCacheEntryPath(volId volumeID) string {
return path.Join(controllerCacheRoot, string(volId)+".json")
}
func (m controllerCacheMap) insert(ent *controllerCacheEntry) error {
filePath := getControllerCacheEntryPath(ent.VolumeID)
ctrCacheMtx.Lock()
defer ctrCacheMtx.Unlock()
f, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("couldn't create cache entry file '%s': %v", filePath, err)
}
defer f.Close()
enc := json.NewEncoder(f)
if err = enc.Encode(ent); err != nil {
return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.VolumeID, err)
}
m[ent.VolumeID] = ent
return nil
}
func (m controllerCacheMap) pop(volId volumeID) (*controllerCacheEntry, error) {
ctrCacheMtx.Lock()
defer ctrCacheMtx.Unlock()
ent, ok := m[volId]
if !ok {
return nil, fmt.Errorf("cache entry for volume %s does not exist", volId)
}
filePath := getControllerCacheEntryPath(volId)
if err := os.Remove(filePath); err != nil {
return nil, fmt.Errorf("failed to remove cache entry file '%s': %v", filePath, err)
}
delete(m, volId)
return ent, nil
}

View File

@ -24,10 +24,18 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
"github.com/ceph/ceph-csi/pkg/util"
)
type controllerServer struct {
*csicommon.DefaultControllerServer
MetadataStore util.CachePersister
}
type controllerCacheEntry struct {
VolOptions volumeOptions
VolumeID volumeID
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
@ -35,7 +43,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
glog.Errorf("CreateVolumeRequest validation failed: %v", err)
return nil, err
}
// Configuration
volOptions, err := newVolumeOptions(req.GetParameters())
@ -56,7 +63,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if volOptions.ProvisionVolume {
// Admin credentials are required
cr, err := getAdminCredentials(req.GetControllerCreateSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
@ -82,7 +88,8 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
glog.Infof("cephfs: volume %s is provisioned statically", volId)
}
if err = ctrCache.insert(&controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId}); err != nil {
ce := &controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId}
if err := cs.MetadataStore.Create(string(volId), ce); err != nil {
glog.Errorf("failed to store a cache entry for volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
@ -107,30 +114,18 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
err error
)
// Load volume info from cache
ent, err := ctrCache.pop(volId)
if err != nil {
glog.Error(err)
ce := &controllerCacheEntry{}
if err := cs.MetadataStore.Get(string(volId), ce); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !ent.VolOptions.ProvisionVolume {
if !ce.VolOptions.ProvisionVolume {
// DeleteVolume() is forbidden for statically provisioned volumes!
glog.Warningf("volume %s is provisioned statically, aborting delete", volId)
return &csi.DeleteVolumeResponse{}, nil
}
defer func() {
if err != nil {
// Reinsert cache entry for retry
if insErr := ctrCache.insert(ent); insErr != nil {
glog.Errorf("failed to reinsert volume cache entry in rollback procedure for volume %s: %v", volId, err)
}
}
}()
// Deleting a volume requires admin credentials
cr, err := getAdminCredentials(req.GetControllerDeleteSecrets())
@ -139,7 +134,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if err = purgeVolume(volId, cr, &ent.VolOptions); err != nil {
if err = purgeVolume(volId, cr, &ce.VolOptions); err != nil {
glog.Errorf("failed to delete volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
@ -149,6 +144,10 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.Internal, err.Error())
}
if err := cs.MetadataStore.Delete(string(volId)); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.Infof("cephfs: successfully deleted volume %s", volId)
return &csi.DeleteVolumeResponse{}, nil

View File

@ -17,12 +17,12 @@ limitations under the License.
package cephfs
import (
"os"
"github.com/golang/glog"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
"github.com/ceph/ceph-csi/pkg/util"
)
const (
@ -56,9 +56,10 @@ func NewIdentityServer(d *csicommon.CSIDriver) *identityServer {
}
}
func NewControllerServer(d *csicommon.CSIDriver) *controllerServer {
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
MetadataStore: cachePersister,
}
}
@ -68,20 +69,11 @@ func NewNodeServer(d *csicommon.CSIDriver) *nodeServer {
}
}
func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string) {
func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string, cachePersister util.CachePersister) {
glog.Infof("Driver: %v version: %v", driverName, Version)
// Configuration
if err := os.MkdirAll(controllerCacheRoot, 0755); err != nil {
glog.Fatalf("cephfs: failed to create %s: %v", controllerCacheRoot, err)
return
}
if err := loadControllerCache(); err != nil {
glog.Errorf("cephfs: failed to read volume cache: %v", err)
}
if err := loadAvailableMounters(); err != nil {
glog.Fatalf("cephfs: failed to load ceph mounters: %v", err)
}
@ -120,7 +112,9 @@ func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string)
fs.is = NewIdentityServer(fs.driver)
fs.ns = NewNodeServer(fs.driver)
fs.cs = NewControllerServer(fs.driver)
fs.cs = NewControllerServer(fs.driver, cachePersister)
//fs.cs.LoadExDataFromMetadataStore()
server := csicommon.NewNonBlockingGRPCServer()
server.Start(endpoint, fs.is, fs.cs, fs.ns)

View File

@ -20,10 +20,10 @@ import (
"fmt"
"os"
"os/exec"
"path"
"syscall"
"time"
"github.com/ceph/ceph-csi/pkg/util"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/glog"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
@ -40,6 +40,28 @@ const (
type controllerServer struct {
*csicommon.DefaultControllerServer
MetadataStore util.CachePersister
}
var (
rbdVolumes = map[string]*rbdVolume{}
rbdSnapshots = map[string]*rbdSnapshot{}
)
func (cs *controllerServer) LoadExDataFromMetadataStore() error {
vol := &rbdVolume{}
cs.MetadataStore.ForAll("csi-rbd-vol-", vol, func(identifier string) error {
rbdVolumes[identifier] = vol
return nil
})
snap := &rbdSnapshot{}
cs.MetadataStore.ForAll("csi-rbd-(.*)-snap-", snap, func(identifier string) error {
rbdSnapshots[identifier] = snap
return nil
})
glog.Infof("Loaded %d volumes and %d snapshots from metadata store", len(rbdVolumes), len(rbdSnapshots))
return nil
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
@ -92,7 +114,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
volName = rbdVol.Pool + "-dynamic-pvc-" + uniqueID
}
rbdVol.VolName = volName
volumeID := "csi-rbd-" + uniqueID
volumeID := "csi-rbd-vol-" + uniqueID
rbdVol.VolID = volumeID
// Volume Size - Default is 1 GiB
volSizeBytes := int64(oneGB)
@ -118,7 +140,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
rbdSnap := &rbdSnapshot{}
if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil {
return nil, err
}
@ -137,11 +159,15 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
glog.V(4).Infof("create volume %s", volName)
}
}
// Storing volInfo into a persistent file.
if err := persistVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil {
glog.Warningf("rbd: failed to store volInfo with error: %v", err)
if err := cs.MetadataStore.Create(volumeID, rbdVol); err != nil {
glog.Warningf("failed to store volume metadata with error: %v", err)
if err := deleteRBDImage(rbdVol, rbdVol.AdminId, req.GetControllerCreateSecrets()); err != nil {
glog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err)
return nil, err
}
return nil, err
}
rbdVolumes[volumeID] = rbdVol
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
@ -161,14 +187,15 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
volumeID := req.GetVolumeId()
volumeIDMutex.LockKey(volumeID)
defer volumeIDMutex.UnlockKey(volumeID)
rbdVol := &rbdVolume{}
if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil {
if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
if os.IsNotExist(errors.Cause(err)) {
// Must have been deleted already. This is not an error (idempotency!).
return &csi.DeleteVolumeResponse{}, nil
}
return nil, err
}
volName := rbdVol.VolName
// Deleting rbd image
glog.V(4).Infof("deleting volume %s", volName)
@ -177,8 +204,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
glog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, volName, err)
return nil, err
}
// Removing persistent storage file for the unmapped volume
if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil {
if err := cs.MetadataStore.Delete(volumeID); err != nil {
return nil, err
}
@ -296,24 +323,21 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
rbdSnap.CreatedAt = time.Now().UnixNano()
// Storing snapInfo into a persistent file.
if err := persistSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
if err := cs.MetadataStore.Create(snapshotID, rbdSnap); err != nil {
glog.Warningf("rbd: failed to store snapInfo with error: %v", err)
// Unprotect snapshot
err := unprotectSnapshot(rbdSnap, rbdSnap.AdminId, req.GetCreateSnapshotSecrets())
if err != nil {
return nil, status.Error(codes.Unknown, fmt.Sprintf("This Snapshot should be removed but failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err))
}
// Deleting snapshot
glog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName)
if err := deleteSnapshot(rbdSnap, rbdSnap.AdminId, req.GetCreateSnapshotSecrets()); err != nil {
return nil, status.Error(codes.Unknown, fmt.Sprintf("This Snapshot should be removed but failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err))
}
return nil, err
}
rbdSnapshots[snapshotID] = rbdSnap
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
@ -342,7 +366,7 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
defer snapshotIDMutex.UnlockKey(snapshotID)
rbdSnap := &rbdSnapshot{}
if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil {
return nil, err
}
@ -358,8 +382,7 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err))
}
// Removing persistent storage file for the unmapped snapshot
if err := deleteSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap")); err != nil {
if err := cs.MetadataStore.Delete(snapshotID); err != nil {
return nil, err
}

View File

@ -17,14 +17,9 @@ limitations under the License.
package rbd
import (
"encoding/json"
"io/ioutil"
"os"
"path"
"strings"
"github.com/golang/glog"
"github.com/ceph/ceph-csi/pkg/util"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
@ -56,94 +51,6 @@ var (
version = "0.3.0"
)
var rbdVolumes map[string]*rbdVolume
var rbdSnapshots map[string]*rbdSnapshot
// Init checks for the persistent volume file and loads all found volumes
// into a memory structure
func init() {
rbdVolumes = map[string]*rbdVolume{}
rbdSnapshots = map[string]*rbdSnapshot{}
if _, err := os.Stat(path.Join(PluginFolder, "controller")); os.IsNotExist(err) {
glog.Infof("rbd: folder %s not found. Creating... \n", path.Join(PluginFolder, "controller"))
if err := os.Mkdir(path.Join(PluginFolder, "controller"), 0755); err != nil {
glog.Fatalf("Failed to create a controller's volumes folder with error: %v\n", err)
}
} else {
// Since "controller" folder exists, it means the rbdplugin has already been running, it means
// there might be some volumes left, they must be re-inserted into rbdVolumes map
loadExVolumes()
}
if _, err := os.Stat(path.Join(PluginFolder, "controller-snap")); os.IsNotExist(err) {
glog.Infof("rbd: folder %s not found. Creating... \n", path.Join(PluginFolder, "controller-snap"))
if err := os.Mkdir(path.Join(PluginFolder, "controller-snap"), 0755); err != nil {
glog.Fatalf("Failed to create a controller's snapshots folder with error: %v\n", err)
}
} else {
// Since "controller-snap" folder exists, it means the rbdplugin has already been running, it means
// there might be some snapshots left, they must be re-inserted into rbdSnapshots map
loadExSnapshots()
}
}
// loadExSnapshots check for any *.json files in the PluginFolder/controller-snap folder
// and loads then into rbdSnapshots map
func loadExSnapshots() {
rbdSnap := rbdSnapshot{}
files, err := ioutil.ReadDir(path.Join(PluginFolder, "controller-snap"))
if err != nil {
glog.Infof("rbd: failed to read controller's snapshots folder: %s error:%v", path.Join(PluginFolder, "controller-snap"), err)
return
}
for _, f := range files {
if !strings.HasSuffix(f.Name(), ".json") {
continue
}
fp, err := os.Open(path.Join(PluginFolder, "controller-snap", f.Name()))
if err != nil {
glog.Infof("rbd: open file: %s err %%v", f.Name(), err)
continue
}
decoder := json.NewDecoder(fp)
if err = decoder.Decode(&rbdSnap); err != nil {
glog.Infof("rbd: decode file: %s err: %v", f.Name(), err)
fp.Close()
continue
}
rbdSnapshots[rbdSnap.SnapID] = &rbdSnap
}
glog.Infof("rbd: Loaded %d snapshots from %s", len(rbdSnapshots), path.Join(PluginFolder, "controller-snap"))
}
// loadExVolumes check for any *.json files in the PluginFolder/controller folder
// and loads then into rbdVolumes map
func loadExVolumes() {
rbdVol := rbdVolume{}
files, err := ioutil.ReadDir(path.Join(PluginFolder, "controller"))
if err != nil {
glog.Infof("rbd: failed to read controller's volumes folder: %s error:%v", path.Join(PluginFolder, "controller"), err)
return
}
for _, f := range files {
if !strings.HasSuffix(f.Name(), ".json") {
continue
}
fp, err := os.Open(path.Join(PluginFolder, "controller", f.Name()))
if err != nil {
glog.Infof("rbd: open file: %s err %%v", f.Name(), err)
continue
}
decoder := json.NewDecoder(fp)
if err = decoder.Decode(&rbdVol); err != nil {
glog.Infof("rbd: decode file: %s err: %v", f.Name(), err)
fp.Close()
continue
}
rbdVolumes[rbdVol.VolID] = &rbdVol
}
glog.Infof("rbd: Loaded %d volumes from %s", len(rbdVolumes), path.Join(PluginFolder, "controller"))
}
func GetRBDDriver() *rbd {
return &rbd{}
}
@ -154,9 +61,10 @@ func NewIdentityServer(d *csicommon.CSIDriver) *identityServer {
}
}
func NewControllerServer(d *csicommon.CSIDriver) *controllerServer {
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
MetadataStore: cachePersister,
}
}
@ -175,7 +83,7 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*nodeServer, err
}, nil
}
func (rbd *rbd) Run(driverName, nodeID, endpoint string, containerized bool) {
func (rbd *rbd) Run(driverName, nodeID, endpoint string, containerized bool, cachePersister util.CachePersister) {
var err error
glog.Infof("Driver: %v version: %v", driverName, version)
@ -198,7 +106,10 @@ func (rbd *rbd) Run(driverName, nodeID, endpoint string, containerized bool) {
if err != nil {
glog.Fatalln("failed to start node server, err %v", err)
}
rbd.cs = NewControllerServer(rbd.driver)
rbd.cs = NewControllerServer(rbd.driver, cachePersister)
rbd.cs.LoadExDataFromMetadataStore()
s := csicommon.NewNonBlockingGRPCServer()
s.Start(endpoint, rbd.ids, rbd.cs, rbd.ns)
s.Wait()

View File

@ -17,11 +17,8 @@ limitations under the License.
package rbd
import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path"
"strings"
"time"
@ -316,95 +313,6 @@ func hasSnapshotFeature(imageFeatures string) bool {
return false
}
func persistVolInfo(image string, persistentStoragePath string, volInfo *rbdVolume) error {
file := path.Join(persistentStoragePath, image+".json")
fp, err := os.Create(file)
if err != nil {
glog.Errorf("rbd: failed to create persistent storage file %s with error: %v\n", file, err)
return errors.Wrapf(err, "rbd: create error for %s", file)
}
defer fp.Close()
encoder := json.NewEncoder(fp)
if err = encoder.Encode(volInfo); err != nil {
glog.Errorf("rbd: failed to encode volInfo: %+v for file: %s with error: %v\n", volInfo, file, err)
return errors.Wrap(err, "rbd: encode error")
}
glog.Infof("rbd: successfully saved volInfo: %+v into file: %s\n", volInfo, file)
return nil
}
func loadVolInfo(image string, persistentStoragePath string, volInfo *rbdVolume) error {
file := path.Join(persistentStoragePath, image+".json")
fp, err := os.Open(file)
if err != nil {
return errors.Wrapf(err, "rbd: open error for %s", file)
}
defer fp.Close()
decoder := json.NewDecoder(fp)
if err = decoder.Decode(volInfo); err != nil {
return errors.Wrap(err, "rbd: decode error")
}
return nil
}
func deleteVolInfo(image string, persistentStoragePath string) error {
file := path.Join(persistentStoragePath, image+".json")
glog.Infof("rbd: Deleting file for Volume: %s at: %s resulting path: %+v\n", image, persistentStoragePath, file)
err := os.Remove(file)
if err != nil {
if err != os.ErrNotExist {
return errors.Wrapf(err, "rbd: error removing file %s", file)
}
}
return nil
}
func persistSnapInfo(snapshot string, persistentStoragePath string, snapInfo *rbdSnapshot) error {
file := path.Join(persistentStoragePath, snapshot+".json")
fp, err := os.Create(file)
if err != nil {
glog.Errorf("rbd: failed to create persistent storage file %s with error: %v\n", file, err)
return errors.Wrapf(err, "rbd: create error for %s", file)
}
defer fp.Close()
encoder := json.NewEncoder(fp)
if err = encoder.Encode(snapInfo); err != nil {
glog.Errorf("rbd: failed to encode snapInfo: %+v for file: %s with error: %v\n", snapInfo, file, err)
return errors.Wrap(err, "rbd: encode error")
}
glog.Infof("rbd: successfully saved snapInfo: %+v into file: %s\n", snapInfo, file)
return nil
}
func loadSnapInfo(snapshot string, persistentStoragePath string, snapInfo *rbdSnapshot) error {
file := path.Join(persistentStoragePath, snapshot+".json")
fp, err := os.Open(file)
if err != nil {
return errors.Wrapf(err, "rbd: open error for %s", file)
}
defer fp.Close()
decoder := json.NewDecoder(fp)
if err = decoder.Decode(snapInfo); err != nil {
return errors.Wrap(err, "rbd: decode error")
}
return nil
}
func deleteSnapInfo(snapshot string, persistentStoragePath string) error {
file := path.Join(persistentStoragePath, snapshot+".json")
glog.Infof("rbd: Deleting file for Snapshot: %s at: %s resulting path: %+v\n", snapshot, persistentStoragePath, file)
err := os.Remove(file)
if err != nil {
if err != os.ErrNotExist {
return errors.Wrapf(err, "rbd: error removing file %s", file)
}
}
return nil
}
func getRBDVolumeByID(volumeID string) (*rbdVolume, error) {
if rbdVol, ok := rbdVolumes[volumeID]; ok {
return rbdVol, nil

View File

@ -0,0 +1,52 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"errors"
"github.com/golang/glog"
)
const (
PluginFolder = "/var/lib/kubelet/plugins"
)
type ForAllFunc func(identifier string) error
type CachePersister interface {
Create(identifier string, data interface{}) error
Get(identifier string, data interface{}) error
ForAll(pattern string, destObj interface{}, f ForAllFunc) error
Delete(identifier string) error
}
func NewCachePersister(metadataStore string, driverName string) (CachePersister, error) {
if metadataStore == "k8s_configmap" {
glog.Infof("cache-perister: using kubernetes configmap as metadata cache persister")
k8scm := &K8sCMCache{}
k8scm.Client = NewK8sClient()
k8scm.Namespace = GetK8sNamespace()
return k8scm, nil
} else if metadataStore == "node" {
glog.Infof("cache-persister: using node as metadata cache persister")
nc := &NodeCache{}
nc.BasePath = PluginFolder + "/" + driverName
return nc, nil
}
return nil, errors.New("cache-persister: couldn't parse metadatastorage flag")
}

172
pkg/util/k8scmcache.go Normal file
View File

@ -0,0 +1,172 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"fmt"
"os"
"regexp"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type K8sCMCache struct {
Client *k8s.Clientset
Namespace string
}
const (
defaultNamespace = "default"
cmLabel = "csi-metadata"
cmDataKey = "content"
csiMetadataLabelAttr = "com.ceph.ceph-csi/metadata"
)
func GetK8sNamespace() string {
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
return defaultNamespace
}
return namespace
}
func NewK8sClient() *k8s.Clientset {
var cfg *rest.Config
var err error
cPath := os.Getenv("KUBERNETES_CONFIG_PATH")
if cPath != "" {
cfg, err = clientcmd.BuildConfigFromFlags("", cPath)
if err != nil {
glog.Errorf("Failed to get cluster config with error: %v\n", err)
os.Exit(1)
}
} else {
cfg, err = rest.InClusterConfig()
if err != nil {
glog.Errorf("Failed to get cluster config with error: %v\n", err)
os.Exit(1)
}
}
client, err := k8s.NewForConfig(cfg)
if err != nil {
glog.Errorf("Failed to create client with error: %v\n", err)
os.Exit(1)
}
return client
}
func (k8scm *K8sCMCache) getMetadataCM(resourceID string) (*v1.ConfigMap, error) {
cm, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Get(resourceID, metav1.GetOptions{})
if err != nil {
return nil, err
}
return cm, nil
}
func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", csiMetadataLabelAttr, cmLabel)}
cms, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).List(listOpts)
if err != nil {
return errors.Wrap(err, "k8s-cm-cache: failed to list metadata configmaps")
}
for _, cm := range cms.Items {
data := cm.Data[cmDataKey]
match, err := regexp.MatchString(pattern, cm.ObjectMeta.Name)
if err != nil {
continue
}
if !match {
continue
}
if err := json.Unmarshal([]byte(data), destObj); err != nil {
return errors.Wrap(err, "k8s-cm-cache: unmarshal error")
}
if err = f(cm.ObjectMeta.Name); err != nil {
return err
}
}
return nil
}
func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
cm, err := k8scm.getMetadataCM(identifier)
if cm != nil && err == nil {
glog.V(4).Infof("k8s-cm-cache: configmap already exists, skipping configmap creation")
return nil
} else {
dataJson, err := json.Marshal(data)
if err != nil {
return errors.Wrap(err, "k8s-cm-cache: marshal error")
}
cm := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: identifier,
Namespace: k8scm.Namespace,
Labels: map[string]string{
csiMetadataLabelAttr: cmLabel,
},
},
Data: map[string]string{},
}
cm.Data[cmDataKey] = string(dataJson)
_, err = k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Create(cm)
if err != nil {
if apierrs.IsAlreadyExists(err) {
glog.V(4).Infof("k8s-cm-cache: configmap already exists")
return nil
}
return errors.Wrapf(err, "k8s-cm-cache: couldn't persist %s metadata as configmap", identifier)
}
}
glog.V(4).Infof("k8s-cm-cache: configmap %s successfully created\n", identifier)
return nil
}
func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
cm, err := k8scm.getMetadataCM(identifier)
if err != nil {
return err
}
err = json.Unmarshal([]byte(cm.Data[cmDataKey]), data)
if err != nil {
return errors.Wrap(err, "k8s-cm-cache: unmarshal error")
}
return nil
}
func (k8scm *K8sCMCache) Delete(identifier string) error {
err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Delete(identifier, nil)
if err != nil {
return errors.Wrapf(err, "k8s-cm-cache: couldn't delete metadata configmap %s", identifier)
}
glog.V(4).Infof("k8s-cm-cache: successfully deleted metadata configmap %s", identifier)
return nil
}

124
pkg/util/nodecache.go Normal file
View File

@ -0,0 +1,124 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"io/ioutil"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"github.com/golang/glog"
"github.com/pkg/errors"
)
type NodeCache struct {
BasePath string
}
var cacheDir = "controller"
func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
fullPath := path.Join(nc.BasePath, cacheDir)
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
if err := os.Mkdir(fullPath, 0755); err != nil {
return errors.Wrapf(err, "node-cache: failed to create %s folder with error: %v", fullPath, err)
}
}
return nil
}
func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
err := nc.EnsureCacheDirectory(cacheDir)
if err != nil {
return errors.Wrap(err, "node-cache: couldn't ensure cache directory exists")
}
files, err := ioutil.ReadDir(path.Join(nc.BasePath, cacheDir))
if err != nil {
return errors.Wrapf(err, "node-cache: failed to read %s folder", nc.BasePath)
}
for _, file := range files {
match, err := regexp.MatchString(pattern, file.Name())
if err != nil || !match {
continue
}
if !strings.HasSuffix(file.Name(), ".json") {
continue
}
fp, err := os.Open(path.Join(nc.BasePath, cacheDir, file.Name()))
if err != nil {
glog.Infof("node-cache: open file: %s err %%v", file.Name(), err)
continue
}
decoder := json.NewDecoder(fp)
if err = decoder.Decode(destObj); err != nil {
fp.Close()
return errors.Wrapf(err, "node-cache: couldn't decode file %s", file.Name())
}
if err := f(strings.TrimSuffix(file.Name(), filepath.Ext(file.Name()))); err != nil {
return err
}
}
return nil
}
func (nc *NodeCache) Create(identifier string, data interface{}) error {
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
fp, err := os.Create(file)
if err != nil {
return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file)
}
defer fp.Close()
encoder := json.NewEncoder(fp)
if err = encoder.Encode(data); err != nil {
return errors.Wrapf(err, "node-cache: failed to encode metadata for file: %s\n", file)
}
glog.V(4).Infof("node-cache: successfully saved metadata into file: %s\n", file)
return nil
}
func (nc *NodeCache) Get(identifier string, data interface{}) error {
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
fp, err := os.Open(file)
if err != nil {
return errors.Wrapf(err, "node-cache: open error for %s", file)
}
defer fp.Close()
decoder := json.NewDecoder(fp)
if err = decoder.Decode(data); err != nil {
return errors.Wrap(err, "rbd: decode error")
}
return nil
}
func (nc *NodeCache) Delete(identifier string) error {
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
err := os.Remove(file)
if err != nil {
if err != os.ErrNotExist {
return errors.Wrapf(err, "node-cache: error removing file %s", file)
}
}
glog.V(4).Infof("node-cache: successfully deleted metadata storage file at: %+v\n", file)
return nil
}

View File

@ -22,6 +22,7 @@ import (
"path"
"github.com/ceph/ceph-csi/pkg/rbd"
"github.com/ceph/ceph-csi/pkg/util"
"github.com/golang/glog"
)
@ -30,10 +31,11 @@ func init() {
}
var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "csi-rbdplugin", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
containerized = flag.Bool("containerized", true, "whether run as containerized")
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "csi-rbdplugin", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
containerized = flag.Bool("containerized", true, "whether run as containerized")
metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
)
func main() {
@ -48,13 +50,16 @@ func main() {
os.Exit(1)
}
handle()
os.Exit(0)
}
cp, err := util.NewCachePersister(*metadataStorage, *driverName)
if err != nil {
glog.Errorf("failed to define cache persistence method: %v", err)
os.Exit(1)
}
func handle() {
driver := rbd.GetRBDDriver()
driver.Run(*driverName, *nodeID, *endpoint, *containerized)
driver.Run(*driverName, *nodeID, *endpoint, *containerized, cp)
os.Exit(0)
}
func createPersistentStorage(persistentStoragePath string) error {