From d15ded88f558e70d2b6ca5f2f282e42adbc8fed3 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Fri, 10 Jul 2020 16:14:59 +0530 Subject: [PATCH] cleanup: Remove support for Delete and Unmounting v1.1.0 PVC as v1.0.0 is deprecated we need to remove the support for it in the Next coming (v3.0.0) release. This PR removes the support for the same. closes #882 Signed-off-by: Madhu Rajanna --- .../templates/nodeplugin-daemonset.yaml | 5 - .../templates/provisioner-deployment.yaml | 5 - .../templates/nodeplugin-daemonset.yaml | 4 - .../templates/provisioner-deployment.yaml | 4 - cmd/cephcsi.go | 21 +- .../csi-cephfsplugin-provisioner.yaml | 5 - .../cephfs/kubernetes/csi-cephfsplugin.yaml | 5 - .../kubernetes/csi-rbdplugin-provisioner.yaml | 4 - deploy/rbd/kubernetes/csi-rbdplugin.yaml | 4 - docs/deploy-cephfs.md | 10 +- docs/deploy-rbd.md | 1 - internal/cephfs/cephuser.go | 49 ----- internal/cephfs/controllerserver.go | 78 -------- internal/cephfs/driver.go | 14 +- internal/cephfs/util.go | 6 - internal/cephfs/volume.go | 75 +------ internal/rbd/controllerserver.go | 70 +------ internal/rbd/driver.go | 9 +- internal/rbd/nodeserver.go | 34 +--- internal/rbd/nodeserver_test.go | 19 -- internal/rbd/rbd_util.go | 75 +------ internal/rbd/rbd_util_test.go | 18 -- internal/util/cachepersister.go | 56 ------ internal/util/k8s.go | 52 +++++ internal/util/k8scmcache.go | 187 ------------------ internal/util/nodecache.go | 165 ---------------- internal/util/util.go | 40 +--- 27 files changed, 81 insertions(+), 934 deletions(-) delete mode 100644 internal/cephfs/cephuser.go delete mode 100644 internal/util/cachepersister.go create mode 100644 internal/util/k8s.go delete mode 100644 internal/util/k8scmcache.go delete mode 100644 internal/util/nodecache.go diff --git a/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml b/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml index 66f2bd1a0..fcc6581b7 100644 --- a/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml +++ b/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml @@ -82,7 +82,6 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=$(DRIVER_NAME)" - - "--metadatastorage=k8s_configmap" {{- if .Values.topology.enabled }} - "--domainlabels={{ .Values.topology.domainLabels | join "," }}" {{- end }} @@ -97,10 +96,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: "unix:///csi/{{ .Values.pluginSocketFile }}" securityContext: diff --git a/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml b/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml index 685ed2eb1..b9e25e485 100644 --- a/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml +++ b/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml @@ -98,7 +98,6 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=$(DRIVER_NAME)" - - "--metadatastorage=k8s_configmap" env: - name: POD_IP valueFrom: @@ -110,10 +109,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: "unix:///csi/{{ .Values.provisionerSocketFile }}" securityContext: diff --git a/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml b/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml index 89ad83542..eb31759d3 100644 --- a/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml +++ b/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml @@ -90,10 +90,6 @@ spec: fieldPath: status.podIP - name: DRIVER_NAME value: {{ .Values.driverName }} - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: NODE_ID valueFrom: fieldRef: diff --git a/charts/ceph-csi-rbd/templates/provisioner-deployment.yaml b/charts/ceph-csi-rbd/templates/provisioner-deployment.yaml index 290ea8112..55777e785 100644 --- a/charts/ceph-csi-rbd/templates/provisioner-deployment.yaml +++ b/charts/ceph-csi-rbd/templates/provisioner-deployment.yaml @@ -131,10 +131,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: "unix:///csi/{{ .Values.provisionerSocketFile }}" securityContext: diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index a17e78722..c4c73d55c 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -20,7 +20,6 @@ import ( "flag" "fmt" "os" - "path/filepath" "runtime" "time" @@ -54,8 +53,6 @@ func init() { flag.StringVar(&conf.NodeID, "nodeid", "", "node id") flag.StringVar(&conf.InstanceID, "instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+ " instances, when sharing Ceph clusters across CSI instances for provisioning") - flag.StringVar(&conf.MetadataStorage, "metadatastorage", "", "metadata persistence method [node|k8s_configmap]") - flag.StringVar(&conf.PluginPath, "pluginpath", "/var/lib/kubelet/plugins/", "the location of cephcsi plugin") flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups") flag.BoolVar(&conf.IsControllerServer, "controllerserver", false, "start cephcsi controller server") flag.BoolVar(&conf.IsNodeServer, "nodeserver", false, "start cephcsi node server") @@ -63,8 +60,6 @@ func init() { " domain the node belongs to, separated by ','") // cephfs related flags - // marking this as deprecated, remove it in next major release - flag.StringVar(&conf.MountCacheDir, "mountcachedir", "", "mount info cache save dir") flag.BoolVar(&conf.ForceKernelCephFS, "forcecephkernelclient", false, "enable Ceph Kernel clients on kernel < 4.17 which support quotas") // liveness/grpc metrics related flags @@ -123,7 +118,6 @@ func main() { os.Exit(0) } util.DefaultLog("Driver version: %s and Git version: %s", util.DriverVersion, util.GitCommit) - var cp util.CachePersister if conf.Vtype == "" { klog.Fatalln("driver type not specified") @@ -134,14 +128,6 @@ func main() { if err != nil { klog.Fatalln(err) // calls exit } - csipluginPath := filepath.Join(conf.PluginPath, dname) - if conf.MetadataStorage != "" { - cp, err = util.CreatePersistanceStorage( - csipluginPath, conf.MetadataStorage, conf.PluginPath) - if err != nil { - os.Exit(1) - } - } // the driver may need a higher PID limit for handling all concurrent requests if conf.PidLimit != 0 { @@ -183,14 +169,11 @@ func main() { validateCloneDepthFlag(&conf) validateMaxSnaphostFlag(&conf) driver := rbd.NewDriver() - driver.Run(&conf, cp) + driver.Run(&conf) case cephfsType: - if conf.MountCacheDir != "" { - klog.Warning("mountcachedir option is deprecated") - } driver := cephfs.NewDriver() - driver.Run(&conf, cp) + driver.Run(&conf) case livenessType: liveness.Run(&conf) diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml index 32c73bf0b..5586a1851 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml @@ -91,7 +91,6 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=cephfs.csi.ceph.com" - - "--metadatastorage=k8s_configmap" - "--pidlimit=-1" env: - name: POD_IP @@ -102,10 +101,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: unix:///csi/csi-provisioner.sock imagePullPolicy: "IfNotPresent" diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml index adea0ff0c..d62ca9d8b 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml @@ -62,7 +62,6 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=cephfs.csi.ceph.com" - - "--metadatastorage=k8s_configmap" # If topology based provisioning is desired, configure required # node labels representing the nodes topology domain # and pass the label names below, for CSI to consume and advertize @@ -77,10 +76,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: unix:///csi/csi.sock imagePullPolicy: "IfNotPresent" diff --git a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml index fb90ff570..8e33edfa0 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml @@ -74,10 +74,6 @@ spec: env: - name: ADDRESS value: /csi/csi-provisioner.sock - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace imagePullPolicy: "IfNotPresent" volumeMounts: - name: socket-dir diff --git a/deploy/rbd/kubernetes/csi-rbdplugin.yaml b/deploy/rbd/kubernetes/csi-rbdplugin.yaml index 8a4ca5eae..de6736155 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin.yaml @@ -77,10 +77,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: unix:///csi/csi.sock imagePullPolicy: "IfNotPresent" diff --git a/docs/deploy-cephfs.md b/docs/deploy-cephfs.md index 690ba3562..84c4fffc0 100644 --- a/docs/deploy-cephfs.md +++ b/docs/deploy-cephfs.md @@ -44,14 +44,13 @@ that should be resolved in v14.2.3. **Available command line arguments:** | Option | Default value | Description | -|---------------------------|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| ------------------------- | --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket | | `--drivername` | `cephfs.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) | | `--nodeid` | _empty_ | This node's ID | | `--type` | _empty_ | Driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` | | `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning | | `--pluginpath` | "/var/lib/kubelet/plugins/" | The location of cephcsi plugin on host | -| `--metadatastorage` | _empty_ | Points to where older (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) | | `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. | | `--metricsport` | `8080` | TCP port for liveness metrics requests | | `--metricspath` | `/metrics` | Path of prometheus endpoint where metrics will be available | @@ -72,9 +71,6 @@ CephFS mounter on kernels < 4.17. the path of your k8s config file (if not specified, the plugin will assume you're running it inside a k8s cluster and find the config itself). -`POD_NAMESPACE`: if you use `k8s_configmap` as metadata store, `POD_NAMESPACE` -is used to define in which namespace you want the configmaps to be stored - **Available volume parameters:** | Parameter | Required | Description | @@ -83,8 +79,8 @@ is used to define in which namespace you want the configmaps to be stored | `fsName` | yes | CephFS filesystem name into which the volume shall be created | | `mounter` | no | Mount method to be used for this volume. Available options are `kernel` for Ceph kernel client and `fuse` for Ceph FUSE driver. Defaults to "default mounter". | | `pool` | no | Ceph pool into which volume data shall be stored | -| `volumeNamePrefix` | no | Prefix to use for naming subvolumes (defaults to `csi-vol-`). | -| `snapshotNamePrefix` | no | Prefix to use for naming snapshots (defaults to `csi-snap-`) +| `volumeNamePrefix` | no | Prefix to use for naming subvolumes (defaults to `csi-vol-`). | +| `snapshotNamePrefix` | no | Prefix to use for naming snapshots (defaults to `csi-snap-`) | | `kernelMountOptions` | no | Comma separated string of mount options accepted by cephfs kernel mounter, by default no options are passed. Check man mount.ceph for options. | | `fuseMountOptions` | no | Comma separated string of mount options accepted by ceph-fuse mounter, by default no options are passed. | | `csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | for Kubernetes | Name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value | diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index a56e1134c..ae5b396ea 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -33,7 +33,6 @@ make image-cephcsi | `--nodeid` | _empty_ | This node's ID | | `--type` | _empty_ | Driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` | | `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning | -| `--metadatastorage` | _empty_ | Points to where legacy (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) | | `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. | | `--metricsport` | `8080` | TCP port for liveness metrics requests | | `--metricspath` | `"/metrics"` | Path of prometheus endpoint where metrics will be available | diff --git a/internal/cephfs/cephuser.go b/internal/cephfs/cephuser.go deleted file mode 100644 index 1ac674397..000000000 --- a/internal/cephfs/cephuser.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -Copyright 2018 The Ceph-CSI Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cephfs - -import ( - "context" - - "github.com/ceph/ceph-csi/internal/util" -) - -const ( - cephUserPrefix = "user-" - cephEntityClientPrefix = "client." -) - -func genUserIDs(adminCr *util.Credentials, volID volumeID) (adminID, userID string) { - return cephEntityClientPrefix + adminCr.ID, cephEntityClientPrefix + getCephUserName(volID) -} - -func getCephUserName(volID volumeID) string { - return cephUserPrefix + string(volID) -} - -func deleteCephUserDeprecated(ctx context.Context, volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) error { - adminID, userID := genUserIDs(adminCr, volID) - - // TODO: Need to return success if userID is not found - return execCommandErr(ctx, "ceph", - "-m", volOptions.Monitors, - "-n", adminID, - "--keyfile="+adminCr.KeyFile, - "-c", util.CephConfigPath, - "auth", "rm", userID, - ) -} diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index c1051ed32..fa8502486 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -33,17 +33,11 @@ import ( // controller server spec. type ControllerServer struct { *csicommon.DefaultControllerServer - MetadataStore util.CachePersister // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID/volume name) return an Aborted error VolumeLocks *util.VolumeLocks } -type controllerCacheEntry struct { - VolOptions volumeOptions - VolumeID volumeID -} - // createBackingVolume creates the backing subvolume and on any error cleans up any created entities func (cs *ControllerServer) createBackingVolume(ctx context.Context, volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error { cr, err := util.NewAdminCredentials(secret) @@ -156,72 +150,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return &csi.CreateVolumeResponse{Volume: volume}, nil } -// deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin, -// that have state information stored in files or kubernetes config maps -func (cs *ControllerServer) deleteVolumeDeprecated(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { - var ( - volID = volumeID(req.GetVolumeId()) - secrets = req.GetSecrets() - ) - - ce := &controllerCacheEntry{} - if err := cs.MetadataStore.Get(string(volID), ce); err != nil { - if err, ok := err.(*util.CacheEntryNotFound); ok { - klog.Warningf(util.Log(ctx, "cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)"), volID, err) - return &csi.DeleteVolumeResponse{}, nil - } - - return nil, status.Error(codes.Internal, err.Error()) - } - - if !ce.VolOptions.ProvisionVolume { - // DeleteVolume() is forbidden for statically provisioned volumes! - - klog.Warningf(util.Log(ctx, "volume %s is provisioned statically, aborting delete"), volID) - return &csi.DeleteVolumeResponse{}, nil - } - - // mons may have changed since create volume, - // retrieve the latest mons and override old mons - if mon, secretsErr := util.GetMonValFromSecret(secrets); secretsErr == nil && len(mon) > 0 { - util.ExtendedLog(ctx, "overriding monitors [%q] with [%q] for volume %s", ce.VolOptions.Monitors, mon, volID) - ce.VolOptions.Monitors = mon - } - - // Deleting a volume requires admin credentials - - cr, err := util.NewAdminCredentials(secrets) - if err != nil { - klog.Errorf(util.Log(ctx, "failed to retrieve admin credentials: %v"), err) - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - defer cr.DeleteCredentials() - - if acquired := cs.VolumeLocks.TryAcquire(string(volID)); !acquired { - klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, string(volID)) - } - defer cs.VolumeLocks.Release(string(volID)) - - if err = purgeVolumeDeprecated(ctx, volID, cr, &ce.VolOptions); err != nil { - klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err) - return nil, status.Error(codes.Internal, err.Error()) - } - - if err = deleteCephUserDeprecated(ctx, &ce.VolOptions, cr, volID); err != nil { - klog.Errorf(util.Log(ctx, "failed to delete ceph user for volume %s: %v"), volID, err) - return nil, status.Error(codes.Internal, err.Error()) - } - - if err = cs.MetadataStore.Delete(string(volID)); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - util.DebugLog(ctx, "cephfs: successfully deleted volume %s", volID) - - return &csi.DeleteVolumeResponse{}, nil -} - // DeleteVolume deletes the volume in backend and its reservation func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { if err := cs.validateDeleteVolumeRequest(); err != nil { @@ -257,12 +185,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return &csi.DeleteVolumeResponse{}, nil } - // ErrInvalidVolID may mean this is an 1.0.0 version volume - var eivi ErrInvalidVolID - if errors.As(err, &eivi) && cs.MetadataStore != nil { - return cs.deleteVolumeDeprecated(ctx, req) - } - // All errors other than ErrVolumeNotFound should return an error back to the caller var evnf ErrVolumeNotFound if !errors.As(err, &evnf) { diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index d4f9476a7..df14c216d 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -37,9 +37,6 @@ const ( radosNamespace = "csi" ) -// PluginFolder defines the location of ceph plugin -var PluginFolder = "" - // Driver contains the default identity,node and controller struct type Driver struct { cd *csicommon.CSIDriver @@ -72,10 +69,9 @@ func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer { } // NewControllerServer initialize a controller server for ceph CSI driver -func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer { +func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { return &ControllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(d), - MetadataStore: cachePersister, VolumeLocks: util.NewVolumeLocks(), } } @@ -90,13 +86,11 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) // Run start a non-blocking grpc controller,node and identityserver for // ceph CSI driver which can serve multiple parallel requests -func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { +func (fs *Driver) Run(conf *util.Config) { var err error var topology map[string]string // Configuration - PluginFolder = conf.PluginPath - if err = loadAvailableMounters(conf); err != nil { klog.Fatalf("cephfs: failed to load ceph mounters: %v", err) } @@ -142,7 +136,7 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { } if conf.IsControllerServer { - fs.cs = NewControllerServer(fs.cd, cachePersister) + fs.cs = NewControllerServer(fs.cd) } if !conf.IsControllerServer && !conf.IsNodeServer { topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) @@ -150,7 +144,7 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { klog.Fatalln(err) } fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology) - fs.cs = NewControllerServer(fs.cd, cachePersister) + fs.cs = NewControllerServer(fs.cd) } server := csicommon.NewNonBlockingGRPCServer() diff --git a/internal/cephfs/util.go b/internal/cephfs/util.go index b7482ef27..8ae92ae61 100644 --- a/internal/cephfs/util.go +++ b/internal/cephfs/util.go @@ -21,7 +21,6 @@ import ( "context" "encoding/json" "fmt" - "os" "os/exec" "github.com/ceph/ceph-csi/internal/util" @@ -77,11 +76,6 @@ func execCommandJSON(ctx context.Context, v interface{}, program string, args .. return nil } -func pathExists(p string) bool { - _, err := os.Stat(p) - return err == nil -} - // Controller service request validation func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { diff --git a/internal/cephfs/volume.go b/internal/cephfs/volume.go index 9c89e1075..2d36476b8 100644 --- a/internal/cephfs/volume.go +++ b/internal/cephfs/volume.go @@ -19,7 +19,6 @@ package cephfs import ( "context" "fmt" - "os" "path" "strconv" "strings" @@ -40,18 +39,14 @@ var ( inValidCommmand = "no valid command found" ) -func getCephRootVolumePathLocalDeprecated(volID volumeID) string { - return path.Join(getCephRootPathLocalDeprecated(volID), "csi-volumes", string(volID)) -} +const ( + cephEntityClientPrefix = "client." +) func getVolumeRootPathCephDeprecated(volID volumeID) string { return path.Join("/", "csi-volumes", string(volID)) } -func getCephRootPathLocalDeprecated(volID volumeID) string { - return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID)) -} - func getVolumeNotFoundErrorString(volID volumeID) string { return fmt.Sprintf("Error ENOENT: Subvolume '%s' not found", string(volID)) } @@ -201,70 +196,6 @@ func resizeVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede return createVolume(ctx, volOptions, cr, volID, bytesQuota) } -func mountCephRoot(ctx context.Context, volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error { - cephRoot := getCephRootPathLocalDeprecated(volID) - - // Root path is not set for dynamically provisioned volumes - // Access to cephfs's / is required - volOptions.RootPath = "/" - - if err := util.CreateMountPoint(cephRoot); err != nil { - return err - } - - m, err := newMounter(volOptions) - if err != nil { - return fmt.Errorf("failed to create mounter: %v", err) - } - - if err = m.mount(ctx, cephRoot, adminCr, volOptions); err != nil { - return fmt.Errorf("error mounting ceph root: %v", err) - } - - return nil -} - -func unmountCephRoot(ctx context.Context, volID volumeID) { - cephRoot := getCephRootPathLocalDeprecated(volID) - - if err := unmountVolume(ctx, cephRoot); err != nil { - klog.Errorf(util.Log(ctx, "failed to unmount %s with error %s"), cephRoot, err) - } else { - if err := os.Remove(cephRoot); err != nil { - klog.Errorf(util.Log(ctx, "failed to remove %s with error %s"), cephRoot, err) - } - } -} - -func purgeVolumeDeprecated(ctx context.Context, volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error { - if err := mountCephRoot(ctx, volID, volOptions, adminCr); err != nil { - return err - } - defer unmountCephRoot(ctx, volID) - - var ( - volRoot = getCephRootVolumePathLocalDeprecated(volID) - volRootDeleting = volRoot + "-deleting" - ) - - if pathExists(volRoot) { - if err := os.Rename(volRoot, volRootDeleting); err != nil { - return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err) - } - } else { - if !pathExists(volRootDeleting) { - util.DebugLog(ctx, "cephfs: volume %s not found, assuming it to be already deleted", volID) - return nil - } - } - - if err := os.RemoveAll(volRootDeleting); err != nil { - return fmt.Errorf("failed to delete volume %s: %v", volID, err) - } - - return nil -} - func purgeVolume(ctx context.Context, volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error { err := execCommandErr( ctx, diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 85b8af0f8..05f028b46 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -40,7 +40,6 @@ const ( // controller server spec. type ControllerServer struct { *csicommon.DefaultControllerServer - MetadataStore util.CachePersister // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID/volume name) return an Aborted error VolumeLocks *util.VolumeLocks @@ -100,7 +99,7 @@ func (cs *ControllerServer) parseVolCreateRequest(ctx context.Context, req *csi. } // if it's NOT SINGLE_NODE_WRITER and it's BLOCK we'll set the parameter to ignore the in-use checks - rbdVol, err := genVolFromVolumeOptions(ctx, req.GetParameters(), req.GetSecrets(), (isMultiNode && isBlock), false) + rbdVol, err := genVolFromVolumeOptions(ctx, req.GetParameters(), req.GetSecrets(), (isMultiNode && isBlock)) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -461,60 +460,6 @@ func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *u return nil, nil, status.Errorf(codes.InvalidArgument, "not a proper volume source") } -// DeleteLegacyVolume deletes a volume provisioned using version 1.0.0 of the plugin -func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.DeleteVolumeRequest, cr *util.Credentials) (*csi.DeleteVolumeResponse, error) { - volumeID := req.GetVolumeId() - - if cs.MetadataStore == nil { - return nil, status.Errorf(codes.InvalidArgument, "missing metadata store configuration to"+ - " proceed with deleting legacy volume ID (%s)", volumeID) - } - - if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired { - klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volumeID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) - } - defer cs.VolumeLocks.Release(volumeID) - - rbdVol := &rbdVolume{} - if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil { - if err, ok := err.(*util.CacheEntryNotFound); ok { - klog.Warningf(util.Log(ctx, "metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)"), volumeID, err) - return &csi.DeleteVolumeResponse{}, nil - } - - return nil, status.Error(codes.Internal, err.Error()) - } - - // Fill up Monitors - if err := updateMons(rbdVol, nil, req.GetSecrets()); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - defer rbdVol.Destroy() - - err := rbdVol.Connect(cr) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - // Update rbdImageName as the VolName when dealing with version 1 volumes - rbdVol.RbdImageName = rbdVol.VolName - - util.DebugLog(ctx, "deleting legacy volume %s", rbdVol.VolName) - if err := deleteImage(ctx, rbdVol, cr); err != nil { - // TODO: can we detect "already deleted" situations here and proceed? - klog.Errorf(util.Log(ctx, "failed to delete legacy rbd image: %s/%s with error: %v"), rbdVol.Pool, rbdVol.VolName, err) - return nil, status.Error(codes.Internal, err.Error()) - } - - if err := cs.MetadataStore.Delete(volumeID); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - return &csi.DeleteVolumeResponse{}, nil -} - // DeleteVolume deletes the volume in backend and removes the volume metadata // from store // TODO: make this function less complex @@ -554,19 +499,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return &csi.DeleteVolumeResponse{}, nil } - // If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt - // to process it as such - var eivi ErrInvalidVolID - if errors.As(err, &eivi) { - if isLegacyVolumeID(volumeID) { - util.UsefulLog(ctx, "attempting deletion of potential legacy volume (%s)", volumeID) - return cs.DeleteLegacyVolume(ctx, req, cr) - } - - // Consider unknown volumeID as a successfully deleted volume - return &csi.DeleteVolumeResponse{}, nil - } - // if error is ErrKeyNotFound, then a previous attempt at deletion was complete // or partially complete (image and imageOMap are garbage collected already), hence return // success as deletion is complete diff --git a/internal/rbd/driver.go b/internal/rbd/driver.go index 65ad8ec55..5efef6c97 100644 --- a/internal/rbd/driver.go +++ b/internal/rbd/driver.go @@ -75,10 +75,9 @@ func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer { } // NewControllerServer initialize a controller server for rbd CSI driver -func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer { +func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { return &ControllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(d), - MetadataStore: cachePersister, VolumeLocks: util.NewVolumeLocks(), SnapshotLocks: util.NewVolumeLocks(), } @@ -96,7 +95,7 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) // Run start a non-blocking grpc controller,node and identityserver for // rbd CSI driver which can serve multiple parallel requests -func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { +func (r *Driver) Run(conf *util.Config) { var err error var topology map[string]string @@ -155,7 +154,7 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { } if conf.IsControllerServer { - r.cs = NewControllerServer(r.cd, cachePersister) + r.cs = NewControllerServer(r.cd) } if !conf.IsControllerServer && !conf.IsNodeServer { topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) @@ -166,7 +165,7 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { if err != nil { klog.Fatalf("failed to start node server, err %v\n", err) } - r.cs = NewControllerServer(r.cd, cachePersister) + r.cs = NewControllerServer(r.cd) } s := csicommon.NewNonBlockingGRPCServer() diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 081d170eb..ebc81d9ea 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -22,7 +22,6 @@ import ( "fmt" "os" "strconv" - "strings" csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/journal" @@ -155,23 +154,16 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return &csi.NodeStageVolumeResponse{}, nil } - isLegacyVolume := isLegacyVolumeID(volID) - volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks, isLegacyVolume) + volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } // get rbd image name from the volume journal // for static volumes, the image name is actually the volume ID itself - // for legacy volumes (v1.0.0), the image name can be found in the staging path switch { case staticVol: volOptions.RbdImageName = volID - case isLegacyVolume: - volOptions.RbdImageName, err = getLegacyVolumeName(stagingTargetPath) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } default: var vi util.CSIIdentifier var imageAttributes *journal.ImageAttributes @@ -421,30 +413,6 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis return &csi.NodePublishVolumeResponse{}, nil } -func getLegacyVolumeName(mountPath string) (string, error) { - var volName string - - if strings.HasSuffix(mountPath, "/globalmount") { - s := strings.Split(strings.TrimSuffix(mountPath, "/globalmount"), "/") - volName = s[len(s)-1] - return volName, nil - } - - if strings.HasSuffix(mountPath, "/mount") { - s := strings.Split(strings.TrimSuffix(mountPath, "/mount"), "/") - volName = s[len(s)-1] - return volName, nil - } - - // get volume name for block volume - s := strings.Split(mountPath, "/") - if len(s) == 0 { - return "", fmt.Errorf("rbd: malformed value of stage target path: %s", mountPath) - } - volName = s[len(s)-1] - return volName, nil -} - func (ns *NodeServer) mountVolumeToStagePath(ctx context.Context, req *csi.NodeStageVolumeRequest, staticVol bool, stagingPath, devicePath string) (bool, error) { readOnly := false fsType := req.GetVolumeCapability().GetMount().GetFsType() diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index 0bc8aac4a..e0ef185b0 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -22,25 +22,6 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" ) -func TestGetLegacyVolumeName(t *testing.T) { - tests := []struct { - mountPath string - volName string - }{ - {"csi/vol/56a0cc34-a5c9-44ab-ad33-ed53dd2bd5ea/globalmount", "56a0cc34-a5c9-44ab-ad33-ed53dd2bd5ea"}, - {"csi/vol/9fdb7491-3469-4414-8fe2-ea96be6f7f72/mount", "9fdb7491-3469-4414-8fe2-ea96be6f7f72"}, - {"csi/vol/82cd91c4-4582-47b3-bb08-a84f8c5716d6", "82cd91c4-4582-47b3-bb08-a84f8c5716d6"}, - } - - for _, test := range tests { - if got, err := getLegacyVolumeName(test.mountPath); err != nil { - t.Errorf("getLegacyVolumeName(%s) returned error when it shouldn't: %s", test.mountPath, err.Error()) - } else if got != test.volName { - t.Errorf("getLegacyVolumeName(%s) = %s, want %s", test.mountPath, got, test.volName) - } - } -} - func TestGetStagingPath(t *testing.T) { var stagingPath string // test with nodestagevolumerequest diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index de71a51ae..4c8892ad3 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -36,7 +36,6 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" - "github.com/pborman/uuid" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/cloud-provider/volume/helpers" klog "k8s.io/klog/v2" @@ -698,66 +697,7 @@ func getMonsAndClusterID(ctx context.Context, options map[string]string) (monito return } -// isLegacyVolumeID checks if passed in volume ID string conforms to volume ID naming scheme used -// by the version 1.0.0 (legacy) of the plugin, and returns true if found to be conforming -func isLegacyVolumeID(volumeID string) bool { - // Version 1.0.0 volumeID format: "csi-rbd-vol-" + UUID string - // length: 12 ("csi-rbd-vol-") + 36 (UUID string) - - // length check - if len(volumeID) != 48 { - return false - } - - // Header check - if !strings.HasPrefix(volumeID, "csi-rbd-vol-") { - return false - } - - // Trailer UUID format check - if uuid.Parse(volumeID[12:]) == nil { - return false - } - - return true -} - -// upadateMons function is used to update the rbdVolume.Monitors for volumes that were provisioned -// using the 1.0.0 version (legacy) of the plugin. -func updateMons(rbdVol *rbdVolume, options, credentials map[string]string) error { - var ok bool - - // read monitors and MonValueFromSecret from options, else check passed in rbdVolume for - // MonValueFromSecret key in credentials - monInSecret := "" - if options != nil { - if rbdVol.Monitors, ok = options["monitors"]; !ok { - rbdVol.Monitors = "" - } - if monInSecret, ok = options["monValueFromSecret"]; !ok { - monInSecret = "" - } - } else { - monInSecret = rbdVol.MonValueFromSecret - } - - // if monitors are present in secrets and we have the credentials, use monitors from the - // credentials overriding monitors from other sources - if monInSecret != "" && credentials != nil { - monsFromSecret, ok := credentials[monInSecret] - if ok { - rbdVol.Monitors = monsFromSecret - } - } - - if rbdVol.Monitors == "" { - return errors.New("either monitors or monValueFromSecret must be set") - } - - return nil -} - -func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[string]string, disableInUseChecks, isLegacyVolume bool) (*rbdVolume, error) { +func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[string]string, disableInUseChecks bool) (*rbdVolume, error) { var ( ok bool err error @@ -776,16 +716,9 @@ func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[st rbdVol.NamePrefix = namePrefix } - if isLegacyVolume { - err = updateMons(rbdVol, volOptions, credentials) - if err != nil { - return nil, err - } - } else { - rbdVol.Monitors, rbdVol.ClusterID, err = getMonsAndClusterID(ctx, volOptions) - if err != nil { - return nil, err - } + rbdVol.Monitors, rbdVol.ClusterID, err = getMonsAndClusterID(ctx, volOptions) + if err != nil { + return nil, err } // if no image features is provided, it results in empty string diff --git a/internal/rbd/rbd_util_test.go b/internal/rbd/rbd_util_test.go index 8b458d617..a6637a9c5 100644 --- a/internal/rbd/rbd_util_test.go +++ b/internal/rbd/rbd_util_test.go @@ -23,24 +23,6 @@ import ( librbd "github.com/ceph/go-ceph/rbd" ) -func TestIsLegacyVolumeID(t *testing.T) { - tests := []struct { - volID string - isLegacy bool - }{ - {"prefix-bda37d42-9979-420f-9389-74362f3f98f6", false}, - {"csi-rbd-vo-f997e783-ff00-48b0-8cc7-30cb36c3df3d", false}, - {"csi-rbd-vol-this-is-clearly-not-a-valid-UUID----", false}, - {"csi-rbd-vol-b82f27de-3b3a-43f2-b5e7-9f8d0aad04e9", true}, - } - - for _, test := range tests { - if got := isLegacyVolumeID(test.volID); got != test.isLegacy { - t.Errorf("isLegacyVolumeID(%s) = %t, want %t", test.volID, got, test.isLegacy) - } - } -} - func TestHasSnapshotFeature(t *testing.T) { tests := []struct { features string diff --git a/internal/util/cachepersister.go b/internal/util/cachepersister.go deleted file mode 100644 index 6a27a7c8a..000000000 --- a/internal/util/cachepersister.go +++ /dev/null @@ -1,56 +0,0 @@ -/* -Copyright 2018 The Ceph-CSI Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "errors" -) - -// ForAllFunc is a unary predicate for visiting all cache entries -// matching the `pattern' in CachePersister's ForAll function. -type ForAllFunc func(identifier string) error - -// CacheEntryNotFound is an error type for "Not Found" cache errors -type CacheEntryNotFound struct { - error -} - -// CachePersister interface implemented for store -type CachePersister interface { - Create(identifier string, data interface{}) error - Get(identifier string, data interface{}) error - ForAll(pattern string, destObj interface{}, f ForAllFunc) error - Delete(identifier string) error -} - -// NewCachePersister returns CachePersister based on store -func NewCachePersister(metadataStore, pluginPath string) (CachePersister, error) { - if metadataStore == "k8s_configmap" { - DebugLogMsg("cache-perister: using kubernetes configmap as metadata cache persister") - k8scm := &K8sCMCache{} - k8scm.Client = NewK8sClient() - k8scm.Namespace = GetK8sNamespace() - return k8scm, nil - } else if metadataStore == "node" { - DebugLogMsg("cache-persister: using node as metadata cache persister") - nc := &NodeCache{} - nc.BasePath = pluginPath - nc.CacheDir = "controller" - return nc, nil - } - return nil, errors.New("cache-persister: couldn't parse metadatastorage flag") -} diff --git a/internal/util/k8s.go b/internal/util/k8s.go new file mode 100644 index 000000000..34b349978 --- /dev/null +++ b/internal/util/k8s.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 The CephCSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "os" + + k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + klog "k8s.io/klog/v2" +) + +// NewK8sClient create kubernetes client +func NewK8sClient() *k8s.Clientset { + var cfg *rest.Config + var err error + cPath := os.Getenv("KUBERNETES_CONFIG_PATH") + if cPath != "" { + cfg, err = clientcmd.BuildConfigFromFlags("", cPath) + if err != nil { + klog.Errorf("Failed to get cluster config with error: %v\n", err) + os.Exit(1) + } + } else { + cfg, err = rest.InClusterConfig() + if err != nil { + klog.Errorf("Failed to get cluster config with error: %v\n", err) + os.Exit(1) + } + } + client, err := k8s.NewForConfig(cfg) + if err != nil { + klog.Errorf("Failed to create client with error: %v\n", err) + os.Exit(1) + } + return client +} diff --git a/internal/util/k8scmcache.go b/internal/util/k8scmcache.go deleted file mode 100644 index 6c2a03b8e..000000000 --- a/internal/util/k8scmcache.go +++ /dev/null @@ -1,187 +0,0 @@ -/* -Copyright 2018 The Ceph-CSI Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "context" - "encoding/json" - "fmt" - "os" - "regexp" - - v1 "k8s.io/api/core/v1" - apierrs "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8s "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - klog "k8s.io/klog/v2" -) - -// K8sCMCache to store metadata -type K8sCMCache struct { - Client *k8s.Clientset - Namespace string -} - -const ( - defaultNamespace = "default" - - cmLabel = "csi-metadata" - cmDataKey = "content" - - csiMetadataLabelAttr = "com.ceph.ceph-csi/metadata" -) - -// GetK8sNamespace returns pod namespace. if pod namespace is empty -// it returns default namespace -func GetK8sNamespace() string { - namespace := os.Getenv("POD_NAMESPACE") - if namespace == "" { - return defaultNamespace - } - return namespace -} - -// NewK8sClient create kubernetes client -func NewK8sClient() *k8s.Clientset { - var cfg *rest.Config - var err error - cPath := os.Getenv("KUBERNETES_CONFIG_PATH") - if cPath != "" { - cfg, err = clientcmd.BuildConfigFromFlags("", cPath) - if err != nil { - klog.Errorf("Failed to get cluster config with error: %v\n", err) - os.Exit(1) - } - } else { - cfg, err = rest.InClusterConfig() - if err != nil { - klog.Errorf("Failed to get cluster config with error: %v\n", err) - os.Exit(1) - } - } - client, err := k8s.NewForConfig(cfg) - if err != nil { - klog.Errorf("Failed to create client with error: %v\n", err) - os.Exit(1) - } - return client -} - -func (k8scm *K8sCMCache) getMetadataCM(resourceID string) (*v1.ConfigMap, error) { - cm, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Get(context.TODO(), resourceID, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return cm, nil -} - -// ForAll list the metadata in configmaps and filters outs based on the pattern -func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error { - listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", csiMetadataLabelAttr, cmLabel)} - cms, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).List(context.TODO(), listOpts) - if err != nil { - return fmt.Errorf("k8s-cm-cache: failed to list metadata configmaps: %w", err) - } - - for i := range cms.Items { - data := cms.Items[i].Data[cmDataKey] - match, err := regexp.MatchString(pattern, cms.Items[i].ObjectMeta.Name) - if err != nil { - continue - } - if !match { - continue - } - if err = json.Unmarshal([]byte(data), destObj); err != nil { - return fmt.Errorf("k8s-cm-cache: JSON unmarshaling failed for configmap %s: %w", cms.Items[i].ObjectMeta.Name, err) - } - if err = f(cms.Items[i].ObjectMeta.Name); err != nil { - return err - } - } - return nil -} - -// Create stores the metadata in configmaps with identifier name -func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error { - cm, err := k8scm.getMetadataCM(identifier) - if cm != nil && err == nil { - DebugLogMsg("k8s-cm-cache: configmap %s already exists, skipping configmap creation", identifier) - return nil - } - dataJSON, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("k8s-cm-cache: JSON marshaling failed for configmap %s: %w", identifier, err) - } - cm = &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: identifier, - Namespace: k8scm.Namespace, - Labels: map[string]string{ - csiMetadataLabelAttr: cmLabel, - }, - }, - Data: map[string]string{}, - } - cm.Data[cmDataKey] = string(dataJSON) - - _, err = k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) - if err != nil { - if apierrs.IsAlreadyExists(err) { - DebugLogMsg("k8s-cm-cache: configmap %s already exists", identifier) - return nil - } - return fmt.Errorf("k8s-cm-cache: couldn't persist %s metadata as configmap: %w", identifier, err) - } - - DebugLogMsg("k8s-cm-cache: configmap %s successfully created", identifier) - return nil -} - -// Get retrieves the metadata in configmaps with identifier name -func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error { - cm, err := k8scm.getMetadataCM(identifier) - if err != nil { - if apierrs.IsNotFound(err) { - return &CacheEntryNotFound{err} - } - - return err - } - err = json.Unmarshal([]byte(cm.Data[cmDataKey]), data) - if err != nil { - return fmt.Errorf("k8s-cm-cache: JSON unmarshaling failed for configmap %s: %w", identifier, err) - } - return nil -} - -// Delete deletes the metadata in configmaps with identifier name -func (k8scm *K8sCMCache) Delete(identifier string) error { - err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Delete(context.TODO(), identifier, metav1.DeleteOptions{}) - if err != nil { - if apierrs.IsNotFound(err) { - DebugLogMsg("k8s-cm-cache: cannot delete missing metadata configmap %s, assuming it's already deleted", identifier) - return nil - } - - return fmt.Errorf("k8s-cm-cache: couldn't delete metadata configmap %s: %w", identifier, err) - } - DebugLogMsg("k8s-cm-cache: successfully deleted metadata configmap %s", identifier) - return nil -} diff --git a/internal/util/nodecache.go b/internal/util/nodecache.go deleted file mode 100644 index 3c5c588c2..000000000 --- a/internal/util/nodecache.go +++ /dev/null @@ -1,165 +0,0 @@ -/* -Copyright 2018 The Ceph-CSI Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "os" - "path" - "path/filepath" - "regexp" - "strings" - - klog "k8s.io/klog/v2" -) - -// NodeCache to store metadata -type NodeCache struct { - BasePath string - CacheDir string -} - -var errDec = errors.New("file not found") - -// EnsureCacheDirectory creates cache directory if not present -func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error { - fullPath := path.Join(nc.BasePath, cacheDir) - if _, err := os.Stat(fullPath); os.IsNotExist(err) { - // #nosec - if err := os.Mkdir(fullPath, 0755); err != nil { - return fmt.Errorf("node-cache: failed to create %s folder: %w", fullPath, err) - } - } - return nil -} - -// ForAll list the metadata in Nodecache and filters outs based on the pattern -func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error { - err := nc.EnsureCacheDirectory(nc.CacheDir) - if err != nil { - return fmt.Errorf("node-cache: couldn't ensure cache directory exists: %w", err) - } - files, err := ioutil.ReadDir(path.Join(nc.BasePath, nc.CacheDir)) - if err != nil { - return fmt.Errorf("node-cache: failed to read %s folder: %w", nc.BasePath, err) - } - cachePath := path.Join(nc.BasePath, nc.CacheDir) - for _, file := range files { - err = decodeObj(cachePath, pattern, file, destObj) - if errors.Is(err, errDec) { - continue - } else if err == nil { - if err = f(strings.TrimSuffix(file.Name(), filepath.Ext(file.Name()))); err != nil { - return err - } - } - return err - } - return nil -} - -func decodeObj(fpath, pattern string, file os.FileInfo, destObj interface{}) error { - match, err := regexp.MatchString(pattern, file.Name()) - if err != nil || !match { - return errDec - } - if !strings.HasSuffix(file.Name(), ".json") { - return errDec - } - // #nosec - fp, err := os.Open(path.Join(fpath, file.Name())) - if err != nil { - DebugLogMsg("node-cache: open file: %s err %v", file.Name(), err) - return errDec - } - decoder := json.NewDecoder(fp) - if err = decoder.Decode(destObj); err != nil { - if err = fp.Close(); err != nil { - return fmt.Errorf("failed to close file %s: %w", file.Name(), err) - } - return fmt.Errorf("node-cache: couldn't decode file %s: %w", file.Name(), err) - } - return nil -} - -// Create creates the metadata file in cache directory with identifier name -func (nc *NodeCache) Create(identifier string, data interface{}) error { - file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") - fp, err := os.Create(file) - if err != nil { - return fmt.Errorf("node-cache: failed to create metadata storage file %s: %w", file, err) - } - - defer func() { - if err = fp.Close(); err != nil { - klog.Warningf("failed to close file:%s %v", fp.Name(), err) - } - }() - - encoder := json.NewEncoder(fp) - if err = encoder.Encode(data); err != nil { - return fmt.Errorf("node-cache: failed to encode metadata for file: %s: %w", file, err) - } - DebugLogMsg("node-cache: successfully saved metadata into file: %s\n", file) - return nil -} - -// Get retrieves the metadata from cache directory with identifier name -func (nc *NodeCache) Get(identifier string, data interface{}) error { - file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") - // #nosec - fp, err := os.Open(file) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return &CacheEntryNotFound{err} - } - - return fmt.Errorf("node-cache: open error for %s: %w", file, err) - } - - defer func() { - if err = fp.Close(); err != nil { - klog.Warningf("failed to close file:%s %v", fp.Name(), err) - } - }() - - decoder := json.NewDecoder(fp) - if err = decoder.Decode(data); err != nil { - return fmt.Errorf("rbd: decode error: %w", err) - } - - return nil -} - -// Delete deletes the metadata file from cache directory with identifier name -func (nc *NodeCache) Delete(identifier string) error { - file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") - err := os.Remove(file) - if err != nil { - if os.IsNotExist(err) { - DebugLogMsg("node-cache: cannot delete missing metadata storage file %s, assuming it's already deleted", file) - return nil - } - - return fmt.Errorf("node-cache: error removing file %s: %w", file, err) - } - DebugLogMsg("node-cache: successfully deleted metadata storage file at: %+v\n", file) - return nil -} diff --git a/internal/util/util.go b/internal/util/util.go index 26e6e170f..ce913fb51 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -22,7 +22,6 @@ import ( "fmt" "math" "os" - "path" "strconv" "strings" "time" @@ -79,17 +78,13 @@ var ( // Config holds the parameters list which can be configured type Config struct { - Vtype string // driver type [rbd|cephfs|liveness] - Endpoint string // CSI endpoint - DriverName string // name of the driver - NodeID string // node id - InstanceID string // unique ID distinguishing this instance of Ceph CSI - MetadataStorage string // metadata persistence method [node|k8s_configmap] - PluginPath string // location of cephcsi plugin - DomainLabels string // list of domain labels to read from the node - - // cephfs related flags - MountCacheDir string // mount info cache save dir + Vtype string // driver type [rbd|cephfs|liveness] + Endpoint string // CSI endpoint + DriverName string // name of the driver + NodeID string // node id + InstanceID string // unique ID distinguishing this instance of Ceph CSI + PluginPath string // location of cephcsi plugin + DomainLabels string // list of domain labels to read from the node // metrics related flags MetricsPath string // path of prometheus endpoint where metrics will be available @@ -124,27 +119,6 @@ type Config struct { MaxSnapshotsOnImage uint } -// CreatePersistanceStorage creates storage path and initializes new cache -func CreatePersistanceStorage(sPath, metaDataStore, pluginPath string) (CachePersister, error) { - var err error - if err = CreateMountPoint(path.Join(sPath, "controller")); err != nil { - klog.Errorf("failed to create persistent storage for controller: %v", err) - return nil, err - } - - if err = CreateMountPoint(path.Join(sPath, "node")); err != nil { - klog.Errorf("failed to create persistent storage for node: %v", err) - return nil, err - } - - cp, err := NewCachePersister(metaDataStore, pluginPath) - if err != nil { - klog.Errorf("failed to define cache persistence method: %v", err) - return nil, err - } - return cp, err -} - // ValidateDriverName validates the driver name func ValidateDriverName(driverName string) error { if driverName == "" {