issue #217

Goal

we try to solve when csi exit unexpect, the pod use cephfs pv can not auto recovery because lost mount relation until pod be killed and reschedule to other node. i think this is may be a problem. may be csi plugin can do more thing to remount the old path so when pod may be auto recovery when pod exit and restart, the old mount path can use.

NoGoal

Pod should exit and restart when csi plugin pod exit and mount point lost. if pod not exit will get error of **transport endpoint is not connected**.

implment logic

csi-plugin start:

	1. load all MountCachEntry  from node local dir
	2. check if volID exist in cluster, if no we ignore this entry, if yes continue
	3. check if stagingPath exist, if yes we mount the path
	4. check if all targetPath exist, if yes we binmount to staging path

NodeServer:

1. NodeStageVolume: add MountCachEntry on local dir include readonly attr and ceph secret
2. NodeStagePublishVolume: add pod bind mount path to MountCachEntry  and persist local dir
3. NodeStageunPublishVolume: remove pod bind mount path From MountCachEntry  and persist local dir
4. NodeStageunStageVolume: remove MountCachEntry  from local dir
This commit is contained in:
王怀宗 2019-03-25 22:47:39 +08:00
parent ff7d649c9d
commit b318964af5
6 changed files with 395 additions and 8 deletions

View File

@ -105,6 +105,10 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP
klog.Fatalf("failed to write ceph configuration file: %v", err)
}
if err := remountHisMountedPath(driverName, version, nodeID, cachePersister); err != nil {
klog.Warningf("failed to remounted history mounted path: %v", err)
//ignore remount fail
}
// Initialize default library driver
fs.cd = csicommon.NewCSIDriver(driverName, version, nodeID)

314
pkg/cephfs/mountcache.go Normal file
View File

