mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-10 22:09:30 +00:00
f4c80dec9a
In the NodeStage RPC call we have to map the device to the node plugin and make sure the device is mounted to the global path. In the NodeUnstage request, unmount the device from the global path and unmap the device. If the volume mode is block, we create a file inside stageTargetPath and it is considered the global path. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
319 lines
9.3 KiB
Go
319 lines
9.3 KiB
Go
package cephfs
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"os"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/ceph/ceph-csi/pkg/util"
|
|
"github.com/pkg/errors"
|
|
"k8s.io/klog"
|
|
)
|
|
|
|
// volumeMountCacheEntry is the record kept for one staged volume, both in
// the in-memory map and (JSON-encoded) in the node cache store.
type volumeMountCacheEntry struct {
	// DriverVersion is the driver version recorded when the entry was created.
	DriverVersion string `json:"driverVersion"`

	// VolumeID identifies the volume this entry belongs to.
	VolumeID string `json:"volumeID"`
	// Mounter is the mounter name that was passed in at stage time.
	Mounter string `json:"mounter"`
	// Secrets holds the stage secrets with keys and values base64-encoded
	// (see encodeCredentials / decodeCredentials).
	Secrets map[string]string `json:"secrets"`
	// StagingPath is the staging target path of the volume.
	StagingPath string `json:"stagingPath"`
	// TargetPaths maps every published target path to its read-only flag.
	TargetPaths map[string]bool `json:"targetPaths"`
	// CreateTime is the time at which the entry was created.
	CreateTime time.Time `json:"createTime"`
}
|
|
|
|
type volumeMountCacheMap struct {
|
|
volumes map[string]volumeMountCacheEntry
|
|
nodeCacheStore util.NodeCache
|
|
}
|
|
|
|
var (
|
|
volumeMountCachePrefix = "cephfs-mount-cache-"
|
|
volumeMountCache volumeMountCacheMap
|
|
volumeMountCacheMtx sync.Mutex
|
|
)
|
|
|
|
func initVolumeMountCache(driverName, mountCacheDir string) {
|
|
volumeMountCache.volumes = make(map[string]volumeMountCacheEntry)
|
|
|
|
volumeMountCache.nodeCacheStore.BasePath = mountCacheDir
|
|
volumeMountCache.nodeCacheStore.CacheDir = driverName
|
|
klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, util.DriverVersion, mountCacheDir)
|
|
}
|
|
|
|
func remountCachedVolumes() error {
|
|
if err := util.CreateMountPoint(volumeMountCache.nodeCacheStore.BasePath); err != nil {
|
|
klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err)
|
|
return err
|
|
}
|
|
var remountFailCount, remountSuccCount int64
|
|
me := &volumeMountCacheEntry{}
|
|
err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
|
|
volID := me.VolumeID
|
|
if volOpts, vid, err := newVolumeOptionsFromVolID(me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil {
|
|
if err, ok := err.(util.ErrKeyNotFound); ok {
|
|
klog.Infof("mount-cache: image key not found, assuming the volume %s to be already deleted (%v)", volID, err)
|
|
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
|
klog.Infof("mount-cache: metadata not found, delete volume cache entry for volume %s", volID)
|
|
}
|
|
}
|
|
} else {
|
|
// update Mounter from mount cache
|
|
volOpts.Mounter = me.Mounter
|
|
if err := mountOneCacheEntry(volOpts, vid, me); err == nil {
|
|
remountSuccCount++
|
|
volumeMountCache.volumes[me.VolumeID] = *me
|
|
klog.Infof("mount-cache: successfully remounted volume %s", volID)
|
|
} else {
|
|
remountFailCount++
|
|
klog.Errorf("mount-cache: failed to remount volume %s", volID)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
klog.Infof("mount-cache: metastore list cache fail %v", err)
|
|
return err
|
|
}
|
|
if remountFailCount > 0 {
|
|
klog.Infof("mount-cache: successfully remounted %d volumes, failed to remount %d volumes", remountSuccCount, remountFailCount)
|
|
} else {
|
|
klog.Infof("mount-cache: successfully remounted %d volumes", remountSuccCount)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *volumeMountCacheEntry) error {
|
|
volumeMountCacheMtx.Lock()
|
|
defer volumeMountCacheMtx.Unlock()
|
|
|
|
var (
|
|
err error
|
|
cr *util.Credentials
|
|
)
|
|
volID := vid.VolumeID
|
|
|
|
if volOptions.ProvisionVolume {
|
|
cr, err = util.GetAdminCredentials(decodeCredentials(me.Secrets))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
volOptions.RootPath, err = getVolumeRootPathCeph(volOptions, cr, volumeID(vid.FsSubvolName))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var entity *cephEntity
|
|
entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cr = entity.toCredentials()
|
|
} else {
|
|
cr, err = util.GetUserCredentials(decodeCredentials(me.Secrets))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
err = cleanupMountPoint(me.StagingPath)
|
|
if err != nil {
|
|
klog.Infof("mount-cache: failed to cleanup volume mount point %s, remove it: %s %v", volID, me.StagingPath, err)
|
|
return err
|
|
}
|
|
|
|
isMnt, err := util.IsMountPoint(me.StagingPath)
|
|
if err != nil {
|
|
isMnt = false
|
|
klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err)
|
|
}
|
|
|
|
if !isMnt {
|
|
m, err := newMounter(volOptions)
|
|
if err != nil {
|
|
klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
|
|
return err
|
|
}
|
|
if err := m.mount(me.StagingPath, cr, volOptions); err != nil {
|
|
klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
mountOptions := []string{"bind"}
|
|
for targetPath, readOnly := range me.TargetPaths {
|
|
if err := cleanupMountPoint(targetPath); err == nil {
|
|
if err := bindMount(me.StagingPath, targetPath, readOnly, mountOptions); err != nil {
|
|
klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v",
|
|
volID, me.StagingPath, targetPath, readOnly, err)
|
|
} else {
|
|
klog.Infof("mount-cache: successfully bind-mounted volume %s: %s %s %v",
|
|
volID, me.StagingPath, targetPath, readOnly)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func cleanupMountPoint(mountPoint string) error {
|
|
if _, err := os.Stat(mountPoint); err != nil {
|
|
if isCorruptedMnt(err) {
|
|
klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint)
|
|
err := execCommandErr("umount", mountPoint)
|
|
if err != nil {
|
|
klog.Infof("mount-cache: failed to umount %s %v", mountPoint, err)
|
|
// ignore error return err
|
|
}
|
|
}
|
|
}
|
|
if _, err := os.Stat(mountPoint); err != nil {
|
|
klog.Errorf("mount-cache: failed to stat mount point %s %v", mountPoint, err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// isCorruptedMnt reports whether err is an *os.PathError, *os.LinkError or
// *os.SyscallError whose wrapped error is one of ENOTCONN, ESTALE, EIO or
// EACCES. Every other error, including nil, yields false.
func isCorruptedMnt(err error) bool {
	var cause error
	switch wrapped := err.(type) {
	case *os.PathError:
		cause = wrapped.Err
	case *os.LinkError:
		cause = wrapped.Err
	case *os.SyscallError:
		cause = wrapped.Err
	default:
		// nil and all other error types are not considered corrupted.
		return false
	}

	errno, isErrno := cause.(syscall.Errno)
	if !isErrno {
		return false
	}
	switch errno {
	case syscall.ENOTCONN, syscall.ESTALE, syscall.EIO, syscall.EACCES:
		return true
	}
	return false
}
|
|
|
|
func genVolumeMountCacheFileName(volID string) string {
|
|
cachePath := volumeMountCachePrefix + volID
|
|
return cachePath
|
|
}
|
|
func (mc *volumeMountCacheMap) isEnable() bool {
|
|
// if mount cache dir unset, disable state
|
|
return mc.nodeCacheStore.BasePath != ""
|
|
}
|
|
|
|
func (mc *volumeMountCacheMap) nodeStageVolume(volID, stagingTargetPath, mounter string, secrets map[string]string) error {
|
|
if !mc.isEnable() {
|
|
return nil
|
|
}
|
|
volumeMountCacheMtx.Lock()
|
|
defer volumeMountCacheMtx.Unlock()
|
|
|
|
lastTargetPaths := make(map[string]bool)
|
|
me, ok := volumeMountCache.volumes[volID]
|
|
if ok {
|
|
if me.StagingPath == stagingTargetPath {
|
|
klog.Warningf("mount-cache: node unexpected restage volume for volume %s", volID)
|
|
return nil
|
|
}
|
|
lastTargetPaths = me.TargetPaths
|
|
klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID)
|
|
}
|
|
|
|
me = volumeMountCacheEntry{DriverVersion: util.DriverVersion}
|
|
|
|
me.VolumeID = volID
|
|
me.Secrets = encodeCredentials(secrets)
|
|
me.StagingPath = stagingTargetPath
|
|
me.TargetPaths = lastTargetPaths
|
|
me.Mounter = mounter
|
|
|
|
me.CreateTime = time.Now()
|
|
volumeMountCache.volumes[volID] = me
|
|
return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me)
|
|
}
|
|
|
|
func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error {
|
|
if !mc.isEnable() {
|
|
return nil
|
|
}
|
|
volumeMountCacheMtx.Lock()
|
|
defer volumeMountCacheMtx.Unlock()
|
|
delete(volumeMountCache.volumes, volID)
|
|
return mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID))
|
|
}
|
|
|
|
func (mc *volumeMountCacheMap) nodePublishVolume(volID, targetPath string, readOnly bool) error {
|
|
if !mc.isEnable() {
|
|
return nil
|
|
}
|
|
volumeMountCacheMtx.Lock()
|
|
defer volumeMountCacheMtx.Unlock()
|
|
|
|
_, ok := volumeMountCache.volumes[volID]
|
|
if !ok {
|
|
return errors.New("mount-cache: node publish volume failed to find cache entry for volume")
|
|
}
|
|
volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly
|
|
return mc.updateNodeCache(volID)
|
|
}
|
|
|
|
func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID, targetPath string) error {
|
|
if !mc.isEnable() {
|
|
return nil
|
|
}
|
|
volumeMountCacheMtx.Lock()
|
|
defer volumeMountCacheMtx.Unlock()
|
|
|
|
_, ok := volumeMountCache.volumes[volID]
|
|
if !ok {
|
|
return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume")
|
|
}
|
|
delete(volumeMountCache.volumes[volID].TargetPaths, targetPath)
|
|
return mc.updateNodeCache(volID)
|
|
}
|
|
|
|
func (mc *volumeMountCacheMap) updateNodeCache(volID string) error {
|
|
me := volumeMountCache.volumes[volID]
|
|
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
|
klog.Infof("mount-cache: metadata not found, delete mount cache failed for volume %s", volID)
|
|
}
|
|
return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me)
|
|
}
|
|
|
|
// encodeCredentials returns a new map in which every key and every value of
// input has been encoded with standard base64. The result is never nil.
func encodeCredentials(input map[string]string) (output map[string]string) {
	enc := base64.StdEncoding
	output = make(map[string]string)
	for k, v := range input {
		output[enc.EncodeToString([]byte(k))] = enc.EncodeToString([]byte(v))
	}
	return output
}
|
|
|
|
func decodeCredentials(input map[string]string) (output map[string]string) {
|
|
output = make(map[string]string)
|
|
for key, value := range input {
|
|
nKey, err := base64.StdEncoding.DecodeString(key)
|
|
if err != nil {
|
|
klog.Errorf("mount-cache: decode secret fail")
|
|
continue
|
|
}
|
|
nValue, err := base64.StdEncoding.DecodeString(value)
|
|
if err != nil {
|
|
klog.Errorf("mount-cache: decode secret fail")
|
|
continue
|
|
}
|
|
output[string(nKey)] = string(nValue)
|
|
}
|
|
return output
|
|
}
|