diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go
index b2272e853..ee7b446b8 100644
--- a/pkg/cephfs/driver.go
+++ b/pkg/cephfs/driver.go
@@ -105,6 +105,10 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP
 		klog.Fatalf("failed to write ceph configuration file: %v", err)
 	}
 
+	if err := remountHisMountedPath(driverName, version, nodeID, cachePersister); err != nil {
+		klog.Warningf("failed to remounted history mounted path: %v", err)
+		// Remounting is best-effort: a failure must not stop the driver from starting.
+	}
 	// Initialize default library driver
 
 	fs.cd = csicommon.NewCSIDriver(driverName, version, nodeID)
diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go
new file mode 100644
index 000000000..2f7a0a5c6
--- /dev/null
+++ b/pkg/cephfs/mountcache.go
@@ -0,0 +1,375 @@
+package cephfs
+
+import (
+	"encoding/base64"
+	"os"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/ceph/ceph-csi/pkg/util"
+	"github.com/pkg/errors"
+	"k8s.io/klog"
+)
+
+// volumeMountEntry is the record kept on the node for one staged volume. It
+// holds what is needed to re-create the volume's mounts after the plugin
+// restarts: the staging path, every published target path with its read-only
+// flag, and the stage secrets as encoded by encodeCredentials.
+//
+// NOTE(review): the secrets are only base64-encoded, not encrypted, before
+// they are written to the node cache; confirm this is acceptable.
+type volumeMountEntry struct {
+	NodeID        string `json:"nodeID"`
+	DriverName    string `json:"driverName"`
+	DriverVersion string `json:"driverVersion"`
+
+	Namespace string `json:"namespace"`
+
+	VolumeID      string            `json:"volumeID"`
+	Secrets       map[string]string `json:"secrets"`
+	StagingPath   string            `json:"stagingPath"`
+	TargetPaths   map[string]bool   `json:"targetPaths"`
+	CreateTime    time.Time         `json:"createTime"`
+	LastMountTime time.Time         `json:"lastMountTime"`
+	LoadCount     uint64            `json:"loadCount"`
+}
+
+// volumeMountCacheMap is the node-wide mount cache: the in-memory view of all
+// entries, the remount counters, and the stores used to persist entries and
+// to look up volume metadata.
+type volumeMountCacheMap struct {
+	DriverName     string
+	DriverVersion  string
+	NodeID         string
+	MountFailNum   int64
+	MountSuccNum   int64
+	Volumes        map[string]volumeMountEntry
+	NodeCacheStore util.NodeCache
+	MetadataStore  util.CachePersister
+}
+
+var (
+	csiPersistentVolumeRoot = "/var/lib/kubelet/plugins/kubernetes.io/csi"
+	volumeMountCachePrefix  = "cephfs-mount-cache-"
+	volumeMountCache        volumeMountCacheMap
+	volumeMountCacheMtx     sync.Mutex
+)
+
+// remountHisMountedPath initializes the global mount cache and then replays
+// every entry found in the node cache, re-mounting the staging path and the
+// bind-mounts of each volume whose metadata still exists. Entries whose
+// metadata is gone are deleted. It returns an error when the cache cannot be
+// read or when more volumes failed to remount than succeeded.
+func remountHisMountedPath(name string, v string, nodeID string, cachePersister util.CachePersister) error {
+	volumeMountCache.Volumes = make(map[string]volumeMountEntry)
+	volumeMountCache.NodeID = nodeID
+	volumeMountCache.DriverName = name
+	volumeMountCache.DriverVersion = v
+	volumeMountCache.MountSuccNum = 0
+	volumeMountCache.MountFailNum = 0
+
+	volumeMountCache.MetadataStore = cachePersister
+
+	volumeMountCache.NodeCacheStore.BasePath = PluginFolder
+	volumeMountCache.NodeCacheStore.CacheDir = "volumes-mount-cache"
+
+	if _, err := os.Stat(csiPersistentVolumeRoot); err != nil {
+		klog.Infof("mount-cache: csi pv root path %s stat fail %v, may not in daemonset csi plugin, exit", csiPersistentVolumeRoot, err)
+		return err
+	}
+
+	if err := os.MkdirAll(volumeMountCache.NodeCacheStore.BasePath, 0755); err != nil {
+		// Errorf rather than Fatalf: the caller treats a failed remount as
+		// non-fatal, so this must not terminate the process.
+		klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.NodeCacheStore.BasePath, err)
+		return err
+	}
+	me := &volumeMountEntry{}
+	ce := &controllerCacheEntry{}
+	err := volumeMountCache.NodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
+		// Every cache file is decoded into the same me object and ce is
+		// shared across iterations as well. Reset both afterwards so that a
+		// decoder which reuses already-allocated maps cannot carry one
+		// volume's secrets or target paths over into the next entry.
+		defer func() {
+			*me = volumeMountEntry{}
+			*ce = controllerCacheEntry{}
+		}()
+		volID := me.VolumeID
+		// Do not log the entry itself: it contains the volume secrets.
+		klog.Infof("mount-cache: load volume %s staging path %s", volID, me.StagingPath)
+		if err := volumeMountCache.MetadataStore.Get(volID, ce); err != nil {
+			nf, ok := err.(*util.CacheEntryNotFound)
+			if !ok {
+				klog.Warningf("mount-cache: failed to get metadata for volume %s, skip remount: %v", volID, err)
+				return nil
+			}
+			klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, nf)
+			if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
+				klog.Infof("mount-cache: metadata nofound, delete volume cache entry for volume %s", volID)
+			}
+			return nil
+		}
+		if err := mountOneCacheEntry(ce, me); err != nil {
+			volumeMountCache.MountFailNum++
+			return nil
+		}
+		volumeMountCache.MountSuccNum++
+		volumeMountCache.Volumes[me.VolumeID] = *me
+		return nil
+	})
+	if err != nil {
+		klog.Infof("mount-cache: metastore list cache fail %v", err)
+		return err
+	}
+	if volumeMountCache.MountFailNum > volumeMountCache.MountSuccNum {
+		return errors.New("mount-cache: too many volumes mount fail")
+	}
+	klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", volumeMountCache.MountSuccNum, volumeMountCache.MountFailNum)
+	return nil
+}
+
+// mountOneCacheEntry re-creates the mounts of one cached volume: it mounts
+// the staging path unless it is already a mount point and then bind-mounts
+// it onto every recorded target path. Bind-mount failures are logged and
+// skipped; any earlier failure is returned.
+func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error {
+	volumeMountCacheMtx.Lock()
+	defer volumeMountCacheMtx.Unlock()
+
+	volID := ce.VolumeID
+	volOptions := ce.VolOptions
+
+	adminCr, err := getAdminCredentials(decodeCredentials(me.Secrets))
+	if err != nil {
+		return err
+	}
+	entity, err := getCephUser(&volOptions, adminCr, volID)
+	if err != nil {
+		// entity must not be used after a failed lookup, so give up here.
+		klog.Errorf("mount-cache: failed to get ceph user: %s %s %v", volID, me.StagingPath, err)
+		return err
+	}
+	cr := entity.toCredentials()
+
+	if volOptions.ProvisionVolume {
+		volOptions.RootPath = getVolumeRootPathCeph(volID)
+	}
+
+	err = cleanupMountPoint(me.StagingPath)
+	if err != nil {
+		klog.Infof("mount-cache: failed to cleanup volume mount point %s, remove it: %s %v", volID, me.StagingPath, err)
+		return err
+	}
+
+	isMnt, err := isMountPoint(me.StagingPath)
+	if err != nil {
+		isMnt = false
+		klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err)
+	}
+
+	if !isMnt {
+		m, err := newMounter(&volOptions)
+		if err != nil {
+			klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
+			return err
+		}
+		if err := m.mount(me.StagingPath, cr, &volOptions); err != nil {
+			klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
+			return err
+		}
+	}
+	for targetPath, readOnly := range me.TargetPaths {
+		if err := cleanupMountPoint(targetPath); err != nil {
+			// cleanupMountPoint has logged the reason; skip this target.
+			continue
+		}
+		if err := bindMount(me.StagingPath, targetPath, readOnly); err != nil {
+			klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v",
+				volID, me.StagingPath, targetPath, readOnly, err)
+			continue
+		}
+		klog.Infof("mount-cache: succ bind-mount volume %s: %s %s %v",
+			volID, me.StagingPath, targetPath, readOnly)
+	}
+	return nil
+}
+
+// cleanupMountPoint unmounts mountPoint when stat reports it as a corrupted
+// mount and then verifies that the path can be stat'ed. It returns the error
+// of that final stat, if any.
+func cleanupMountPoint(mountPoint string) error {
+	if _, err := os.Stat(mountPoint); err != nil {
+		if IsCorruptedMnt(err) {
+			klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint)
+			err := execCommandErr("umount", mountPoint)
+			if err != nil {
+				// Only logged: the stat below decides whether the path is usable.
+				klog.Infof("mount-cache: unmount %s fail %v", mountPoint, err)
+			}
+		}
+	}
+	if _, err := os.Stat(mountPoint); err != nil {
+		klog.Errorf("mount-cache: mount point %s stat fail %v", mountPoint, err)
+		return err
+	}
+	return nil
+}
+
+// IsCorruptedMnt reports whether err, as returned by a stat-like call on a
+// mount point, indicates a broken mount: its underlying error is ENOTCONN,
+// ESTALE, EIO or EACCES.
+func IsCorruptedMnt(err error) bool {
+	if err == nil {
+		return false
+	}
+	var underlyingError error
+	switch pe := err.(type) {
+	case *os.PathError:
+		underlyingError = pe.Err
+	case *os.LinkError:
+		underlyingError = pe.Err
+	case *os.SyscallError:
+		underlyingError = pe.Err
+	}
+
+	return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO || underlyingError == syscall.EACCES
+}
+
+// genVolumeMountCacheFileName returns the node-cache identifier of volID.
+func genVolumeMountCacheFileName(volID string) string {
+	return volumeMountCachePrefix + volID
+}
+
+// nodeStageVolume records that volID is staged at stagingTargetPath and
+// persists the entry. An existing entry with the same staging path is left
+// untouched; one with a different staging path is replaced but keeps its
+// target paths.
+func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error {
+	volumeMountCacheMtx.Lock()
+	defer volumeMountCacheMtx.Unlock()
+
+	lastTargetPaths := make(map[string]bool)
+	me, ok := volumeMountCache.Volumes[volID]
+	if ok {
+		if me.StagingPath == stagingTargetPath {
+			klog.Infof("mount-cache: node stage volume cache entry for volume %s already has stagingTargetPath %s",
+				volID, stagingTargetPath)
+			return nil
+		}
+		// Keep a non-nil map so that a later publish can always insert.
+		if me.TargetPaths != nil {
+			lastTargetPaths = me.TargetPaths
+		}
+		klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID)
+	}
+
+	me = volumeMountEntry{NodeID: mc.NodeID, DriverName: mc.DriverName, DriverVersion: mc.DriverVersion}
+
+	me.VolumeID = volID
+	me.Secrets = encodeCredentials(secrets)
+	me.StagingPath = stagingTargetPath
+	me.TargetPaths = lastTargetPaths
+
+	curTime := time.Now()
+	me.CreateTime = curTime
+	me.LastMountTime = curTime
+	me.LoadCount = 0
+	volumeMountCache.Volumes[volID] = me
+	if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
+		klog.Errorf("mount-cache: node stage volume failed to store a cache entry for volume %s: %v", volID, err)
+		return err
+	}
+	// Log only the staging path: the entry itself contains the secrets.
+	klog.Infof("mount-cache: node stage volume succ to store a cache entry for volume %s: %s", volID, stagingTargetPath)
+	return nil
+}
+
+// nodeUnStageVolume drops the entry of volID from memory and from the node
+// cache.
+func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string, stagingTargetPath string) error {
+	volumeMountCacheMtx.Lock()
+	defer volumeMountCacheMtx.Unlock()
+	delete(volumeMountCache.Volumes, volID)
+	if err := mc.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil {
+		klog.Infof("mount-cache: node unstage volume failed to delete cache entry for volume %s: %s %v", volID, stagingTargetPath, err)
+		return err
+	}
+	return nil
+}
+
+// nodePublishVolume adds targetPath with its read-only flag to the entry of
+// volID and persists the updated entry. It fails when volID has no entry.
+func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error {
+	volumeMountCacheMtx.Lock()
+	defer volumeMountCacheMtx.Unlock()
+
+	me, ok := volumeMountCache.Volumes[volID]
+	if !ok {
+		klog.Errorf("mount-cache: node publish volume failed to find cache entry for volume %s", volID)
+		return errors.New("mount-cache: node publish volume failed to find cache entry for volume")
+	}
+	// Guard against an entry without a target-path map: writing to nil panics.
+	if me.TargetPaths == nil {
+		me.TargetPaths = make(map[string]bool)
+		volumeMountCache.Volumes[volID] = me
+	}
+	me.TargetPaths[targetPath] = readOnly
+	if err := mc.NodeCacheStore.Update(genVolumeMountCacheFileName(volID), me); err != nil {
+		klog.Errorf("mount-cache: node publish volume failed to store a cache entry for volume %s: %v", volID, err)
+		return err
+	}
+	return nil
+}
+
+// nodeUnPublishVolume removes targetPath from the entry of volID and persists
+// the updated entry. It fails when volID has no entry.
+func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error {
+	volumeMountCacheMtx.Lock()
+	defer volumeMountCacheMtx.Unlock()
+
+	me, ok := volumeMountCache.Volumes[volID]
+	if !ok {
+		klog.Errorf("mount-cache: node unpublish volume failed to find cache entry for volume %s", volID)
+		return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume")
+	}
+	delete(me.TargetPaths, targetPath)
+	if err := mc.NodeCacheStore.Update(genVolumeMountCacheFileName(volID), me); err != nil {
+		klog.Errorf("mount-cache: node unpublish volume failed to store a cache entry for volume %s: %v", volID, err)
+		return err
+	}
+	return nil
+}
+
+// encodeCredentials returns a copy of input with every key and value
+// base64-encoded. This is an encoding only, not encryption.
+func encodeCredentials(input map[string]string) (output map[string]string) {
+	output = make(map[string]string, len(input))
+	for key, value := range input {
+		nKey := base64.StdEncoding.EncodeToString([]byte(key))
+		nValue := base64.StdEncoding.EncodeToString([]byte(value))
+		output[nKey] = nValue
+	}
+	return output
+}
+
+// decodeCredentials reverses encodeCredentials. Pairs whose key or value is
+// not valid base64 are logged and left out of the result.
+func decodeCredentials(input map[string]string) (output map[string]string) {
+	output = make(map[string]string, len(input))
+	for key, value := range input {
+		nKey, err := base64.StdEncoding.DecodeString(key)
+		if err != nil {
+			klog.Errorf("mount-cache: decode secret fail")
+			continue
+		}
+		nValue, err := base64.StdEncoding.DecodeString(value)
+		if err != nil {
+			klog.Errorf("mount-cache: decode secret fail")
+			continue
+		}
+		output[string(nKey)] = string(nValue)
+	}
+	return output
+}
diff --git a/pkg/cephfs/mountcache_test.go b/pkg/cephfs/mountcache_test.go
new file mode 100644
index 000000000..6bba59c55
--- /dev/null
+++ b/pkg/cephfs/mountcache_test.go
@@ -0,0 +1,38 @@
+package cephfs
+
+import (
+	"testing"
+)
+
+func init() {
+}
+
+func TestMountOneCacheEntry(t *testing.T) {
+}
+
+func TestRemountHisMountedPath(t *testing.T) {
+}
+
+func TestNodeStageVolume(t *testing.T) {
+}
+
+func TestNodeUnStageVolume(t *testing.T) {
+}
+
+func TestNodePublishVolume(t *testing.T) {
+}
+
+func TestNodeUnpublishVolume(t *testing.T) {
+}
+
+func TestEncodeDecodeCredentials(t *testing.T) {
+	secrets := make(map[string]string)
+	secrets["user_1"] = "value_1"
+	enSecrets := encodeCredentials(secrets)
+	deSecrets := decodeCredentials(enSecrets)
+	for key, value := range secrets {
+		if deSecrets[key] != value {
+			t.Errorf("key %s value %s not equal %s after encode decode", key, value, deSecrets[key])
+		}
+	}
+}
diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go
index 51c44933a..345e4904d 100644
--- a/pkg/cephfs/nodeserver.go
+++ b/pkg/cephfs/nodeserver.go
@@ -154,6 +154,9 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ
 		klog.Errorf("failed to mount volume %s: %v", volID, err)
 		return status.Error(codes.Internal, err.Error())
 	}