@ -0,0 +1,314 @@
package cephfs
import (
"encoding/base64"
"os"
"sync"
"syscall"
"time"
"github.com/ceph/ceph-csi/pkg/util"
"github.com/pkg/errors"
"k8s.io/klog"
)
type volumeMountEntry struct {
NodeID string `json:"nodeID"`
DriverName string `json:"driverName"`
DriverVersion string `json:"driverVersion"`
Namespace string `json:"namespace"`
VolumeID string `json:"volumeID"`
Secrets map[string]string `json:"secrets"`
StagingPath string `json:"stagingPath"`
TargetPaths map[string]bool `json:"targetPaths"`
CreateTime time.Time `json:"createTime"`
LastMountTime time.Time `json:"lastMountTime"`
LoadCount uint64 `json:"loadCount"`
}
type volumeMountCacheMap struct {
DriverName string
DriverVersion string
NodeID string
MountFailNum int64
MountSuccNum int64
Volumes map[string]volumeMountEntry
NodeCacheStore util.NodeCache
MetadataStore util.CachePersister
}
var (
csiPersistentVolumeRoot = "/var/lib/kubelet/plugins/kubernetes.io/csi"
volumeMountCachePrefix = "cephfs-mount-cache-"
volumeMountCache volumeMountCacheMap
volumeMountCacheMtx sync.Mutex
)
func remountHisMountedPath(name string, v string, nodeID string, cachePersister util.CachePersister) error {
volumeMountCache.Volumes = make(map[string]volumeMountEntry)
volumeMountCache.NodeID = nodeID
volumeMountCache.DriverName = name
volumeMountCache.DriverVersion = v
volumeMountCache.MountSuccNum = 0
volumeMountCache.MountFailNum = 0
volumeMountCache.MetadataStore = cachePersister
volumeMountCache.NodeCacheStore.BasePath = PluginFolder
volumeMountCache.NodeCacheStore.CacheDir = "volumes-mount-cache"
if _, err := os.Stat(csiPersistentVolumeRoot); err != nil {
klog.Infof("mount-cache: csi pv root path %s stat fail %v, may not in daemonset csi plugin, exit", csiPersistentVolumeRoot, err)
return err
}
if err := os.MkdirAll(volumeMountCache.NodeCacheStore.BasePath, 0755); err != nil {
klog.Fatalf("mount-cache: failed to create %s: %v", volumeMountCache.NodeCacheStore.BasePath, err)
return err
}
me := &volumeMountEntry{}
ce := &controllerCacheEntry{}
err := volumeMountCache.NodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
volID := me.VolumeID
klog.Infof("mount-cache: load %v", me)
if err := volumeMountCache.MetadataStore.Get(volID, ce); err != nil {
if err, ok := err.(*util.CacheEntryNotFound); ok {
klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err)
if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
klog.Infof("mount-cache: metadata nofound, delete volume cache entry for volume %s", volID)
}
}
} else {
if err := mountOneCacheEntry(ce, me); err == nil {
volumeMountCache.MountSuccNum++
volumeMountCache.Volumes[me.VolumeID] = *me
} else {
volumeMountCache.MountFailNum++
}
}
return nil
})
if err != nil {
klog.Infof("mount-cache: metastore list cache fail %v", err)
return err
}
if volumeMountCache.MountFailNum > volumeMountCache.MountSuccNum {
return errors.New("mount-cache: too many volumes mount fail")
}
klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", volumeMountCache.MountSuccNum, volumeMountCache.MountFailNum)
return nil
}
func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error {
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()
var err error
volID := ce.VolumeID
volOptions := ce.VolOptions
adminCr, err := getAdminCredentials(decodeCredentials(me.Secrets))
if err != nil {
return err
}
entity, err := getCephUser(&volOptions, adminCr, volID)
if err != nil {
klog.Infof("mount-cache: failed to get ceph user: %s %v", volID, me.StagingPath)
}
cr := entity.toCredentials()
if volOptions.ProvisionVolume {
volOptions.RootPath = getVolumeRootPathCeph(volID)
}
err = cleanupMountPoint(me.StagingPath)
if err != nil {
klog.Infof("mount-cache: failed to cleanup volume mount point %s, remove it: %s %v", volID, me.StagingPath, err)
return err
}
isMnt, err := isMountPoint(me.StagingPath)
if err != nil {
isMnt = false
klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err)
}
if !isMnt {
m, err := newMounter(&volOptions)
if err != nil {
klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
return err
}
if err := m.mount(me.StagingPath, cr, &volOptions); err != nil {
klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
return err
}
}
for targetPath, readOnly := range me.TargetPaths {
if err := cleanupMountPoint(targetPath); err == nil {
if err := bindMount(me.StagingPath, targetPath, readOnly); err != nil {
klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v",
volID, me.StagingPath, targetPath, readOnly, err)
} else {
klog.Infof("mount-cache: succ bind-mount volume %s: %s %s %v",
volID, me.StagingPath, targetPath, readOnly)
}
}
}
return nil
}
func cleanupMountPoint(mountPoint string) error {
if _, err := os.Stat(mountPoint); err != nil {
if IsCorruptedMnt(err) {
klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint)
err := execCommandErr("umount", mountPoint)
if err != nil {
klog.Infof("mount-cache: unmount %s fail %v", mountPoint, err)
//ignore error return err
}
}
}
if _, err := os.Stat(mountPoint); err != nil {
klog.Errorf("mount-cache: mount point %s stat fail %v", mountPoint, err)
return err
}
return nil
}
func IsCorruptedMnt(err error) bool {
if err == nil {
return false
}
var underlyingError error
switch pe := err.(type) {
case nil:
return false
case *os.PathError:
underlyingError = pe.Err
case *os.LinkError:
underlyingError = pe.Err
case *os.SyscallError:
underlyingError = pe.Err
}
return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO || underlyingError == syscall.EACCES
}
func genVolumeMountCacheFileName(volID string) string {
cachePath := volumeMountCachePrefix + volID
return cachePath
}
func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error {
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()
lastTargetPaths := make(map[string]bool)
me, ok := volumeMountCache.Volumes[volID]
if ok {
if me.StagingPath == stagingTargetPath {
klog.Infof("mount-cache: node stage volume last cache entry for volume %s stagingTargetPath %s no equal %s",
volID, me.StagingPath, stagingTargetPath)
return nil
}
lastTargetPaths = me.TargetPaths
klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID)
}
me = volumeMountEntry{NodeID: mc.NodeID, DriverName: mc.DriverName, DriverVersion: mc.DriverVersion}
me.VolumeID = volID
me.Secrets = encodeCredentials(secrets)
me.StagingPath = stagingTargetPath
me.TargetPaths = lastTargetPaths
curTime := time.Now()
me.CreateTime = curTime
me.CreateTime = curTime
me.LoadCount = 0
volumeMountCache.Volumes[volID] = me
if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
klog.Errorf("mount-cache: node stage volume failed to store a cache entry for volume %s: %v", volID, err)
return err
}
klog.Infof("mount-cache: node stage volume succ to store a cache entry for volume %s: %v", volID, me)
return nil
}
func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string, stagingTargetPath string) error {
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()
delete(volumeMountCache.Volumes, volID)
if err := mc.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil {
klog.Infof("mount-cache: node unstage volume failed to delete cache entry for volume %s: %s %v", volID, stagingTargetPath, err)
return err
}
return nil
}
func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error {
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()
_, ok := volumeMountCache.Volumes[volID]
if !ok {
klog.Errorf("mount-cache: node publish volume failed to find cache entry for volume %s", volID)
return errors.New("mount-cache: node publish volume failed to find cache entry for volume")
}
volumeMountCache.Volumes[volID].TargetPaths[targetPath] = readOnly
me := volumeMountCache.Volumes[volID]
if err := mc.NodeCacheStore.Update(genVolumeMountCacheFileName(volID), me); err != nil {
klog.Errorf("mount-cache: node publish volume failed to store a cache entry for volume %s: %v", volID, err)
return err
}
return nil
}
func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error {
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()
_, ok := volumeMountCache.Volumes[volID]
if !ok {
klog.Errorf("mount-cache: node unpublish volume failed to find cache entry for volume %s", volID)
return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume")
}
delete(volumeMountCache.Volumes[volID].TargetPaths, targetPath)
me := volumeMountCache.Volumes[volID]
if err := mc.NodeCacheStore.Update(genVolumeMountCacheFileName(volID), me); err != nil {
klog.Errorf("mount-cache: node unpublish volume failed to store a cache entry for volume %s: %v", volID, err)
return err
}
return nil
}
func encodeCredentials(input map[string]string) (output map[string]string) {
output = make(map[string]string)
for key, value := range input {
nKey := base64.StdEncoding.EncodeToString([]byte(key))
nValue := base64.StdEncoding.EncodeToString([]byte(value))
output[nKey] = nValue
}
return output
}
func decodeCredentials(input map[string]string) (output map[string]string) {
output = make(map[string]string)
for key, value := range input {
nKey, err := base64.StdEncoding.DecodeString(key)
if err != nil {
klog.Errorf("mount-cache: decode secret fail")
continue
}
nValue, err := base64.StdEncoding.DecodeString(value)
if err != nil {
klog.Errorf("mount-cache: decode secret fail")
continue
}
output[string(nKey)] = string(nValue)
}
return output
}

