cm metadata persist for rbd and cephfs

This commit is contained in:
mickymiek
2018-12-19 15:26:16 +01:00
committed by Huamin Chen
parent 51d6ac6f55
commit 62d65ad0cb
15 changed files with 467 additions and 385 deletions

View File

@ -1,128 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"github.com/golang/glog"
)
const (
controllerCacheRoot = PluginFolder + "/controller/plugin-cache"
)
type controllerCacheEntry struct {
VolOptions volumeOptions
VolumeID volumeID
}
type controllerCacheMap map[volumeID]*controllerCacheEntry
var (
ctrCache = make(controllerCacheMap)
ctrCacheMtx sync.Mutex
)
// Load all .json files from controllerCacheRoot into ctrCache
// Called from driver.go's Run()
func loadControllerCache() error {
cacheDir, err := ioutil.ReadDir(controllerCacheRoot)
if err != nil {
return fmt.Errorf("cannot read controller cache from %s: %v", controllerCacheRoot, err)
}
ctrCacheMtx.Lock()
defer ctrCacheMtx.Unlock()
for _, fi := range cacheDir {
if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() {
continue
}
f, err := os.Open(path.Join(controllerCacheRoot, fi.Name()))
if err != nil {
glog.Errorf("cephfs: cloudn't read '%s' from controller cache: %v", fi.Name(), err)
continue
}
d := json.NewDecoder(f)
ent := &controllerCacheEntry{}
if err = d.Decode(ent); err != nil {
glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err)
} else {
ctrCache[ent.VolumeID] = ent
}
f.Close()
}
return nil
}
func getControllerCacheEntryPath(volId volumeID) string {
return path.Join(controllerCacheRoot, string(volId)+".json")
}
func (m controllerCacheMap) insert(ent *controllerCacheEntry) error {
filePath := getControllerCacheEntryPath(ent.VolumeID)
ctrCacheMtx.Lock()
defer ctrCacheMtx.Unlock()
f, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("couldn't create cache entry file '%s': %v", filePath, err)
}
defer f.Close()
enc := json.NewEncoder(f)
if err = enc.Encode(ent); err != nil {
return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.VolumeID, err)
}
m[ent.VolumeID] = ent
return nil
}
func (m controllerCacheMap) pop(volId volumeID) (*controllerCacheEntry, error) {
ctrCacheMtx.Lock()
defer ctrCacheMtx.Unlock()
ent, ok := m[volId]
if !ok {
return nil, fmt.Errorf("cache entry for volume %s does not exist", volId)
}
filePath := getControllerCacheEntryPath(volId)
if err := os.Remove(filePath); err != nil {
return nil, fmt.Errorf("failed to remove cache entry file '%s': %v", filePath, err)
}
delete(m, volId)
return ent, nil
}

View File

@ -24,10 +24,18 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
"github.com/ceph/ceph-csi/pkg/util"
)
type controllerServer struct {
*csicommon.DefaultControllerServer
MetadataStore util.CachePersister
}
type controllerCacheEntry struct {
VolOptions volumeOptions
VolumeID volumeID
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
@ -35,7 +43,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
glog.Errorf("CreateVolumeRequest validation failed: %v", err)
return nil, err
}
// Configuration
volOptions, err := newVolumeOptions(req.GetParameters())
@ -56,7 +63,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if volOptions.ProvisionVolume {
// Admin credentials are required
cr, err := getAdminCredentials(req.GetControllerCreateSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
@ -82,7 +88,8 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
glog.Infof("cephfs: volume %s is provisioned statically", volId)
}
if err = ctrCache.insert(&controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId}); err != nil {
ce := &controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId}
if err := cs.MetadataStore.Create(string(volId), ce); err != nil {
glog.Errorf("failed to store a cache entry for volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
@ -107,30 +114,18 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
err error
)
// Load volume info from cache
ent, err := ctrCache.pop(volId)
if err != nil {
glog.Error(err)
ce := &controllerCacheEntry{}
if err := cs.MetadataStore.Get(string(volId), ce); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !ent.VolOptions.ProvisionVolume {
if !ce.VolOptions.ProvisionVolume {
// DeleteVolume() is forbidden for statically provisioned volumes!
glog.Warningf("volume %s is provisioned statically, aborting delete", volId)
return &csi.DeleteVolumeResponse{}, nil
}
defer func() {
if err != nil {
// Reinsert cache entry for retry
if insErr := ctrCache.insert(ent); insErr != nil {
glog.Errorf("failed to reinsert volume cache entry in rollback procedure for volume %s: %v", volId, err)
}
}
}()
// Deleting a volume requires admin credentials
cr, err := getAdminCredentials(req.GetControllerDeleteSecrets())
@ -139,7 +134,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if err = purgeVolume(volId, cr, &ent.VolOptions); err != nil {
if err = purgeVolume(volId, cr, &ce.VolOptions); err != nil {
glog.Errorf("failed to delete volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
@ -149,6 +144,10 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.Internal, err.Error())
}
if err := cs.MetadataStore.Delete(string(volId)); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.Infof("cephfs: successfully deleted volume %s", volId)
return &csi.DeleteVolumeResponse{}, nil

View File

@ -17,12 +17,12 @@ limitations under the License.
package cephfs
import (
"os"
"github.com/golang/glog"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
"github.com/ceph/ceph-csi/pkg/util"
)
const (
@ -56,9 +56,10 @@ func NewIdentityServer(d *csicommon.CSIDriver) *identityServer {
}
}
func NewControllerServer(d *csicommon.CSIDriver) *controllerServer {
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
MetadataStore: cachePersister,
}
}
@ -68,20 +69,11 @@ func NewNodeServer(d *csicommon.CSIDriver) *nodeServer {
}
}
func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string) {
func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string, cachePersister util.CachePersister) {
glog.Infof("Driver: %v version: %v", driverName, Version)
// Configuration
if err := os.MkdirAll(controllerCacheRoot, 0755); err != nil {
glog.Fatalf("cephfs: failed to create %s: %v", controllerCacheRoot, err)
return
}
if err := loadControllerCache(); err != nil {
glog.Errorf("cephfs: failed to read volume cache: %v", err)
}
if err := loadAvailableMounters(); err != nil {
glog.Fatalf("cephfs: failed to load ceph mounters: %v", err)
}
@ -120,7 +112,9 @@ func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string)
fs.is = NewIdentityServer(fs.driver)
fs.ns = NewNodeServer(fs.driver)
fs.cs = NewControllerServer(fs.driver)
fs.cs = NewControllerServer(fs.driver, cachePersister)
//fs.cs.LoadExDataFromMetadataStore()
server := csicommon.NewNonBlockingGRPCServer()
server.Start(endpoint, fs.is, fs.cs, fs.ns)