+	if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, req.GetSecrets()); err != nil {
+		klog.Warningf("mount-cache: failed stage volume %s %s: %v", volID, stagingTargetPath, err)
+	}
 	return nil
 }
 
@@ -195,6 +198,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
+	if err := volumeMountCache.nodePublishVolume(volID, targetPath, req.GetReadonly()); err != nil {
+		klog.Warningf("mount-cache: failed publish volume %s %s: %v", volID, targetPath, err)
+	}
+
 	klog.Infof("cephfs: successfully bind-mounted volume %s to %s", volID, targetPath)
 
 	return &csi.NodePublishVolumeResponse{}, nil
@@ -209,6 +216,11 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
 
 	targetPath := req.GetTargetPath()
 
+	volID := req.GetVolumeId()
+	if err = volumeMountCache.nodeUnPublishVolume(volID, targetPath); err != nil {
+		klog.Warningf("mount-cache: failed unpublish volume %s %s: %v", volID, targetPath, err)
+	}
+
 	// Unmount the bind-mount
 	if err = unmountVolume(targetPath); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
@@ -232,6 +244,11 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
 
 	stagingTargetPath := req.GetStagingTargetPath()
 
+	volID := req.GetVolumeId()
+	if err = volumeMountCache.nodeUnStageVolume(volID, stagingTargetPath); err != nil {
+		klog.Warningf("mount-cache: failed unstage volume %s %s: %v", volID, stagingTargetPath, err)
+	}
+
 	// Unmount the volume
 	if err = unmountVolume(stagingTargetPath); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
