mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-17 10:19:30 +00:00
Merge pull request #282 from huaizong/improve-remount-pv-path-when-exit-v2
remount old mount point when csi plugin unexpect exit
This commit is contained in:
commit
d0d5da83c9
@ -31,6 +31,7 @@ var (
|
||||
nodeID = flag.String("nodeid", "", "node id")
|
||||
volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
|
||||
metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
|
||||
mountCacheDir = flag.String("mountcachedir", "", "mount info cache save dir")
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -56,7 +57,7 @@ func main() {
|
||||
}
|
||||
|
||||
driver := cephfs.NewDriver()
|
||||
driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, cp)
|
||||
driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, cp)
|
||||
|
||||
os.Exit(0)
|
||||
}
|
||||
|
@ -10,6 +10,9 @@ metadata:
|
||||
release: {{ .Release.Name }}
|
||||
heritage: {{ .Release.Service }}
|
||||
rules:
|
||||
- apiGroups: [""]
|
||||
resources: ["configmaps"]
|
||||
verbs: ["get", "list"]
|
||||
- apiGroups: [""]
|
||||
resources: ["nodes"]
|
||||
verbs: ["get", "list", "update"]
|
||||
|
@ -70,6 +70,7 @@ spec:
|
||||
- "--v=5"
|
||||
- "--drivername=$(DRIVER_NAME)"
|
||||
- "--metadatastorage=k8s_configmap"
|
||||
- "--mountcachedir=/mount-cache-dir"
|
||||
env:
|
||||
- name: HOST_ROOTFS
|
||||
value: "/rootfs"
|
||||
@ -83,6 +84,8 @@ spec:
|
||||
value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}"
|
||||
imagePullPolicy: {{ .Values.nodeplugin.plugin.image.imagePullPolicy }}
|
||||
volumeMounts:
|
||||
- name: mount-cache-dir
|
||||
mountPath: /mount-cache-dir
|
||||
- name: plugin-dir
|
||||
mountPath: {{ .Values.socketDir }}
|
||||
- name: pods-mount-dir
|
||||
@ -103,6 +106,8 @@ spec:
|
||||
resources:
|
||||
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
|
||||
volumes:
|
||||
- name: mount-cache-dir
|
||||
emptyDir: {}
|
||||
- name: plugin-dir
|
||||
hostPath:
|
||||
path: {{ .Values.socketDir }}
|
||||
|
@ -55,6 +55,7 @@ spec:
|
||||
- "--v=5"
|
||||
- "--drivername=cephfs.csi.ceph.com"
|
||||
- "--metadatastorage=k8s_configmap"
|
||||
- "--mountcachedir=/mount-cache-dir"
|
||||
env:
|
||||
- name: NODE_ID
|
||||
valueFrom:
|
||||
@ -68,6 +69,8 @@ spec:
|
||||
value: unix:///csi/csi.sock
|
||||
imagePullPolicy: "IfNotPresent"
|
||||
volumeMounts:
|
||||
- name: mount-cache-dir
|
||||
mountPath: /mount-cache-dir
|
||||
- name: plugin-dir
|
||||
mountPath: /csi
|
||||
- name: csi-plugins-dir
|
||||
@ -84,6 +87,8 @@ spec:
|
||||
- name: host-dev
|
||||
mountPath: /dev
|
||||
volumes:
|
||||
- name: mount-cache-dir
|
||||
emptyDir: {}
|
||||
- name: plugin-dir
|
||||
hostPath:
|
||||
path: /var/lib/kubelet/plugins/cephfs.csi.ceph.com/
|
||||
|
@ -10,6 +10,9 @@ apiVersion: rbac.authorization.k8s.io/v1
|
||||
metadata:
|
||||
name: cephfs-csi-nodeplugin
|
||||
rules:
|
||||
- apiGroups: [""]
|
||||
resources: ["configmaps"]
|
||||
verbs: ["get", "list"]
|
||||
- apiGroups: [""]
|
||||
resources: ["nodes"]
|
||||
verbs: ["get", "list", "update"]
|
||||
|
@ -34,6 +34,7 @@ Option | Default value | Description
|
||||
`--nodeid` | _empty_ | This node's ID
|
||||
`--volumemounter` | _empty_ | default volume mounter. Available options are `kernel` and `fuse`. This is the mount method used if volume parameters don't specify otherwise. If left unspecified, the driver will first probe for `ceph-fuse` in system's path and will choose Ceph kernel client if probing failed.
|
||||
`--metadatastorage` | _empty_ | Whether metadata should be kept on node as file or in a k8s configmap (`node` or `k8s_configmap`)
|
||||
`--mountcachedir` | _empty_ | volume mount cache info save dir. If left unspecified, the dirver will not record mount info, or it will save mount info and when driver restart it will remount volume it cached.
|
||||
|
||||
**Available environmental variables:**
|
||||
|
||||
|
@ -77,7 +77,7 @@ func NewNodeServer(d *csicommon.CSIDriver) *NodeServer {
|
||||
|
||||
// Run start a non-blocking grpc controller,node and identityserver for
|
||||
// ceph CSI driver which can serve multiple parallel requests
|
||||
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cachePersister util.CachePersister) {
|
||||
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir string, cachePersister util.CachePersister) {
|
||||
klog.Infof("Driver: %v version: %v", driverName, version)
|
||||
|
||||
// Configuration
|
||||
@ -105,6 +105,13 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP
|
||||
klog.Fatalf("failed to write ceph configuration file: %v", err)
|
||||
}
|
||||
|
||||
initVolumeMountCache(driverName, mountCacheDir, cachePersister)
|
||||
if mountCacheDir != "" {
|
||||
if err := remountCachedVolumes(); err != nil {
|
||||
klog.Warningf("failed to remount cached volumes: %v", err)
|
||||
//ignore remount fail
|
||||
}
|
||||
}
|
||||
// Initialize default library driver
|
||||
|
||||
fs.cd = csicommon.NewCSIDriver(driverName, version, nodeID)
|
||||
|
311
pkg/cephfs/mountcache.go
Normal file
311
pkg/cephfs/mountcache.go
Normal file
@ -0,0 +1,311 @@
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"os"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/ceph/ceph-csi/pkg/util"
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
type volumeMountCacheEntry struct {
|
||||
DriverVersion string `json:"driverVersion"`
|
||||
|
||||
VolumeID string `json:"volumeID"`
|
||||
Secrets map[string]string `json:"secrets"`
|
||||
StagingPath string `json:"stagingPath"`
|
||||
TargetPaths map[string]bool `json:"targetPaths"`
|
||||
CreateTime time.Time `json:"createTime"`
|
||||
}
|
||||
|
||||
type volumeMountCacheMap struct {
|
||||
volumes map[string]volumeMountCacheEntry
|
||||
nodeCacheStore util.NodeCache
|
||||
metadataStore util.CachePersister
|
||||
}
|
||||
|
||||
var (
|
||||
volumeMountCachePrefix = "cephfs-mount-cache-"
|
||||
volumeMountCache volumeMountCacheMap
|
||||
volumeMountCacheMtx sync.Mutex
|
||||
)
|
||||
|
||||
func initVolumeMountCache(driverName string, mountCacheDir string, cachePersister util.CachePersister) {
|
||||
volumeMountCache.volumes = make(map[string]volumeMountCacheEntry)
|
||||
|
||||
volumeMountCache.metadataStore = cachePersister
|
||||
volumeMountCache.nodeCacheStore.BasePath = mountCacheDir
|
||||
volumeMountCache.nodeCacheStore.CacheDir = driverName
|
||||
klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, version, mountCacheDir)
|
||||
}
|
||||
|
||||
func remountCachedVolumes() error {
|
||||
if err := os.MkdirAll(volumeMountCache.nodeCacheStore.BasePath, 0755); err != nil {
|
||||
klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err)
|
||||
return err
|
||||
}
|
||||
var remountFailCount, remountSuccCount int64
|
||||
me := &volumeMountCacheEntry{}
|
||||
ce := &controllerCacheEntry{}
|
||||
err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
|
||||
volID := me.VolumeID
|
||||
if err := volumeMountCache.metadataStore.Get(volID, ce); err != nil {
|
||||
if err, ok := err.(*util.CacheEntryNotFound); ok {
|
||||
klog.Infof("mount-cache: metadata not found, assuming the volume %s to be already deleted (%v)", volID, err)
|
||||
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
klog.Infof("mount-cache: metadata not found, delete volume cache entry for volume %s", volID)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err := mountOneCacheEntry(ce, me); err == nil {
|
||||
remountSuccCount++
|
||||
volumeMountCache.volumes[me.VolumeID] = *me
|
||||
klog.Infof("mount-cache: successfully remounted volume %s", volID)
|
||||
} else {
|
||||
remountFailCount++
|
||||
klog.Errorf("mount-cache: failed to remount volume %s", volID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
klog.Infof("mount-cache: metastore list cache fail %v", err)
|
||||
return err
|
||||
}
|
||||
if remountFailCount > 0 {
|
||||
klog.Infof("mount-cache: successfully remounted %d volumes, failed to remount %d volumes", remountSuccCount, remountFailCount)
|
||||
} else {
|
||||
klog.Infof("mount-cache: successfully remounted %d volumes", remountSuccCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) error {
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
var (
|
||||
err error
|
||||
cr *credentials
|
||||
)
|
||||
volID := ce.VolumeID
|
||||
volOptions := ce.VolOptions
|
||||
|
||||
if volOptions.ProvisionVolume {
|
||||
volOptions.RootPath = getVolumeRootPathCeph(volID)
|
||||
cr, err = getAdminCredentials(decodeCredentials(me.Secrets))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var entity *cephEntity
|
||||
entity, err = getCephUser(&volOptions, cr, volID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cr = entity.toCredentials()
|
||||
} else {
|
||||
cr, err = getUserCredentials(decodeCredentials(me.Secrets))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = cleanupMountPoint(me.StagingPath)
|
||||
if err != nil {
|
||||
klog.Infof("mount-cache: failed to cleanup volume mount point %s, remove it: %s %v", volID, me.StagingPath, err)
|
||||
return err
|
||||
}
|
||||
|
||||
isMnt, err := isMountPoint(me.StagingPath)
|
||||
if err != nil {
|
||||
isMnt = false
|
||||
klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err)
|
||||
}
|
||||
|
||||
if !isMnt {
|
||||
m, err := newMounter(&volOptions)
|
||||
if err != nil {
|
||||
klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
|
||||
return err
|
||||
}
|
||||
if err := m.mount(me.StagingPath, cr, &volOptions); err != nil {
|
||||
klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
for targetPath, readOnly := range me.TargetPaths {
|
||||
if err := cleanupMountPoint(targetPath); err == nil {
|
||||
if err := bindMount(me.StagingPath, targetPath, readOnly); err != nil {
|
||||
klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v",
|
||||
volID, me.StagingPath, targetPath, readOnly, err)
|
||||
} else {
|
||||
klog.Infof("mount-cache: successfully bind-mounted volume %s: %s %s %v",
|
||||
volID, me.StagingPath, targetPath, readOnly)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func cleanupMountPoint(mountPoint string) error {
|
||||
if _, err := os.Stat(mountPoint); err != nil {
|
||||
if isCorruptedMnt(err) {
|
||||
klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint)
|
||||
err := execCommandErr("umount", mountPoint)
|
||||
if err != nil {
|
||||
klog.Infof("mount-cache: failed to umount %s %v", mountPoint, err)
|
||||
//ignore error return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if _, err := os.Stat(mountPoint); err != nil {
|
||||
klog.Errorf("mount-cache: failed to stat mount point %s %v", mountPoint, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isCorruptedMnt(err error) bool {
|
||||
var underlyingError error
|
||||
switch pe := err.(type) {
|
||||
case nil:
|
||||
return false
|
||||
case *os.PathError:
|
||||
underlyingError = pe.Err
|
||||
case *os.LinkError:
|
||||
underlyingError = pe.Err
|
||||
case *os.SyscallError:
|
||||
underlyingError = pe.Err
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
||||
CorruptedErrors := []error{
|
||||
syscall.ENOTCONN, syscall.ESTALE, syscall.EIO, syscall.EACCES}
|
||||
|
||||
for _, v := range CorruptedErrors {
|
||||
if underlyingError == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func genVolumeMountCacheFileName(volID string) string {
|
||||
cachePath := volumeMountCachePrefix + volID
|
||||
return cachePath
|
||||
}
|
||||
func (mc *volumeMountCacheMap) isEnable() bool {
|
||||
//if mount cache dir unset, disable state
|
||||
return mc.nodeCacheStore.BasePath != ""
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
lastTargetPaths := make(map[string]bool)
|
||||
me, ok := volumeMountCache.volumes[volID]
|
||||
if ok {
|
||||
if me.StagingPath == stagingTargetPath {
|
||||
klog.Warningf("mount-cache: node unexpected restage volume for volume %s", volID)
|
||||
return nil
|
||||
}
|
||||
lastTargetPaths = me.TargetPaths
|
||||
klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID)
|
||||
}
|
||||
|
||||
me = volumeMountCacheEntry{DriverVersion: version}
|
||||
|
||||
me.VolumeID = volID
|
||||
me.Secrets = encodeCredentials(secrets)
|
||||
me.StagingPath = stagingTargetPath
|
||||
me.TargetPaths = lastTargetPaths
|
||||
|
||||
me.CreateTime = time.Now()
|
||||
volumeMountCache.volumes[volID] = me
|
||||
return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
delete(volumeMountCache.volumes, volID)
|
||||
return mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID))
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
_, ok := volumeMountCache.volumes[volID]
|
||||
if !ok {
|
||||
return errors.New("mount-cache: node publish volume failed to find cache entry for volume")
|
||||
}
|
||||
volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly
|
||||
return mc.updateNodeCache(volID)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error {
|
||||
if !mc.isEnable() {
|
||||
return nil
|
||||
}
|
||||
volumeMountCacheMtx.Lock()
|
||||
defer volumeMountCacheMtx.Unlock()
|
||||
|
||||
_, ok := volumeMountCache.volumes[volID]
|
||||
if !ok {
|
||||
return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume")
|
||||
}
|
||||
delete(volumeMountCache.volumes[volID].TargetPaths, targetPath)
|
||||
return mc.updateNodeCache(volID)
|
||||
}
|
||||
|
||||
func (mc *volumeMountCacheMap) updateNodeCache(volID string) error {
|
||||
me := volumeMountCache.volumes[volID]
|
||||
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
|
||||
klog.Infof("mount-cache: metadata not found, delete mount cache failed for volume %s", volID)
|
||||
}
|
||||
return mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me)
|
||||
}
|
||||
|
||||
func encodeCredentials(input map[string]string) (output map[string]string) {
|
||||
output = make(map[string]string)
|
||||
for key, value := range input {
|
||||
nKey := base64.StdEncoding.EncodeToString([]byte(key))
|
||||
nValue := base64.StdEncoding.EncodeToString([]byte(value))
|
||||
output[nKey] = nValue
|
||||
}
|
||||
return output
|
||||
}
|
||||
|
||||
func decodeCredentials(input map[string]string) (output map[string]string) {
|
||||
output = make(map[string]string)
|
||||
for key, value := range input {
|
||||
nKey, err := base64.StdEncoding.DecodeString(key)
|
||||
if err != nil {
|
||||
klog.Errorf("mount-cache: decode secret fail")
|
||||
continue
|
||||
}
|
||||
nValue, err := base64.StdEncoding.DecodeString(value)
|
||||
if err != nil {
|
||||
klog.Errorf("mount-cache: decode secret fail")
|
||||
continue
|
||||
}
|
||||
output[string(nKey)] = string(nValue)
|
||||
}
|
||||
return output
|
||||
}
|
38
pkg/cephfs/mountcache_test.go
Normal file
38
pkg/cephfs/mountcache_test.go
Normal file
@ -0,0 +1,38 @@
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func init() {
|
||||
}
|
||||
|
||||
func TestMountOneCacheEntry(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRemountHisMountedPath(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeStageVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeUnStageVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodePublishVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeUnpublishVolume(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestEncodeDecodeCredentials(t *testing.T) {
|
||||
secrets := make(map[string]string)
|
||||
secrets["user_1"] = "value_1"
|
||||
enSecrets := encodeCredentials(secrets)
|
||||
deSecrets := decodeCredentials(enSecrets)
|
||||
for key, value := range secrets {
|
||||
if deSecrets[key] != value {
|
||||
t.Errorf("key %s of credentials's value %s change after decode %s ", key, value, deSecrets[key])
|
||||
}
|
||||
}
|
||||
}
|
@ -154,6 +154,9 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ
|
||||
klog.Errorf("failed to mount volume %s: %v", volID, err)
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, req.GetSecrets()); err != nil {
|
||||
klog.Warningf("mount-cache: failed to stage volume %s %s: %v", volID, stagingTargetPath, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -195,6 +198,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err := volumeMountCache.nodePublishVolume(volID, targetPath, req.GetReadonly()); err != nil {
|
||||
klog.Warningf("mount-cache: failed to publish volume %s %s: %v", volID, targetPath, err)
|
||||
}
|
||||
|
||||
klog.Infof("cephfs: successfully bind-mounted volume %s to %s", volID, targetPath)
|
||||
|
||||
return &csi.NodePublishVolumeResponse{}, nil
|
||||
@ -209,6 +216,11 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
|
||||
targetPath := req.GetTargetPath()
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
if err = volumeMountCache.nodeUnPublishVolume(volID, targetPath); err != nil {
|
||||
klog.Warningf("mount-cache: failed to unpublish volume %s %s: %v", volID, targetPath, err)
|
||||
}
|
||||
|
||||
// Unmount the bind-mount
|
||||
if err = unmountVolume(targetPath); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -232,6 +244,11 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
|
||||
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
if err = volumeMountCache.nodeUnStageVolume(volID); err != nil {
|
||||
klog.Warningf("mount-cache: failed to unstage volume %s %s: %v", volID, stagingTargetPath, err)
|
||||
}
|
||||
|
||||
// Unmount the volume
|
||||
if err = unmountVolume(stagingTargetPath); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
|
@ -56,6 +56,7 @@ func NewCachePersister(metadataStore, driverName string) (CachePersister, error)
|
||||
klog.Infof("cache-persister: using node as metadata cache persister")
|
||||
nc := &NodeCache{}
|
||||
nc.BasePath = PluginFolder + "/" + driverName
|
||||
nc.CacheDir = "controller"
|
||||
return nc, nil
|
||||
}
|
||||
return nil, errors.New("cache-persister: couldn't parse metadatastorage flag")
|
||||
|
@ -32,10 +32,9 @@ import (
|
||||
// NodeCache to store metadata
|
||||
type NodeCache struct {
|
||||
BasePath string
|
||||
CacheDir string
|
||||
}
|
||||
|
||||
var cacheDir = "controller"
|
||||
|
||||
var errDec = errors.New("file not found")
|
||||
|
||||
// EnsureCacheDirectory creates cache directory if not present
|
||||
@ -52,15 +51,15 @@ func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
|
||||
|
||||
//ForAll list the metadata in Nodecache and filters outs based on the pattern
|
||||
func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
|
||||
err := nc.EnsureCacheDirectory(cacheDir)
|
||||
err := nc.EnsureCacheDirectory(nc.CacheDir)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "node-cache: couldn't ensure cache directory exists")
|
||||
}
|
||||
files, err := ioutil.ReadDir(path.Join(nc.BasePath, cacheDir))
|
||||
files, err := ioutil.ReadDir(path.Join(nc.BasePath, nc.CacheDir))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "node-cache: failed to read %s folder", nc.BasePath)
|
||||
}
|
||||
path := path.Join(nc.BasePath, cacheDir)
|
||||
path := path.Join(nc.BasePath, nc.CacheDir)
|
||||
for _, file := range files {
|
||||
err = decodeObj(path, pattern, file, destObj)
|
||||
if err == errDec {
|
||||
@ -104,7 +103,7 @@ func decodeObj(filepath, pattern string, file os.FileInfo, destObj interface{})
|
||||
|
||||
// Create creates the metadata file in cache directory with identifier name
|
||||
func (nc *NodeCache) Create(identifier string, data interface{}) error {
|
||||
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
|
||||
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
|
||||
fp, err := os.Create(file)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file)
|
||||
@ -126,7 +125,7 @@ func (nc *NodeCache) Create(identifier string, data interface{}) error {
|
||||
|
||||
// Get retrieves the metadata from cache directory with identifier name
|
||||
func (nc *NodeCache) Get(identifier string, data interface{}) error {
|
||||
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
|
||||
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
|
||||
// #nosec
|
||||
fp, err := os.Open(file)
|
||||
if err != nil {
|
||||
@ -153,7 +152,7 @@ func (nc *NodeCache) Get(identifier string, data interface{}) error {
|
||||
|
||||
// Delete deletes the metadata file from cache directory with identifier name
|
||||
func (nc *NodeCache) Delete(identifier string) error {
|
||||
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
|
||||
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
|
||||
err := os.Remove(file)
|
||||
if err != nil {
|
||||
if err == os.ErrNotExist {
|
||||
|
Loading…
Reference in New Issue
Block a user