/* Copyright 2019 The Ceph-CSI Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package cephfs import ( "context" "encoding/base64" "os" "sync" "syscall" "time" "github.com/ceph/ceph-csi/pkg/util" "github.com/pkg/errors" "k8s.io/klog" ) type volumeMountCacheEntry struct { DriverVersion string `json:"driverVersion"` VolumeID string `json:"volumeID"` Mounter string `json:"mounter"` Secrets map[string]string `json:"secrets"` StagingPath string `json:"stagingPath"` TargetPaths map[string]bool `json:"targetPaths"` CreateTime time.Time `json:"createTime"` } type volumeMountCacheMap struct { volumes map[string]volumeMountCacheEntry nodeCacheStore util.NodeCache } var ( volumeMountCachePrefix = "cephfs-mount-cache-" volumeMountCache volumeMountCacheMap volumeMountCacheMtx sync.Mutex ) func initVolumeMountCache(driverName, mountCacheDir string) { volumeMountCache.volumes = make(map[string]volumeMountCacheEntry) volumeMountCache.nodeCacheStore.BasePath = mountCacheDir volumeMountCache.nodeCacheStore.CacheDir = driverName klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, util.DriverVersion, mountCacheDir) } func remountCachedVolumes() error { if err := util.CreateMountPoint(volumeMountCache.nodeCacheStore.BasePath); err != nil { klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err) return err } var remountFailCount, remountSuccCount int64 me := &volumeMountCacheEntry{} err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error { volID := me.VolumeID if volOpts, vid, err := newVolumeOptionsFromVolID(context.TODO(), me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil { if err, ok := err.(util.ErrKeyNotFound); ok { klog.Infof("mount-cache: image key not found, assuming the volume %s to be already deleted (%v)", volID, err) if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { klog.Infof("mount-cache: metadata not found, delete volume cache entry for volume %s", volID) } } } else { // update Mounter from mount cache volOpts.Mounter = me.Mounter if err := mountOneCacheEntry(volOpts, vid, me); err == nil { remountSuccCount++ volumeMountCache.volumes[me.VolumeID] = *me klog.Infof("mount-cache: successfully remounted volume %s", volID) } else { remountFailCount++ klog.Errorf("mount-cache: failed to remount volume %s", volID) } } return nil }) if err != nil { klog.Infof("mount-cache: metastore list cache fail %v", err) return err } if remountFailCount > 0 { klog.Infof("mount-cache: successfully remounted %d volumes, failed to remount %d volumes", remountSuccCount, remountFailCount) } else { klog.Infof("mount-cache: successfully remounted %d volumes", remountSuccCount) } return nil } func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *volumeMountCacheEntry) error { volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() var ( err error cr *util.Credentials ) volID := vid.VolumeID if volOptions.ProvisionVolume { cr, err = util.NewAdminCredentials(decodeCredentials(me.Secrets)) if err != nil { return err } defer cr.DeleteCredentials() volOptions.RootPath, err = getVolumeRootPathCeph(context.TODO(), volOptions, cr, volumeID(vid.FsSubvolName)) if err != nil { return err } } else { cr, err = util.NewUserCredentials(decodeCredentials(me.Secrets)) if err != nil { return err } defer cr.DeleteCredentials() } err = cleanupMountPoint(me.StagingPath) if err != nil { klog.Infof("mount-cache: failed to cleanup volume mount point %s, remove it: %s %v", volID, me.StagingPath, err) return err } isMnt, err := util.IsMountPoint(me.StagingPath) if err != nil { isMnt = false klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err) } if !isMnt { m, err := newMounter(volOptions) if err != nil { klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err) return err } if err := m.mount(context.TODO(), me.StagingPath, cr, volOptions); err != nil { klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err) return err } } mountOptions := []string{"bind"} for targetPath, readOnly := range me.TargetPaths { if err := cleanupMountPoint(targetPath); err == nil { if err := bindMount(context.TODO(), me.StagingPath, targetPath, readOnly, mountOptions); err != nil { klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v", volID, me.StagingPath, targetPath, readOnly, err) } else { klog.Infof("mount-cache: successfully bind-mounted volume %s: %s %s %v", volID, me.StagingPath, targetPath, readOnly) } } } return nil } func cleanupMountPoint(mountPoint string) error { if _, err := os.Stat(mountPoint); err != nil { if isCorruptedMnt(err) { klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint) err := execCommandErr(context.TODO(), "umount", mountPoint) if err != nil { klog.Infof("mount-cache: failed to umount %s %v", mountPoint, err) // ignore error return err } } } if _, err := os.Stat(mountPoint); err != nil { klog.Errorf("mount-cache: failed to stat mount point %s %v", mountPoint, err) return err } return nil } func isCorruptedMnt(err error) bool { var underlyingError error switch pe := err.(type) { case nil: return false case *os.PathError: underlyingError = pe.Err case *os.LinkError: underlyingError = pe.Err case *os.SyscallError: underlyingError = pe.Err default: return false } CorruptedErrors := []error{ syscall.ENOTCONN, syscall.ESTALE, syscall.EIO, syscall.EACCES} for _, v := range CorruptedErrors { if underlyingError == v { return true } } return false } func genVolumeMountCacheFileName(volID string) string { cachePath := volumeMountCachePrefix + volID return cachePath } func (mc *volumeMountCacheMap) isEnable() bool { // if mount cache dir unset, disable state return mc.nodeCacheStore.BasePath != "" } func (mc *volumeMountCacheMap) nodeStageVolume(ctx context.Context, volID, stagingTargetPath, mounter string, secrets map[string]string) error { if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() lastTargetPaths := make(map[string]bool) me, ok := volumeMountCache.volumes[volID] if ok { if me.StagingPath == stagingTargetPath { klog.Warningf(util.Log(ctx, "mount-cache: node unexpected restage volume for volume %s"), volID) return nil } lastTargetPaths = me.TargetPaths klog.Warningf(util.Log(ctx, "mount-cache: node stage volume ignore last cache entry for volume %s"), volID) } me = volumeMountCacheEntry{DriverVersion: util.DriverVersion} me.VolumeID = volID me.Secrets = encodeCredentials(secrets) me.StagingPath = stagingTargetPath me.TargetPaths = lastTargetPaths me.Mounter = mounter me.CreateTime = time.Now() volumeMountCache.volumes[volID] = me return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me) } func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error { if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() delete(volumeMountCache.volumes, volID) return mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)) } func (mc *volumeMountCacheMap) nodePublishVolume(ctx context.Context, volID, targetPath string, readOnly bool) error { if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() _, ok := volumeMountCache.volumes[volID] if !ok { return errors.New("mount-cache: node publish volume failed to find cache entry for volume") } volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly return mc.updateNodeCache(ctx, volID) } func (mc *volumeMountCacheMap) nodeUnPublishVolume(ctx context.Context, volID, targetPath string) error { if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() _, ok := volumeMountCache.volumes[volID] if !ok { return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume") } delete(volumeMountCache.volumes[volID].TargetPaths, targetPath) return mc.updateNodeCache(ctx, volID) } func (mc *volumeMountCacheMap) updateNodeCache(ctx context.Context, volID string) error { me := volumeMountCache.volumes[volID] if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { klog.Infof(util.Log(ctx, "mount-cache: metadata not found, delete mount cache failed for volume %s"), volID) } return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me) } func encodeCredentials(input map[string]string) (output map[string]string) { output = make(map[string]string) for key, value := range input { nKey := base64.StdEncoding.EncodeToString([]byte(key)) nValue := base64.StdEncoding.EncodeToString([]byte(value)) output[nKey] = nValue } return output } func decodeCredentials(input map[string]string) (output map[string]string) { output = make(map[string]string) for key, value := range input { nKey, err := base64.StdEncoding.DecodeString(key) if err != nil { klog.Errorf("mount-cache: decode secret fail") continue } nValue, err := base64.StdEncoding.DecodeString(value) if err != nil { klog.Errorf("mount-cache: decode secret fail") continue } output[string(nKey)] = string(nValue) } return output }