diff --git a/pkg/util/cachepersister.go b/pkg/util/cachepersister.go
index ba8918587..4faf6366c 100644
--- a/pkg/util/cachepersister.go
+++ b/pkg/util/cachepersister.go
@@ -56,6 +56,7 @@ func NewCachePersister(metadataStore, driverName string) (CachePersister, error)
 		klog.Infof("cache-persister: using node as metadata cache persister")
 		nc := &NodeCache{}
 		nc.BasePath = PluginFolder + "/" + driverName
+		nc.CacheDir = "controller"
 		return nc, nil
 	}
 	return nil, errors.New("cache-persister: couldn't parse metadatastorage flag")
diff --git a/pkg/util/nodecache.go b/pkg/util/nodecache.go
index 5659d4eaa..86278f4d2 100644
--- a/pkg/util/nodecache.go
+++ b/pkg/util/nodecache.go
@@ -32,10 +32,9 @@ import (
 // NodeCache to store metadata
 type NodeCache struct {
 	BasePath string
+	CacheDir string
 }
 
-var cacheDir = "controller"
-
 var errDec = errors.New("file not found")
 
 // EnsureCacheDirectory creates cache directory if not present
@@ -52,15 +51,15 @@ func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
 
 //ForAll list the metadata in Nodecache and filters outs based on the pattern
 func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
-	err := nc.EnsureCacheDirectory(cacheDir)
+	err := nc.EnsureCacheDirectory(nc.CacheDir)
 	if err != nil {
 		return errors.Wrap(err, "node-cache: couldn't ensure cache directory exists")
 	}
-	files, err := ioutil.ReadDir(path.Join(nc.BasePath, cacheDir))
+	files, err := ioutil.ReadDir(path.Join(nc.BasePath, nc.CacheDir))
 	if err != nil {
 		return errors.Wrapf(err, "node-cache: failed to read %s folder", nc.BasePath)
 	}
-	path := path.Join(nc.BasePath, cacheDir)
+	path := path.Join(nc.BasePath, nc.CacheDir)
 	for _, file := range files {
 		err = decodeObj(path, pattern, file, destObj)
 		if err == errDec {
@@ -102,9 +101,27 @@ func decodeObj(filepath, pattern string, file os.FileInfo, destObj interface{})
 
 }
 
+// Update replaces the metadata file of identifier: the data is first written
+// to a temporary file, which is then renamed over the final one.
+func (nc *NodeCache) Update(identifier string, data interface{}) error {
+	file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
+	identifierTmp := identifier + ".creating"
+	fileTmp := path.Join(nc.BasePath, nc.CacheDir, identifierTmp+".json")
+	// Best-effort removal of a leftover from an interrupted update; a failure
+	// is ignored here and surfaces through Create below if it matters.
+	_ = os.Remove(fileTmp)
+	if err := nc.Create(identifierTmp, data); err != nil {
+		return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s", fileTmp)
+	}
+	if err := os.Rename(fileTmp, file); err != nil {
+		return errors.Wrapf(err, "node-cache: couldn't rename %s as %s", fileTmp, file)
+	}
+	return nil
+}
+
 // Create creates the metadata file in cache directory with identifier name
 func (nc *NodeCache) Create(identifier string, data interface{}) error {
-	file := path.Join(nc.BasePath, cacheDir, identifier+".json")
+	file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
 	fp, err := os.Create(file)
 	if err != nil {
 		return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file)
@@ -126,7 +143,7 @@ func (nc *NodeCache) Create(identifier string, data interface{}) error {
 
 // Get retrieves the metadata from cache directory with identifier name
 func (nc *NodeCache) Get(identifier string, data interface{}) error {
-	file := path.Join(nc.BasePath, cacheDir, identifier+".json")
+	file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
 	// #nosec
 	fp, err := os.Open(file)
 	if err != nil {
@@ -153,7 +170,7 @@ func (nc *NodeCache) Get(identifier string, data interface{}) error {
 
 // Delete deletes the metadata file from cache directory with identifier name
 func (nc *NodeCache) Delete(identifier string) error {
-	file := path.Join(nc.BasePath, cacheDir, identifier+".json")
+	file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
 	err := os.Remove(file)
 	if err != nil {
 		if err == os.ErrNotExist {