diff --git a/cmd/cephfs/main.go b/cmd/cephfs/main.go index e0972f17f..a4d01a5bc 100644 --- a/cmd/cephfs/main.go +++ b/cmd/cephfs/main.go @@ -50,7 +50,6 @@ func main() { } //update plugin name cephfs.PluginFolder = cephfs.PluginFolder + *driverName - cephfs.MountCacheDir = *mountCacheDir cp, err := util.CreatePersistanceStorage(cephfs.PluginFolder, *metadataStorage, *driverName) if err != nil { @@ -58,7 +57,7 @@ func main() { } driver := cephfs.NewDriver() - driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, cp) + driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, cp) os.Exit(0) } diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go index ee7b446b8..7c5dfc61e 100644 --- a/pkg/cephfs/driver.go +++ b/pkg/cephfs/driver.go @@ -77,7 +77,7 @@ func NewNodeServer(d *csicommon.CSIDriver) *NodeServer { // Run start a non-blocking grpc controller,node and identityserver for // ceph CSI driver which can serve multiple parallel requests -func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cachePersister util.CachePersister) { +func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir string, cachePersister util.CachePersister) { klog.Infof("Driver: %v version: %v", driverName, version) // Configuration @@ -105,9 +105,12 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP klog.Fatalf("failed to write ceph configuration file: %v", err) } - if err := remountHisMountedPath(driverName, version, nodeID, cachePersister); err != nil { - klog.Warningf("failed to remounted history mounted path: %v", err) - //ignore remount fail + initVolumeMountCache(driverName, mountCacheDir, cachePersister) + if mountCacheDir != "" { + if err := remountCachedVolumes(); err != nil { + klog.Warningf("failed to remount cached volumes: %v", err) + //ignore remount fail + } } // Initialize default library driver diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index dd7e89cb0..64daf9a01 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -12,82 +12,66 @@ import ( "k8s.io/klog" ) -type volumeMountEntry struct { - NodeID string `json:"nodeID"` +type volumeMountCacheEntry struct { DriverName string `json:"driverName"` DriverVersion string `json:"driverVersion"` - Namespace string `json:"namespace"` - VolumeID string `json:"volumeID"` Secrets map[string]string `json:"secrets"` StagingPath string `json:"stagingPath"` TargetPaths map[string]bool `json:"targetPaths"` CreateTime time.Time `json:"createTime"` LastMountTime time.Time `json:"lastMountTime"` - LoadCount uint64 `json:"loadCount"` } type volumeMountCacheMap struct { - DriverName string - DriverVersion string - NodeID string - MountFailNum int64 - MountSuccNum int64 - Volumes map[string]volumeMountEntry - NodeCacheStore util.NodeCache - MetadataStore util.CachePersister + driverName string + volumes map[string]volumeMountCacheEntry + nodeCacheStore util.NodeCache + metadataStore util.CachePersister } var ( - MountCacheDir = "" volumeMountCachePrefix = "cephfs-mount-cache-" volumeMountCache volumeMountCacheMap volumeMountCacheMtx sync.Mutex ) -func remountHisMountedPath(name string, v string, nodeID string, cachePersister util.CachePersister) error { - volumeMountCache.Volumes = make(map[string]volumeMountEntry) - volumeMountCache.NodeID = nodeID - volumeMountCache.DriverName = name - volumeMountCache.DriverVersion = v - volumeMountCache.MountSuccNum = 0 - volumeMountCache.MountFailNum = 0 +func initVolumeMountCache(driverName string, mountCacheDir string, cachePersister util.CachePersister) { + volumeMountCache.volumes = make(map[string]volumeMountCacheEntry) - volumeMountCache.MetadataStore = cachePersister + volumeMountCache.driverName = driverName + volumeMountCache.metadataStore = cachePersister + volumeMountCache.nodeCacheStore.BasePath = mountCacheDir + volumeMountCache.nodeCacheStore.CacheDir = "" + klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, version, mountCacheDir) +} - volumeMountCache.NodeCacheStore.BasePath = MountCacheDir - volumeMountCache.NodeCacheStore.CacheDir = "" - - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount - klog.Infof("mount-cache: mountcachedir no define disalbe mount cache.") - return nil - } - - klog.Infof("mount-cache: MountCacheDir: %s", MountCacheDir) - if err := os.MkdirAll(volumeMountCache.NodeCacheStore.BasePath, 0755); err != nil { - klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.NodeCacheStore.BasePath, err) +func remountCachedVolumes() error { + if err := os.MkdirAll(volumeMountCache.nodeCacheStore.BasePath, 0755); err != nil { + klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err) return err } - me := &volumeMountEntry{} + var remountFailCount, remountSuccCount int64 + me := &volumeMountCacheEntry{} ce := &controllerCacheEntry{} - err := volumeMountCache.NodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error { + err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error { volID := me.VolumeID - klog.Infof("mount-cache: load %v", me) - if err := volumeMountCache.MetadataStore.Get(volID, ce); err != nil { + if err := volumeMountCache.metadataStore.Get(volID, ce); err != nil { if err, ok := err.(*util.CacheEntryNotFound); ok { - klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err) - if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { + klog.Infof("mount-cache: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err) + if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { klog.Infof("mount-cache: metadata nofound, delete volume cache entry for volume %s", volID) } } } else { if err := mountOneCacheEntry(ce, me); err == nil { - volumeMountCache.MountSuccNum++ - volumeMountCache.Volumes[me.VolumeID] = *me + remountSuccCount++ + volumeMountCache.volumes[me.VolumeID] = *me + klog.Infof("mount-cache: remount volume %s succ", volID) } else { - volumeMountCache.MountFailNum++ + remountFailCount++ + klog.Infof("mount-cache: remount volume cache %s fail", volID) } } return nil @@ -96,33 +80,42 @@ func remountHisMountedPath(name string, v string, nodeID string, cachePersister klog.Infof("mount-cache: metastore list cache fail %v", err) return err } - if volumeMountCache.MountFailNum > volumeMountCache.MountSuccNum { - return errors.New("mount-cache: too many volumes mount fail") + if remountFailCount > 0 { + klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", remountSuccCount, remountFailCount) + } else { + klog.Infof("mount-cache: volume cache num %d, all succ remount", remountSuccCount) } - klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", volumeMountCache.MountSuccNum, volumeMountCache.MountFailNum) return nil } -func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error { +func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) error { volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() - var err error + var ( + err error + cr *credentials + ) volID := ce.VolumeID volOptions := ce.VolOptions - adminCr, err := getAdminCredentials(decodeCredentials(me.Secrets)) - if err != nil { - return err - } - entity, err := getCephUser(&volOptions, adminCr, volID) - if err != nil { - klog.Infof("mount-cache: failed to get ceph user: %s %v", volID, me.StagingPath) - } - cr := entity.toCredentials() - if volOptions.ProvisionVolume { volOptions.RootPath = getVolumeRootPathCeph(volID) + cr, err = getAdminCredentials(decodeCredentials(me.Secrets)) + if err != nil { + return err + } + var entity *cephEntity + entity, err = getCephUser(&volOptions, cr, volID) + if err != nil { + return err + } + cr = entity.toCredentials() + } else { + cr, err = getUserCredentials(decodeCredentials(me.Secrets)) + if err != nil { + return err + } } err = cleanupMountPoint(me.StagingPath) @@ -164,7 +157,7 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error { func cleanupMountPoint(mountPoint string) error { if _, err := os.Stat(mountPoint); err != nil { - if IsCorruptedMnt(err) { + if isCorruptedMnt(err) { klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint) err := execCommandErr("umount", mountPoint) if err != nil { @@ -180,7 +173,7 @@ func cleanupMountPoint(mountPoint string) error { return nil } -func IsCorruptedMnt(err error) bool { +func isCorruptedMnt(err error) bool { if err == nil { return false } @@ -203,17 +196,20 @@ func genVolumeMountCacheFileName(volID string) string { cachePath := volumeMountCachePrefix + volID return cachePath } +func (mc *volumeMountCacheMap) isEnable() bool { + //if mount cache dir unset, disable state + return mc.nodeCacheStore.BasePath != "" +} func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error { - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount + if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() lastTargetPaths := make(map[string]bool) - me, ok := volumeMountCache.Volumes[volID] + me, ok := volumeMountCache.volumes[volID] if ok { if me.StagingPath == stagingTargetPath { klog.Warningf("mount-cache: node unexpected restage volume for volume %s", volID) @@ -223,82 +219,70 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID) } - me = volumeMountEntry{NodeID: mc.NodeID, DriverName: mc.DriverName, DriverVersion: mc.DriverVersion} + me = volumeMountCacheEntry{DriverName: mc.driverName, DriverVersion: version} me.VolumeID = volID me.Secrets = encodeCredentials(secrets) me.StagingPath = stagingTargetPath me.TargetPaths = lastTargetPaths - curTime := time.Now() - me.CreateTime = curTime - me.CreateTime = curTime - me.LoadCount = 0 - volumeMountCache.Volumes[volID] = me - if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { - klog.Errorf("mount-cache: node stage volume failed to store a cache entry for volume %s: %v", volID, err) + me.CreateTime = time.Now() + volumeMountCache.volumes[volID] = me + if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { return err } - klog.Infof("mount-cache: node stage volume succ to store a cache entry for volume %s: %v", volID, me) return nil } -func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string, stagingTargetPath string) error { - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount +func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error { + if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() - delete(volumeMountCache.Volumes, volID) - if err := mc.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil { - klog.Infof("mount-cache: node unstage volume failed to delete cache entry for volume %s: %s %v", volID, stagingTargetPath, err) + delete(volumeMountCache.volumes, volID) + if err := mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil { return err } return nil } func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error { - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount + if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() - _, ok := volumeMountCache.Volumes[volID] + _, ok := volumeMountCache.volumes[volID] if !ok { - klog.Errorf("mount-cache: node publish volume failed to find cache entry for volume %s", volID) return errors.New("mount-cache: node publish volume failed to find cache entry for volume") } - volumeMountCache.Volumes[volID].TargetPaths[targetPath] = readOnly + volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly return mc.updateNodeCache(volID) } func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error { - if len(MountCacheDir) == 0 { - //if mount cache dir unset, disable remount + if !mc.isEnable() { return nil } volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() - _, ok := volumeMountCache.Volumes[volID] + _, ok := volumeMountCache.volumes[volID] if !ok { - klog.Errorf("mount-cache: node unpublish volume failed to find cache entry for volume %s", volID) return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume") } - delete(volumeMountCache.Volumes[volID].TargetPaths, targetPath) + delete(volumeMountCache.volumes[volID].TargetPaths, targetPath) return mc.updateNodeCache(volID) } func (mc *volumeMountCacheMap) updateNodeCache(volID string) error { - me := volumeMountCache.Volumes[volID] - if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { + me := volumeMountCache.volumes[volID] + if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { klog.Infof("mount-cache: metadata nofound, delete mount cache failed for volume %s", volID) } - if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { - klog.Errorf("mount-cache: mount cache failed to update for volume %s: %v", volID, err) + if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil { return err } return nil diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 345e4904d..56d909ba9 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -245,7 +245,7 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag stagingTargetPath := req.GetStagingTargetPath() volID := req.GetVolumeId() - if err = volumeMountCache.nodeUnStageVolume(volID, stagingTargetPath); err != nil { + if err = volumeMountCache.nodeUnStageVolume(volID); err != nil { klog.Warningf("mount-cache: failed unstage volume %s %s: %v", volID, stagingTargetPath, err) }