mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 18:43:34 +00:00
Remove mount cache for cephfs
PR #282 introduces the mount cache to solve cephfs fuse mount issue when cephfs plugin pod restarts .This is not working as intended. This PR removes the code for maintainability. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
committed by
mergify[bot]
parent
669dc4536f
commit
034b123478
@ -115,14 +115,6 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
|
||||
// Update namespace for storing keys into a specific namespace on RADOS, in the CephFS
|
||||
// metadata pool
|
||||
volJournal.SetNamespace(radosNamespace)
|
||||
|
||||
initVolumeMountCache(conf.DriverName, conf.MountCacheDir)
|
||||
if conf.MountCacheDir != "" {
|
||||
if err := remountCachedVolumes(); err != nil {
|
||||
klog.Warningf("failed to remount cached volumes: %v", err)
|
||||
// ignore remount fail
|
||||
}
|
||||
}
|
||||
// Initialize default library driver
|
||||
|
||||
fs.cd = csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID)
|
||||
|
@ -1,330 +0,0 @@
|
||||
/*
|
||||
Copyright 2019 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"os"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/ceph/ceph-csi/pkg/util"
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
type volumeMountCacheEntry struct {
|
||||
DriverVersion string `json:"driverVersion"`
|
||||
|
||||
VolumeID string `json:"volumeID"`
|
||||
Mounter string `json:"mounter"`
|
||||
Secrets map[string]string `json:"secrets"`
|
||||
StagingPath string `json:"stagingPath"`
|
||||
TargetPaths map[string]bool `json:"targetPaths"`
|
||||
CreateTime time.Time `json:"createTime"`
|
||||
}
|
||||
|
||||
type volumeMountCacheMap struct {
|
||||
volumes map[string]volumeMountCacheEntry
|
||||
nodeCacheStore util.NodeCache
|
||||
}
|
||||
|
||||
var (
|
||||
volumeMountCachePrefix = "cephfs-mount-cache-"
|
||||
volumeMountCache volumeMountCacheMap
|
||||
volumeMountCacheMtx sync.Mutex
|
||||
)
|
||||
|
||||
func initVolumeMountCache(driverName, mountCacheDir string) {
|
||||
volumeMountCache.volumes = make(map[string]volumeMountCacheEntry)
|
||||
|
||||
volumeMountCache.nodeCacheStore.BasePath = mountCacheDir
|
||||
volumeMountCache.nodeCacheStore.CacheDir = driverName
|
||||
klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, util.DriverVersion, mountCacheDir)
|
||||
}
|
||||
|
||||
func remountCachedVolumes() error {
|
||||
if err := util.CreateMountPoint(volumeMountCache.nodeCacheStore.BasePath); err != nil {
|
||||
klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err)
|
||||
return err
|
||||
}
|
||||
var remountFailCount, remountSuccCount int64
|
||||
me := &volumeMountCacheEntry{}
|
||||
err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
|
||||
volID := me.VolumeID
|
||||
if volOpts, vid, err := newVolumeOptionsFromVolID(context.TODO(), me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil {
|
||||
if err, ok := err.(util.ErrKeyNotFound); ok {
|
||||
klog.Infof("mount-cache: image key not found, assuming the volume %s to be already deleted (%v)", volID, err)
|
||||
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
klog.Infof("mount-cache: metadata not found, delete volume cache entry for volume %s", volID)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// update Mounter from mount cache
|
||||
volOpts.Mounter = me.Mounter
|
||||
if err := mountOneCacheEntry(volOpts, vid, me); err == nil {
|
||||
remountSuccCount++
|
||||
volumeMountCache.volumes[me.VolumeID] = *me
|
||||
klog.Infof("mount-cache: successfully remounted volume %s", volID)
|
||||
} else {
|
||||
remountFailCount++
|
||||
klog.Errorf("mount-cache: failed to remount volume %s", volID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
klog.Infof("mount-cache: metastore list cache fail %v", err)
|
||||
return err
|
||||
}
|
||||
if remountFailCount > 0 {
|
||||
klog.Infof("mount-cache: successfully remounted %d volumes, failed to remount %d volumes", remountSuccCount, remountFailCount)
|
||||
} else {
|
||||
klog.Infof("mount-cache: successfully remounted %d volumes", remountSuccCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *volumeMountCacheEntry) error {
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
var (
|
||||
err error
|
||||
cr *util.Credentials
|
||||
)
|
||||
volID := vid.VolumeID
|
||||
|
||||
if volOptions.ProvisionVolume {
|
||||
cr, err = util.NewAdminCredentials(decodeCredentials(me.Secrets))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
volOptions.RootPath, err = getVolumeRootPathCeph(context.TODO(), volOptions, cr, volumeID(vid.FsSubvolName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
cr, err = util.NewUserCredentials(decodeCredentials(me.Secrets))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
}
|
||||
|
||||
err = cleanupMountPoint(me.StagingPath)
|
||||
if err != nil {
|
||||
klog.Infof("mount-cache: failed to cleanup volume mount point %s, remove it: %s %v", volID, me.StagingPath, err)
|
||||
return err
|
||||
}
|
||||
|
||||
isMnt, err := util.IsMountPoint(me.StagingPath)
|
||||
if err != nil {
|
||||
isMnt = false
|
||||
klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err)
|
||||
}
|
||||
|
||||
if !isMnt {
|
||||
m, err := newMounter(volOptions)
|
||||
if err != nil {
|
||||
klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
|
||||
return err
|
||||
}
|
||||
if err := m.mount(context.TODO(), me.StagingPath, cr, volOptions); err != nil {
|
||||
klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
mountOptions := []string{"bind"}
|
||||
for targetPath, readOnly := range me.TargetPaths {
|
||||
if err := cleanupMountPoint(targetPath); err == nil {
|
||||
if err := bindMount(context.TODO(), me.StagingPath, targetPath, readOnly, mountOptions); err != nil {
|
||||
klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v",
|
||||
volID, me.StagingPath, targetPath, readOnly, err)
|
||||
} else {
|
||||
klog.Infof("mount-cache: successfully bind-mounted volume %s: %s %s %v",
|
||||
volID, me.StagingPath, targetPath, readOnly)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func cleanupMountPoint(mountPoint string) error {
|
||||
if _, err := os.Stat(mountPoint); err != nil {
|
||||
if isCorruptedMnt(err) {
|
||||
klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint)
|
||||
err := execCommandErr(context.TODO(), "umount", mountPoint)
|
||||
if err != nil {
|
||||
klog.Infof("mount-cache: failed to umount %s %v", mountPoint, err)
|
||||
// ignore error return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if _, err := os.Stat(mountPoint); err != nil {
|
||||
klog.Errorf("mount-cache: failed to stat mount point %s %v", mountPoint, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isCorruptedMnt(err error) bool {
|
||||
var underlyingError error
|
||||
switch pe := err.(type) {
|
||||
case nil:
|
||||
return false
|
||||
case *os.PathError:
|
||||
underlyingError = pe.Err
|
||||
case *os.LinkError:
|
||||
underlyingError = pe.Err
|
||||
case *os.SyscallError:
|
||||
underlyingError = pe.Err
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
||||
CorruptedErrors := []error{
|
||||
syscall.ENOTCONN, syscall.ESTALE, syscall.EIO, syscall.EACCES}
|
||||
|
||||
for _, v := range CorruptedErrors {
|
||||
if underlyingError == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func genVolumeMountCacheFileName(volID string) string {
|
||||
cachePath := volumeMountCachePrefix + volID
|
||||
return cachePath
|
||||
}
|
||||
func (mc *volumeMountCacheMap) isEnable() bool {
|
||||
// if mount cache dir unset, disable state
|
||||
return mc.nodeCacheStore.BasePath != ""
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeStageVolume(ctx context.Context, volID, stagingTargetPath, mounter string, secrets map[string]string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
lastTargetPaths := make(map[string]bool)
|
||||
me, ok := volumeMountCache.volumes[volID]
|
||||
if ok {
|
||||
if me.StagingPath == stagingTargetPath {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: node unexpected restage volume for volume %s"), volID)
|
||||
return nil
|
||||
}
|
||||
lastTargetPaths = me.TargetPaths
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: node stage volume ignore last cache entry for volume %s"), volID)
|
||||
}
|
||||
|
||||
me = volumeMountCacheEntry{DriverVersion: util.DriverVersion}
|
||||
|
||||
me.VolumeID = volID
|
||||
me.Secrets = encodeCredentials(secrets)
|
||||
me.StagingPath = stagingTargetPath
|
||||
me.TargetPaths = lastTargetPaths
|
||||
me.Mounter = mounter
|
||||
|
||||
me.CreateTime = time.Now()
|
||||
volumeMountCache.volumes[volID] = me
|
||||
return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
delete(volumeMountCache.volumes, volID)
|
||||
return mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID))
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodePublishVolume(ctx context.Context, volID, targetPath string, readOnly bool) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
_, ok := volumeMountCache.volumes[volID]
|
||||
if !ok {
|
||||
return errors.New("mount-cache: node publish volume failed to find cache entry for volume")
|
||||
}
|
||||
volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly
|
||||
return mc.updateNodeCache(ctx, volID)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeUnPublishVolume(ctx context.Context, volID, targetPath string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
_, ok := volumeMountCache.volumes[volID]
|
||||
if !ok {
|
||||
return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume")
|
||||
}
|
||||
delete(volumeMountCache.volumes[volID].TargetPaths, targetPath)
|
||||
return mc.updateNodeCache(ctx, volID)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) updateNodeCache(ctx context.Context, volID string) error {
|
||||
me := volumeMountCache.volumes[volID]
|
||||
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
klog.Infof(util.Log(ctx, "mount-cache: metadata not found, delete mount cache failed for volume %s"), volID)
|
||||
}
|
||||
return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me)
|
||||
}
|
||||
|
||||
func encodeCredentials(input map[string]string) (output map[string]string) {
|
||||
output = make(map[string]string)
|
||||
for key, value := range input {
|
||||
nKey := base64.StdEncoding.EncodeToString([]byte(key))
|
||||
nValue := base64.StdEncoding.EncodeToString([]byte(value))
|
||||
output[nKey] = nValue
|
||||
}
|
||||
return output
|
||||
}
|
||||
|
||||
func decodeCredentials(input map[string]string) (output map[string]string) {
|
||||
output = make(map[string]string)
|
||||
for key, value := range input {
|
||||
nKey, err := base64.StdEncoding.DecodeString(key)
|
||||
if err != nil {
|
||||
klog.Errorf("mount-cache: decode secret fail")
|
||||
continue
|
||||
}
|
||||
nValue, err := base64.StdEncoding.DecodeString(value)
|
||||
if err != nil {
|
||||
klog.Errorf("mount-cache: decode secret fail")
|
||||
continue
|
||||
}
|
||||
output[string(nKey)] = string(nValue)
|
||||
}
|
||||
return output
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func init() {
|
||||
}
|
||||
|
||||
func TestMountOneCacheEntry(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRemountHisMountedPath(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeStageVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeUnStageVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodePublishVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeUnpublishVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestEncodeDecodeCredentials(t *testing.T) {
|
||||
secrets := make(map[string]string)
|
||||
secrets["user_1"] = "value_1"
|
||||
enSecrets := encodeCredentials(secrets)
|
||||
deSecrets := decodeCredentials(enSecrets)
|
||||
for key, value := range secrets {
|
||||
if deSecrets[key] != value {
|
||||
t.Errorf("key %s of credentials's value %s change after decode %s ", key, value, deSecrets[key])
|
||||
}
|
||||
}
|
||||
}
|
@ -154,9 +154,6 @@ func (*NodeServer) mount(ctx context.Context, volOptions *volumeOptions, req *cs
|
||||
klog.Errorf(util.Log(ctx, "failed to mount volume %s: %v"), volID, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
if err := volumeMountCache.nodeStageVolume(ctx, req.GetVolumeId(), stagingTargetPath, volOptions.Mounter, req.GetSecrets()); err != nil {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to stage volume %s %s: %v"), volID, stagingTargetPath, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -209,10 +206,6 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err = volumeMountCache.nodePublishVolume(ctx, volID, targetPath, req.GetReadonly()); err != nil {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to publish volume %s %s: %v"), volID, targetPath, err)
|
||||
}
|
||||
|
||||
klog.Infof(util.Log(ctx, "cephfs: successfully bind-mounted volume %s to %s"), volID, targetPath)
|
||||
|
||||
// #nosec - allow anyone to write inside the target path
|
||||
@ -241,10 +234,6 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
}
|
||||
defer ns.VolumeLocks.Release(volID)
|
||||
|
||||
if err = volumeMountCache.nodeUnPublishVolume(ctx, volID, targetPath); err != nil {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to unpublish volume %s %s: %v"), volID, targetPath, err)
|
||||
}
|
||||
|
||||
// Unmount the bind-mount
|
||||
if err = unmountVolume(ctx, targetPath); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -275,11 +264,6 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
|
||||
defer ns.VolumeLocks.Release(volID)
|
||||
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
|
||||
if err = volumeMountCache.nodeUnStageVolume(volID); err != nil {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to unstage volume %s %s: %v"), volID, stagingTargetPath, err)
|
||||
}
|
||||
|
||||
// Unmount the volume
|
||||
if err = unmountVolume(ctx, stagingTargetPath); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
|
Reference in New Issue
Block a user