mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-09 16:00:22 +00:00
fix code style
This commit is contained in:
parent
af330fe68e
commit
5b53e90ee4
@ -50,7 +50,6 @@ func main() {
|
||||
}
|
||||
//update plugin name
|
||||
cephfs.PluginFolder = cephfs.PluginFolder + *driverName
|
||||
cephfs.MountCacheDir = *mountCacheDir
|
||||
|
||||
cp, err := util.CreatePersistanceStorage(cephfs.PluginFolder, *metadataStorage, *driverName)
|
||||
if err != nil {
|
||||
@ -58,7 +57,7 @@ func main() {
|
||||
}
|
||||
|
||||
driver := cephfs.NewDriver()
|
||||
driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, cp)
|
||||
driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, cp)
|
||||
|
||||
os.Exit(0)
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ func NewNodeServer(d *csicommon.CSIDriver) *NodeServer {
|
||||
|
||||
// Run start a non-blocking grpc controller,node and identityserver for
|
||||
// ceph CSI driver which can serve multiple parallel requests
|
||||
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cachePersister util.CachePersister) {
|
||||
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir string, cachePersister util.CachePersister) {
|
||||
klog.Infof("Driver: %v version: %v", driverName, version)
|
||||
|
||||
// Configuration
|
||||
@ -105,10 +105,13 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP
|
||||
klog.Fatalf("failed to write ceph configuration file: %v", err)
|
||||
}
|
||||
|
||||
if err := remountHisMountedPath(driverName, version, nodeID, cachePersister); err != nil {
|
||||
klog.Warningf("failed to remounted history mounted path: %v", err)
|
||||
initVolumeMountCache(driverName, mountCacheDir, cachePersister)
|
||||
if mountCacheDir != "" {
|
||||
if err := remountCachedVolumes(); err != nil {
|
||||
klog.Warningf("failed to remount cached volumes: %v", err)
|
||||
//ignore remount fail
|
||||
}
|
||||
}
|
||||
// Initialize default library driver
|
||||
|
||||
fs.cd = csicommon.NewCSIDriver(driverName, version, nodeID)
|
||||
|
@ -12,82 +12,66 @@ import (
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
type volumeMountEntry struct {
|
||||
NodeID string `json:"nodeID"`
|
||||
type volumeMountCacheEntry struct {
|
||||
DriverName string `json:"driverName"`
|
||||
DriverVersion string `json:"driverVersion"`
|
||||
|
||||
Namespace string `json:"namespace"`
|
||||
|
||||
VolumeID string `json:"volumeID"`
|
||||
Secrets map[string]string `json:"secrets"`
|
||||
StagingPath string `json:"stagingPath"`
|
||||
TargetPaths map[string]bool `json:"targetPaths"`
|
||||
CreateTime time.Time `json:"createTime"`
|
||||
LastMountTime time.Time `json:"lastMountTime"`
|
||||
LoadCount uint64 `json:"loadCount"`
|
||||
}
|
||||
|
||||
type volumeMountCacheMap struct {
|
||||
DriverName string
|
||||
DriverVersion string
|
||||
NodeID string
|
||||
MountFailNum int64
|
||||
MountSuccNum int64
|
||||
Volumes map[string]volumeMountEntry
|
||||
NodeCacheStore util.NodeCache
|
||||
MetadataStore util.CachePersister
|
||||
driverName string
|
||||
volumes map[string]volumeMountCacheEntry
|
||||
nodeCacheStore util.NodeCache
|
||||
metadataStore util.CachePersister
|
||||
}
|
||||
|
||||
var (
|
||||
MountCacheDir = ""
|
||||
volumeMountCachePrefix = "cephfs-mount-cache-"
|
||||
volumeMountCache volumeMountCacheMap
|
||||
volumeMountCacheMtx sync.Mutex
|
||||
)
|
||||
|
||||
func remountHisMountedPath(name string, v string, nodeID string, cachePersister util.CachePersister) error {
|
||||
volumeMountCache.Volumes = make(map[string]volumeMountEntry)
|
||||
volumeMountCache.NodeID = nodeID
|
||||
volumeMountCache.DriverName = name
|
||||
volumeMountCache.DriverVersion = v
|
||||
volumeMountCache.MountSuccNum = 0
|
||||
volumeMountCache.MountFailNum = 0
|
||||
func initVolumeMountCache(driverName string, mountCacheDir string, cachePersister util.CachePersister) {
|
||||
volumeMountCache.volumes = make(map[string]volumeMountCacheEntry)
|
||||
|
||||
volumeMountCache.MetadataStore = cachePersister
|
||||
volumeMountCache.driverName = driverName
|
||||
volumeMountCache.metadataStore = cachePersister
|
||||
volumeMountCache.nodeCacheStore.BasePath = mountCacheDir
|
||||
volumeMountCache.nodeCacheStore.CacheDir = ""
|
||||
klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, version, mountCacheDir)
|
||||
}
|
||||
|
||||
volumeMountCache.NodeCacheStore.BasePath = MountCacheDir
|
||||
volumeMountCache.NodeCacheStore.CacheDir = ""
|
||||
|
||||
if len(MountCacheDir) == 0 {
|
||||
//if mount cache dir unset, disable remount
|
||||
klog.Infof("mount-cache: mountcachedir no define disalbe mount cache.")
|
||||
return nil
|
||||
}
|
||||
|
||||
klog.Infof("mount-cache: MountCacheDir: %s", MountCacheDir)
|
||||
if err := os.MkdirAll(volumeMountCache.NodeCacheStore.BasePath, 0755); err != nil {
|
||||
klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.NodeCacheStore.BasePath, err)
|
||||
func remountCachedVolumes() error {
|
||||
if err := os.MkdirAll(volumeMountCache.nodeCacheStore.BasePath, 0755); err != nil {
|
||||
klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err)
|
||||
return err
|
||||
}
|
||||
me := &volumeMountEntry{}
|
||||
var remountFailCount, remountSuccCount int64
|
||||
me := &volumeMountCacheEntry{}
|
||||
ce := &controllerCacheEntry{}
|
||||
err := volumeMountCache.NodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
|
||||
err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
|
||||
volID := me.VolumeID
|
||||
klog.Infof("mount-cache: load %v", me)
|
||||
if err := volumeMountCache.MetadataStore.Get(volID, ce); err != nil {
|
||||
if err := volumeMountCache.metadataStore.Get(volID, ce); err != nil {
|
||||
if err, ok := err.(*util.CacheEntryNotFound); ok {
|
||||
klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err)
|
||||
if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
klog.Infof("mount-cache: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err)
|
||||
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
klog.Infof("mount-cache: metadata nofound, delete volume cache entry for volume %s", volID)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err := mountOneCacheEntry(ce, me); err == nil {
|
||||
volumeMountCache.MountSuccNum++
|
||||
volumeMountCache.Volumes[me.VolumeID] = *me
|
||||
remountSuccCount++
|
||||
volumeMountCache.volumes[me.VolumeID] = *me
|
||||
klog.Infof("mount-cache: remount volume %s succ", volID)
|
||||
} else {
|
||||
volumeMountCache.MountFailNum++
|
||||
remountFailCount++
|
||||
klog.Infof("mount-cache: remount volume cache %s fail", volID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -96,33 +80,42 @@ func remountHisMountedPath(name string, v string, nodeID string, cachePersister
|
||||
klog.Infof("mount-cache: metastore list cache fail %v", err)
|
||||
return err
|
||||
}
|
||||
if volumeMountCache.MountFailNum > volumeMountCache.MountSuccNum {
|
||||
return errors.New("mount-cache: too many volumes mount fail")
|
||||
if remountFailCount > 0 {
|
||||
klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", remountSuccCount, remountFailCount)
|
||||
} else {
|
||||
klog.Infof("mount-cache: volume cache num %d, all succ remount", remountSuccCount)
|
||||
}
|
||||
klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", volumeMountCache.MountSuccNum, volumeMountCache.MountFailNum)
|
||||
return nil
|
||||
}
|
||||
|
||||
func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error {
|
||||
func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) error {
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
var err error
|
||||
var (
|
||||
err error
|
||||
cr *credentials
|
||||
)
|
||||
volID := ce.VolumeID
|
||||
volOptions := ce.VolOptions
|
||||
|
||||
adminCr, err := getAdminCredentials(decodeCredentials(me.Secrets))
|
||||
if volOptions.ProvisionVolume {
|
||||
volOptions.RootPath = getVolumeRootPathCeph(volID)
|
||||
cr, err = getAdminCredentials(decodeCredentials(me.Secrets))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
entity, err := getCephUser(&volOptions, adminCr, volID)
|
||||
var entity *cephEntity
|
||||
entity, err = getCephUser(&volOptions, cr, volID)
|
||||
if err != nil {
|
||||
klog.Infof("mount-cache: failed to get ceph user: %s %v", volID, me.StagingPath)
|
||||
return err
|
||||
}
|
||||
cr = entity.toCredentials()
|
||||
} else {
|
||||
cr, err = getUserCredentials(decodeCredentials(me.Secrets))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cr := entity.toCredentials()
|
||||
|
||||
if volOptions.ProvisionVolume {
|
||||
volOptions.RootPath = getVolumeRootPathCeph(volID)
|
||||
}
|
||||
|
||||
err = cleanupMountPoint(me.StagingPath)
|
||||
@ -164,7 +157,7 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error {
|
||||
|
||||
func cleanupMountPoint(mountPoint string) error {
|
||||
if _, err := os.Stat(mountPoint); err != nil {
|
||||
if IsCorruptedMnt(err) {
|
||||
if isCorruptedMnt(err) {
|
||||
klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint)
|
||||
err := execCommandErr("umount", mountPoint)
|
||||
if err != nil {
|
||||
@ -180,7 +173,7 @@ func cleanupMountPoint(mountPoint string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func IsCorruptedMnt(err error) bool {
|
||||
func isCorruptedMnt(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
@ -203,17 +196,20 @@ func genVolumeMountCacheFileName(volID string) string {
|
||||
cachePath := volumeMountCachePrefix + volID
|
||||
return cachePath
|
||||
}
|
||||
func (mc *volumeMountCacheMap) isEnable() bool {
|
||||
//if mount cache dir unset, disable state
|
||||
return mc.nodeCacheStore.BasePath != ""
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error {
|
||||
if len(MountCacheDir) == 0 {
|
||||
//if mount cache dir unset, disable remount
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
lastTargetPaths := make(map[string]bool)
|
||||
me, ok := volumeMountCache.Volumes[volID]
|
||||
me, ok := volumeMountCache.volumes[volID]
|
||||
if ok {
|
||||
if me.StagingPath == stagingTargetPath {
|
||||
klog.Warningf("mount-cache: node unexpected restage volume for volume %s", volID)
|
||||
@ -223,82 +219,70 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s
|
||||
klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID)
|
||||
}
|
||||
|
||||
me = volumeMountEntry{NodeID: mc.NodeID, DriverName: mc.DriverName, DriverVersion: mc.DriverVersion}
|
||||
me = volumeMountCacheEntry{DriverName: mc.driverName, DriverVersion: version}
|
||||
|
||||
me.VolumeID = volID
|
||||
me.Secrets = encodeCredentials(secrets)
|
||||
me.StagingPath = stagingTargetPath
|
||||
me.TargetPaths = lastTargetPaths
|
||||
|
||||
curTime := time.Now()
|
||||
me.CreateTime = curTime
|
||||
me.CreateTime = curTime
|
||||
me.LoadCount = 0
|
||||
volumeMountCache.Volumes[volID] = me
|
||||
if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
|
||||
klog.Errorf("mount-cache: node stage volume failed to store a cache entry for volume %s: %v", volID, err)
|
||||
me.CreateTime = time.Now()
|
||||
volumeMountCache.volumes[volID] = me
|
||||
if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
|
||||
return err
|
||||
}
|
||||
klog.Infof("mount-cache: node stage volume succ to store a cache entry for volume %s: %v", volID, me)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string, stagingTargetPath string) error {
|
||||
if len(MountCacheDir) == 0 {
|
||||
//if mount cache dir unset, disable remount
|
||||
func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
delete(volumeMountCache.Volumes, volID)
|
||||
if err := mc.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil {
|
||||
klog.Infof("mount-cache: node unstage volume failed to delete cache entry for volume %s: %s %v", volID, stagingTargetPath, err)
|
||||
delete(volumeMountCache.volumes, volID)
|
||||
if err := mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error {
|
||||
if len(MountCacheDir) == 0 {
|
||||
//if mount cache dir unset, disable remount
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
_, ok := volumeMountCache.Volumes[volID]
|
||||
_, ok := volumeMountCache.volumes[volID]
|
||||
if !ok {
|
||||
klog.Errorf("mount-cache: node publish volume failed to find cache entry for volume %s", volID)
|
||||
return errors.New("mount-cache: node publish volume failed to find cache entry for volume")
|
||||
}
|
||||
volumeMountCache.Volumes[volID].TargetPaths[targetPath] = readOnly
|
||||
volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly
|
||||
return mc.updateNodeCache(volID)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error {
|
||||
if len(MountCacheDir) == 0 {
|
||||
//if mount cache dir unset, disable remount
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
_, ok := volumeMountCache.Volumes[volID]
|
||||
_, ok := volumeMountCache.volumes[volID]
|
||||
if !ok {
|
||||
klog.Errorf("mount-cache: node unpublish volume failed to find cache entry for volume %s", volID)
|
||||
return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume")
|
||||
}
|
||||
delete(volumeMountCache.Volumes[volID].TargetPaths, targetPath)
|
||||
delete(volumeMountCache.volumes[volID].TargetPaths, targetPath)
|
||||
return mc.updateNodeCache(volID)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) updateNodeCache(volID string) error {
|
||||
me := volumeMountCache.Volumes[volID]
|
||||
if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
me := volumeMountCache.volumes[volID]
|
||||
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
klog.Infof("mount-cache: metadata nofound, delete mount cache failed for volume %s", volID)
|
||||
}
|
||||
if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
|
||||
klog.Errorf("mount-cache: mount cache failed to update for volume %s: %v", volID, err)
|
||||
if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -245,7 +245,7 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
if err = volumeMountCache.nodeUnStageVolume(volID, stagingTargetPath); err != nil {
|
||||
if err = volumeMountCache.nodeUnStageVolume(volID); err != nil {
|
||||
klog.Warningf("mount-cache: failed unstage volume %s %s: %v", volID, stagingTargetPath, err)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user