View File

@ -0,0 +1,38 @@
package cephfs
import (
"testing"
)
func init() {
}
func TestMountOneCacheEntry(t *testing.T) {
}
func TestRemountHisMountedPath(t *testing.T) {
}
func TestNodeStageVolume(t *testing.T) {
}
func TestNodeUnStageVolume(t *testing.T) {
}
func TestNodePublishVolume(t *testing.T) {
}
func TestNodeUnpublishVolume(t *testing.T) {
}
func TestEncodeDecodeCredentials(t *testing.T) {
secrets := make(map[string]string)
secrets["user_1"] = "value_1"
enSecrets := encodeCredentials(secrets)
deSecrets := decodeCredentials(enSecrets)
for key, value := range secrets {
if deSecrets[key] != value {
t.Errorf("key %s value %s not equal %s after encode decode", key, value, deSecrets[key])
}
}
}

View File

@ -154,6 +154,9 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ
klog.Errorf("failed to mount volume %s: %v", volID, err)
return status.Error(codes.Internal, err.Error())
}
if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, req.GetSecrets()); err != nil {
klog.Warningf("mount-cache: failed stage volume %s %s: %v", volID, stagingTargetPath, err)
}
return nil
}
@ -195,6 +198,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.Internal, err.Error())
}
if err := volumeMountCache.nodePublishVolume(volID, targetPath, req.GetReadonly()); err != nil {
klog.Warningf("mount-cache: failed publish volume %s %s: %v", volID, targetPath, err)
}
klog.Infof("cephfs: successfully bind-mounted volume %s to %s", volID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
@ -209,6 +216,11 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
if err = volumeMountCache.nodeUnPublishVolume(volID, targetPath); err != nil {
klog.Warningf("mount-cache: failed unpublish volume %s %s: %v", volID, targetPath, err)
}
// Unmount the bind-mount
if err = unmountVolume(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
@ -232,6 +244,11 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
stagingTargetPath := req.GetStagingTargetPath()
volID := req.GetVolumeId()
if err = volumeMountCache.nodeUnStageVolume(volID, stagingTargetPath); err != nil {
klog.Warningf("mount-cache: failed unstage volume %s %s: %v", volID, stagingTargetPath, err)
}
// Unmount the volume
if err = unmountVolume(stagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())

View File

@ -56,6 +56,7 @@ func NewCachePersister(metadataStore, driverName string) (CachePersister, error)
klog.Infof("cache-persister: using node as metadata cache persister")
nc := &NodeCache{}
nc.BasePath = PluginFolder + "/" + driverName
nc.CacheDir = "controller"
return nc, nil
}
return nil, errors.New("cache-persister: couldn't parse metadatastorage flag")

View File

@ -32,10 +32,9 @@ import (
// NodeCache to store metadata
type NodeCache struct {
BasePath string
CacheDir string
}
var cacheDir = "controller"
var errDec = errors.New("file not found")
// EnsureCacheDirectory creates cache directory if not present
@ -52,15 +51,15 @@ func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
//ForAll list the metadata in Nodecache and filters outs based on the pattern
func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
err := nc.EnsureCacheDirectory(cacheDir)
err := nc.EnsureCacheDirectory(nc.CacheDir)
if err != nil {
return errors.Wrap(err, "node-cache: couldn't ensure cache directory exists")
}
files, err := ioutil.ReadDir(path.Join(nc.BasePath, cacheDir))
files, err := ioutil.ReadDir(path.Join(nc.BasePath, nc.CacheDir))
if err != nil {
return errors.Wrapf(err, "node-cache: failed to read %s folder", nc.BasePath)
}
path := path.Join(nc.BasePath, cacheDir)
path := path.Join(nc.BasePath, nc.CacheDir)
for _, file := range files {
err = decodeObj(path, pattern, file, destObj)
if err == errDec {
@ -102,9 +101,23 @@ func decodeObj(filepath, pattern string, file os.FileInfo, destObj interface{})
}
func (nc *NodeCache) Update(identifier string, data interface{}) error {
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
identifierTmp := identifier + ".creating"
fileTmp := path.Join(nc.BasePath, nc.CacheDir, identifierTmp+".json")
os.Remove(fileTmp)
if err := nc.Create(identifierTmp, data); err != nil {
return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file)
}
if err := os.Rename(fileTmp, file); err != nil {
return errors.Wrapf(err, "node-cache: couldn't rename %s as %s", fileTmp, file)
}
return nil
}
// Create creates the metadata file in cache directory with identifier name
func (nc *NodeCache) Create(identifier string, data interface{}) error {
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
fp, err := os.Create(file)
if err != nil {
return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file)
@ -126,7 +139,7 @@ func (nc *NodeCache) Create(identifier string, data interface{}) error {
// Get retrieves the metadata from cache directory with identifier name
func (nc *NodeCache) Get(identifier string, data interface{}) error {
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
// #nosec
fp, err := os.Open(file)
if err != nil {
@ -153,7 +166,7 @@ func (nc *NodeCache) Get(identifier string, data interface{}) error {
// Delete deletes the metadata file from cache directory with identifier name
func (nc *NodeCache) Delete(identifier string) error {
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
err := os.Remove(file)
if err != nil {
if err == os.ErrNotExist {