diff --git a/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml b/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml index 66f2bd1a0..fcc6581b7 100644 --- a/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml +++ b/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml @@ -82,7 +82,6 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=$(DRIVER_NAME)" - - "--metadatastorage=k8s_configmap" {{- if .Values.topology.enabled }} - "--domainlabels={{ .Values.topology.domainLabels | join "," }}" {{- end }} @@ -97,10 +96,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: "unix:///csi/{{ .Values.pluginSocketFile }}" securityContext: diff --git a/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml b/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml index 685ed2eb1..b9e25e485 100644 --- a/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml +++ b/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml @@ -98,7 +98,6 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=$(DRIVER_NAME)" - - "--metadatastorage=k8s_configmap" env: - name: POD_IP valueFrom: @@ -110,10 +109,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: "unix:///csi/{{ .Values.provisionerSocketFile }}" securityContext: diff --git a/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml b/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml index 89ad83542..eb31759d3 100644 --- a/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml +++ b/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml @@ -90,10 +90,6 @@ spec: fieldPath: status.podIP - name: DRIVER_NAME value: {{ .Values.driverName }} - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: NODE_ID valueFrom: fieldRef: diff --git a/charts/ceph-csi-rbd/templates/provisioner-deployment.yaml b/charts/ceph-csi-rbd/templates/provisioner-deployment.yaml index 290ea8112..55777e785 100644 --- a/charts/ceph-csi-rbd/templates/provisioner-deployment.yaml +++ b/charts/ceph-csi-rbd/templates/provisioner-deployment.yaml @@ -131,10 +131,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: "unix:///csi/{{ .Values.provisionerSocketFile }}" securityContext: diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index a17e78722..c4c73d55c 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -20,7 +20,6 @@ import ( "flag" "fmt" "os" - "path/filepath" "runtime" "time" @@ -54,8 +53,6 @@ func init() { flag.StringVar(&conf.NodeID, "nodeid", "", "node id") flag.StringVar(&conf.InstanceID, "instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+ " instances, when sharing Ceph clusters across CSI instances for provisioning") - flag.StringVar(&conf.MetadataStorage, "metadatastorage", "", "metadata persistence method [node|k8s_configmap]") - flag.StringVar(&conf.PluginPath, "pluginpath", "/var/lib/kubelet/plugins/", "the location of cephcsi plugin") flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups") flag.BoolVar(&conf.IsControllerServer, "controllerserver", false, "start cephcsi controller server") flag.BoolVar(&conf.IsNodeServer, "nodeserver", false, "start cephcsi node server") @@ -63,8 +60,6 @@ func init() { " domain the node belongs to, separated by ','") // cephfs related flags - // marking this as deprecated, remove it in next major release - flag.StringVar(&conf.MountCacheDir, "mountcachedir", "", "mount info cache save dir") flag.BoolVar(&conf.ForceKernelCephFS, "forcecephkernelclient", false, "enable Ceph Kernel clients on kernel < 4.17 which support quotas") // liveness/grpc metrics related flags @@ -123,7 +118,6 @@ func main() { os.Exit(0) } util.DefaultLog("Driver version: %s and Git version: %s", util.DriverVersion, util.GitCommit) - var cp util.CachePersister if conf.Vtype == "" { klog.Fatalln("driver type not specified") @@ -134,14 +128,6 @@ func main() { if err != nil { klog.Fatalln(err) // calls exit } - csipluginPath := filepath.Join(conf.PluginPath, dname) - if conf.MetadataStorage != "" { - cp, err = util.CreatePersistanceStorage( - csipluginPath, conf.MetadataStorage, conf.PluginPath) - if err != nil { - os.Exit(1) - } - } // the driver may need a higher PID limit for handling all concurrent requests if conf.PidLimit != 0 { @@ -183,14 +169,11 @@ func main() { validateCloneDepthFlag(&conf) validateMaxSnaphostFlag(&conf) driver := rbd.NewDriver() - driver.Run(&conf, cp) + driver.Run(&conf) case cephfsType: - if conf.MountCacheDir != "" { - klog.Warning("mountcachedir option is deprecated") - } driver := cephfs.NewDriver() - driver.Run(&conf, cp) + driver.Run(&conf) case livenessType: liveness.Run(&conf) diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml index 32c73bf0b..5586a1851 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml @@ -91,7 +91,6 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=cephfs.csi.ceph.com" - - "--metadatastorage=k8s_configmap" - "--pidlimit=-1" env: - name: POD_IP @@ -102,10 +101,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: unix:///csi/csi-provisioner.sock imagePullPolicy: "IfNotPresent" diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml index adea0ff0c..d62ca9d8b 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml @@ -62,7 +62,6 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=cephfs.csi.ceph.com" - - "--metadatastorage=k8s_configmap" # If topology based provisioning is desired, configure required # node labels representing the nodes topology domain # and pass the label names below, for CSI to consume and advertize @@ -77,10 +76,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: unix:///csi/csi.sock imagePullPolicy: "IfNotPresent" diff --git a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml index fb90ff570..8e33edfa0 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml @@ -74,10 +74,6 @@ spec: env: - name: ADDRESS value: /csi/csi-provisioner.sock - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace imagePullPolicy: "IfNotPresent" volumeMounts: - name: socket-dir diff --git a/deploy/rbd/kubernetes/csi-rbdplugin.yaml b/deploy/rbd/kubernetes/csi-rbdplugin.yaml index 8a4ca5eae..de6736155 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin.yaml @@ -77,10 +77,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: CSI_ENDPOINT value: unix:///csi/csi.sock imagePullPolicy: "IfNotPresent" diff --git a/docs/deploy-cephfs.md b/docs/deploy-cephfs.md index 690ba3562..84c4fffc0 100644 --- a/docs/deploy-cephfs.md +++ b/docs/deploy-cephfs.md @@ -44,14 +44,13 @@ that should be resolved in v14.2.3. **Available command line arguments:** | Option | Default value | Description | -|---------------------------|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| ------------------------- | --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket | | `--drivername` | `cephfs.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) | | `--nodeid` | _empty_ | This node's ID | | `--type` | _empty_ | Driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` | | `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning | | `--pluginpath` | "/var/lib/kubelet/plugins/" | The location of cephcsi plugin on host | -| `--metadatastorage` | _empty_ | Points to where older (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) | | `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. | | `--metricsport` | `8080` | TCP port for liveness metrics requests | | `--metricspath` | `/metrics` | Path of prometheus endpoint where metrics will be available | @@ -72,9 +71,6 @@ CephFS mounter on kernels < 4.17. the path of your k8s config file (if not specified, the plugin will assume you're running it inside a k8s cluster and find the config itself). -`POD_NAMESPACE`: if you use `k8s_configmap` as metadata store, `POD_NAMESPACE` -is used to define in which namespace you want the configmaps to be stored - **Available volume parameters:** | Parameter | Required | Description | @@ -83,8 +79,8 @@ is used to define in which namespace you want the configmaps to be stored | `fsName` | yes | CephFS filesystem name into which the volume shall be created | | `mounter` | no | Mount method to be used for this volume. Available options are `kernel` for Ceph kernel client and `fuse` for Ceph FUSE driver. Defaults to "default mounter". | | `pool` | no | Ceph pool into which volume data shall be stored | -| `volumeNamePrefix` | no | Prefix to use for naming subvolumes (defaults to `csi-vol-`). | -| `snapshotNamePrefix` | no | Prefix to use for naming snapshots (defaults to `csi-snap-`) +| `volumeNamePrefix` | no | Prefix to use for naming subvolumes (defaults to `csi-vol-`). | +| `snapshotNamePrefix` | no | Prefix to use for naming snapshots (defaults to `csi-snap-`) | | `kernelMountOptions` | no | Comma separated string of mount options accepted by cephfs kernel mounter, by default no options are passed. Check man mount.ceph for options. | | `fuseMountOptions` | no | Comma separated string of mount options accepted by ceph-fuse mounter, by default no options are passed. | | `csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | for Kubernetes | Name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value | diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index a56e1134c..ae5b396ea 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -33,7 +33,6 @@ make image-cephcsi | `--nodeid` | _empty_ | This node's ID | | `--type` | _empty_ | Driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` | | `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning | -| `--metadatastorage` | _empty_ | Points to where legacy (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) | | `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. | | `--metricsport` | `8080` | TCP port for liveness metrics requests | | `--metricspath` | `"/metrics"` | Path of prometheus endpoint where metrics will be available | diff --git a/internal/cephfs/cephuser.go b/internal/cephfs/cephuser.go deleted file mode 100644 index 1ac674397..000000000 --- a/internal/cephfs/cephuser.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -Copyright 2018 The Ceph-CSI Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cephfs - -import ( - "context" - - "github.com/ceph/ceph-csi/internal/util" -) - -const ( - cephUserPrefix = "user-" - cephEntityClientPrefix = "client." -) - -func genUserIDs(adminCr *util.Credentials, volID volumeID) (adminID, userID string) { - return cephEntityClientPrefix + adminCr.ID, cephEntityClientPrefix + getCephUserName(volID) -} - -func getCephUserName(volID volumeID) string { - return cephUserPrefix + string(volID) -} - -func deleteCephUserDeprecated(ctx context.Context, volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) error { - adminID, userID := genUserIDs(adminCr, volID) - - // TODO: Need to return success if userID is not found - return execCommandErr(ctx, "ceph", - "-m", volOptions.Monitors, - "-n", adminID, - "--keyfile="+adminCr.KeyFile, - "-c", util.CephConfigPath, - "auth", "rm", userID, - ) -} diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index c1051ed32..fa8502486 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -33,17 +33,11 @@ import ( // controller server spec. type ControllerServer struct { *csicommon.DefaultControllerServer - MetadataStore util.CachePersister // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID/volume name) return an Aborted error VolumeLocks *util.VolumeLocks } -type controllerCacheEntry struct { - VolOptions volumeOptions - VolumeID volumeID -} - // createBackingVolume creates the backing subvolume and on any error cleans up any created entities func (cs *ControllerServer) createBackingVolume(ctx context.Context, volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error { cr, err := util.NewAdminCredentials(secret) @@ -156,72 +150,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return &csi.CreateVolumeResponse{Volume: volume}, nil } -// deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin, -// that have state information stored in files or kubernetes config maps -func (cs *ControllerServer) deleteVolumeDeprecated(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { - var ( - volID = volumeID(req.GetVolumeId()) - secrets = req.GetSecrets() - ) - - ce := &controllerCacheEntry{} - if err := cs.MetadataStore.Get(string(volID), ce); err != nil { - if err, ok := err.(*util.CacheEntryNotFound); ok { - klog.Warningf(util.Log(ctx, "cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)"), volID, err) - return &csi.DeleteVolumeResponse{}, nil - } - - return nil, status.Error(codes.Internal, err.Error()) - } - - if !ce.VolOptions.ProvisionVolume { - // DeleteVolume() is forbidden for statically provisioned volumes! - - klog.Warningf(util.Log(ctx, "volume %s is provisioned statically, aborting delete"), volID) - return &csi.DeleteVolumeResponse{}, nil - } - - // mons may have changed since create volume, - // retrieve the latest mons and override old mons - if mon, secretsErr := util.GetMonValFromSecret(secrets); secretsErr == nil && len(mon) > 0 { - util.ExtendedLog(ctx, "overriding monitors [%q] with [%q] for volume %s", ce.VolOptions.Monitors, mon, volID) - ce.VolOptions.Monitors = mon - } - - // Deleting a volume requires admin credentials - - cr, err := util.NewAdminCredentials(secrets) - if err != nil { - klog.Errorf(util.Log(ctx, "failed to retrieve admin credentials: %v"), err) - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - defer cr.DeleteCredentials() - - if acquired := cs.VolumeLocks.TryAcquire(string(volID)); !acquired { - klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, string(volID)) - } - defer cs.VolumeLocks.Release(string(volID)) - - if err = purgeVolumeDeprecated(ctx, volID, cr, &ce.VolOptions); err != nil { - klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err) - return nil, status.Error(codes.Internal, err.Error()) - } - - if err = deleteCephUserDeprecated(ctx, &ce.VolOptions, cr, volID); err != nil { - klog.Errorf(util.Log(ctx, "failed to delete ceph user for volume %s: %v"), volID, err) - return nil, status.Error(codes.Internal, err.Error()) - } - - if err = cs.MetadataStore.Delete(string(volID)); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - util.DebugLog(ctx, "cephfs: successfully deleted volume %s", volID) - - return &csi.DeleteVolumeResponse{}, nil -} - // DeleteVolume deletes the volume in backend and its reservation func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { if err := cs.validateDeleteVolumeRequest(); err != nil { @@ -257,12 +185,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return &csi.DeleteVolumeResponse{}, nil } - // ErrInvalidVolID may mean this is an 1.0.0 version volume - var eivi ErrInvalidVolID - if errors.As(err, &eivi) && cs.MetadataStore != nil { - return cs.deleteVolumeDeprecated(ctx, req) - } - // All errors other than ErrVolumeNotFound should return an error back to the caller var evnf ErrVolumeNotFound if !errors.As(err, &evnf) { diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index d4f9476a7..df14c216d 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -37,9 +37,6 @@ const ( radosNamespace = "csi" ) -// PluginFolder defines the location of ceph plugin -var PluginFolder = "" - // Driver contains the default identity,node and controller struct type Driver struct { cd *csicommon.CSIDriver @@ -72,10 +69,9 @@ func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer { } // NewControllerServer initialize a controller server for ceph CSI driver -func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer { +func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { return &ControllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(d), - MetadataStore: cachePersister, VolumeLocks: util.NewVolumeLocks(), } } @@ -90,13 +86,11 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) // Run start a non-blocking grpc controller,node and identityserver for // ceph CSI driver which can serve multiple parallel requests -func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { +func (fs *Driver) Run(conf *util.Config) { var err error var topology map[string]string // Configuration - PluginFolder = conf.PluginPath - if err = loadAvailableMounters(conf); err != nil { klog.Fatalf("cephfs: failed to load ceph mounters: %v", err) } @@ -142,7 +136,7 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { } if conf.IsControllerServer { - fs.cs = NewControllerServer(fs.cd, cachePersister) + fs.cs = NewControllerServer(fs.cd) } if !conf.IsControllerServer && !conf.IsNodeServer { topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) @@ -150,7 +144,7 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { klog.Fatalln(err) } fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology) - fs.cs = NewControllerServer(fs.cd, cachePersister) + fs.cs = NewControllerServer(fs.cd) } server := csicommon.NewNonBlockingGRPCServer() diff --git a/internal/cephfs/util.go b/internal/cephfs/util.go index b7482ef27..8ae92ae61 100644 --- a/internal/cephfs/util.go +++ b/internal/cephfs/util.go @@ -21,7 +21,6 @@ import ( "context" "encoding/json" "fmt" - "os" "os/exec" "github.com/ceph/ceph-csi/internal/util" @@ -77,11 +76,6 @@ func execCommandJSON(ctx context.Context, v interface{}, program string, args .. return nil } -func pathExists(p string) bool { - _, err := os.Stat(p) - return err == nil -} - // Controller service request validation func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { diff --git a/internal/cephfs/volume.go b/internal/cephfs/volume.go index 9c89e1075..2d36476b8 100644 --- a/internal/cephfs/volume.go +++ b/internal/cephfs/volume.go @@ -19,7 +19,6 @@ package cephfs import ( "context" "fmt" - "os" "path" "strconv" "strings" @@ -40,18 +39,14 @@ var ( inValidCommmand = "no valid command found" ) -func getCephRootVolumePathLocalDeprecated(volID volumeID) string { - return path.Join(getCephRootPathLocalDeprecated(volID), "csi-volumes", string(volID)) -} +const ( + cephEntityClientPrefix = "client." +) func getVolumeRootPathCephDeprecated(volID volumeID) string { return path.Join("/", "csi-volumes", string(volID)) } -func getCephRootPathLocalDeprecated(volID volumeID) string { - return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID)) -} - func getVolumeNotFoundErrorString(volID volumeID) string { return fmt.Sprintf("Error ENOENT: Subvolume '%s' not found", string(volID)) } @@ -201,70 +196,6 @@ func resizeVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede return createVolume(ctx, volOptions, cr, volID, bytesQuota) } -func mountCephRoot(ctx context.Context, volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error { - cephRoot := getCephRootPathLocalDeprecated(volID) - - // Root path is not set for dynamically provisioned volumes - // Access to cephfs's / is required - volOptions.RootPath = "/" - - if err := util.CreateMountPoint(cephRoot); err != nil { - return err - } - - m, err := newMounter(volOptions) - if err != nil { - return fmt.Errorf("failed to create mounter: %v", err) - } - - if err = m.mount(ctx, cephRoot, adminCr, volOptions); err != nil { - return fmt.Errorf("error mounting ceph root: %v", err) - } - - return nil -} - -func unmountCephRoot(ctx context.Context, volID volumeID) { - cephRoot := getCephRootPathLocalDeprecated(volID) - - if err := unmountVolume(ctx, cephRoot); err != nil { - klog.Errorf(util.Log(ctx, "failed to unmount %s with error %s"), cephRoot, err) - } else { - if err := os.Remove(cephRoot); err != nil { - klog.Errorf(util.Log(ctx, "failed to remove %s with error %s"), cephRoot, err) - } - } -} - -func purgeVolumeDeprecated(ctx context.Context, volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error { - if err := mountCephRoot(ctx, volID, volOptions, adminCr); err != nil { - return err - } - defer unmountCephRoot(ctx, volID) - - var ( - volRoot = getCephRootVolumePathLocalDeprecated(volID) - volRootDeleting = volRoot + "-deleting" - ) - - if pathExists(volRoot) { - if err := os.Rename(volRoot, volRootDeleting); err != nil { - return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err) - } - } else { - if !pathExists(volRootDeleting) { - util.DebugLog(ctx, "cephfs: volume %s not found, assuming it to be already deleted", volID) - return nil - } - } - - if err := os.RemoveAll(volRootDeleting); err != nil { - return fmt.Errorf("failed to delete volume %s: %v", volID, err) - } - - return nil -} - func purgeVolume(ctx context.Context, volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error { err := execCommandErr( ctx, diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 85b8af0f8..05f028b46 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -40,7 +40,6 @@ const ( // controller server spec. type ControllerServer struct { *csicommon.DefaultControllerServer - MetadataStore util.CachePersister // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID/volume name) return an Aborted error VolumeLocks *util.VolumeLocks @@ -100,7 +99,7 @@ func (cs *ControllerServer) parseVolCreateRequest(ctx context.Context, req *csi. } // if it's NOT SINGLE_NODE_WRITER and it's BLOCK we'll set the parameter to ignore the in-use checks - rbdVol, err := genVolFromVolumeOptions(ctx, req.GetParameters(), req.GetSecrets(), (isMultiNode && isBlock), false) + rbdVol, err := genVolFromVolumeOptions(ctx, req.GetParameters(), req.GetSecrets(), (isMultiNode && isBlock)) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -461,60 +460,6 @@ func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *u return nil, nil, status.Errorf(codes.InvalidArgument, "not a proper volume source") } -// DeleteLegacyVolume deletes a volume provisioned using version 1.0.0 of the plugin -func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.DeleteVolumeRequest, cr *util.Credentials) (*csi.DeleteVolumeResponse, error) { - volumeID := req.GetVolumeId() - - if cs.MetadataStore == nil { - return nil, status.Errorf(codes.InvalidArgument, "missing metadata store configuration to"+ - " proceed with deleting legacy volume ID (%s)", volumeID) - } - - if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired { - klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volumeID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) - } - defer cs.VolumeLocks.Release(volumeID) - - rbdVol := &rbdVolume{} - if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil { - if err, ok := err.(*util.CacheEntryNotFound); ok { - klog.Warningf(util.Log(ctx, "metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)"), volumeID, err) - return &csi.DeleteVolumeResponse{}, nil - } - - return nil, status.Error(codes.Internal, err.Error()) - } - - // Fill up Monitors - if err := updateMons(rbdVol, nil, req.GetSecrets()); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - defer rbdVol.Destroy() - - err := rbdVol.Connect(cr) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - // Update rbdImageName as the VolName when dealing with version 1 volumes - rbdVol.RbdImageName = rbdVol.VolName - - util.DebugLog(ctx, "deleting legacy volume %s", rbdVol.VolName) - if err := deleteImage(ctx, rbdVol, cr); err != nil { - // TODO: can we detect "already deleted" situations here and proceed? - klog.Errorf(util.Log(ctx, "failed to delete legacy rbd image: %s/%s with error: %v"), rbdVol.Pool, rbdVol.VolName, err) - return nil, status.Error(codes.Internal, err.Error()) - } - - if err := cs.MetadataStore.Delete(volumeID); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - return &csi.DeleteVolumeResponse{}, nil -} - // DeleteVolume deletes the volume in backend and removes the volume metadata // from store // TODO: make this function less complex @@ -554,19 +499,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return &csi.DeleteVolumeResponse{}, nil } - // If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt - // to process it as such - var eivi ErrInvalidVolID - if errors.As(err, &eivi) { - if isLegacyVolumeID(volumeID) { - util.UsefulLog(ctx, "attempting deletion of potential legacy volume (%s)", volumeID) - return cs.DeleteLegacyVolume(ctx, req, cr) - } - - // Consider unknown volumeID as a successfully deleted volume - return &csi.DeleteVolumeResponse{}, nil - } - // if error is ErrKeyNotFound, then a previous attempt at deletion was complete // or partially complete (image and imageOMap are garbage collected already), hence return // success as deletion is complete diff --git a/internal/rbd/driver.go b/internal/rbd/driver.go index 65ad8ec55..5efef6c97 100644 --- a/internal/rbd/driver.go +++ b/internal/rbd/driver.go @@ -75,10 +75,9 @@ func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer { } // NewControllerServer initialize a controller server for rbd CSI driver -func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer { +func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { return &ControllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(d), - MetadataStore: cachePersister, VolumeLocks: util.NewVolumeLocks(), SnapshotLocks: util.NewVolumeLocks(), } @@ -96,7 +95,7 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) // Run start a non-blocking grpc controller,node and identityserver for // rbd CSI driver which can serve multiple parallel requests -func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { +func (r *Driver) Run(conf *util.Config) { var err error var topology map[string]string @@ -155,7 +154,7 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { } if conf.IsControllerServer { - r.cs = NewControllerServer(r.cd, cachePersister) + r.cs = NewControllerServer(r.cd) } if !conf.IsControllerServer && !conf.IsNodeServer { topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) @@ -166,7 +165,7 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { if err != nil { klog.Fatalf("failed to start node server, err %v\n", err) } - r.cs = NewControllerServer(r.cd, cachePersister) + r.cs = NewControllerServer(r.cd) } s := csicommon.NewNonBlockingGRPCServer() diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 081d170eb..ebc81d9ea 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -22,7 +22,6 @@ import ( "fmt" "os" "strconv" - "strings" csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/journal" @@ -155,23 +154,16 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return &csi.NodeStageVolumeResponse{}, nil } - isLegacyVolume := isLegacyVolumeID(volID) - volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks, isLegacyVolume) + volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } // get rbd image name from the volume journal // for static volumes, the image name is actually the volume ID itself - // for legacy volumes (v1.0.0), the image name can be found in the staging path switch { case staticVol: volOptions.RbdImageName = volID - case isLegacyVolume: - volOptions.RbdImageName, err = getLegacyVolumeName(stagingTargetPath) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } default: var vi util.CSIIdentifier var imageAttributes *journal.ImageAttributes @@ -421,30 +413,6 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis return &csi.NodePublishVolumeResponse{}, nil } -func getLegacyVolumeName(mountPath string) (string, error) { - var volName string - - if strings.HasSuffix(mountPath, "/globalmount") { - s := strings.Split(strings.TrimSuffix(mountPath, "/globalmount"), "/") - volName = s[len(s)-1] - return volName, nil - } - - if strings.HasSuffix(mountPath, "/mount") { - s := strings.Split(strings.TrimSuffix(mountPath, "/mount"), "/") - volName = s[len(s)-1] - return volName, nil - } - - // get volume name for block volume - s := strings.Split(mountPath, "/") - if len(s) == 0 { - return "", fmt.Errorf("rbd: malformed value of stage target path: %s", mountPath) - } - volName = s[len(s)-1] - return volName, nil -} - func (ns *NodeServer) mountVolumeToStagePath(ctx context.Context, req *csi.NodeStageVolumeRequest, staticVol bool, stagingPath, devicePath string) (bool, error) { readOnly := false fsType := req.GetVolumeCapability().GetMount().GetFsType() diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index 0bc8aac4a..e0ef185b0 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -22,25 +22,6 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" ) -func TestGetLegacyVolumeName(t *testing.T) { - tests := []struct { - mountPath string - volName string - }{ - {"csi/vol/56a0cc34-a5c9-44ab-ad33-ed53dd2bd5ea/globalmount", "56a0cc34-a5c9-44ab-ad33-ed53dd2bd5ea"}, - {"csi/vol/9fdb7491-3469-4414-8fe2-ea96be6f7f72/mount", "9fdb7491-3469-4414-8fe2-ea96be6f7f72"}, - {"csi/vol/82cd91c4-4582-47b3-bb08-a84f8c5716d6", "82cd91c4-4582-47b3-bb08-a84f8c5716d6"}, - } - - for _, test := range tests { - if got, err := getLegacyVolumeName(test.mountPath); err != nil { - t.Errorf("getLegacyVolumeName(%s) returned error when it shouldn't: %s", test.mountPath, err.Error()) - } else if got != test.volName { - t.Errorf("getLegacyVolumeName(%s) = %s, want %s", test.mountPath, got, test.volName) - } - } -} - func TestGetStagingPath(t *testing.T) { var stagingPath string // test with nodestagevolumerequest diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index de71a51ae..4c8892ad3 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -36,7 +36,6 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" - "github.com/pborman/uuid" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/cloud-provider/volume/helpers" klog "k8s.io/klog/v2" @@ -698,66 +697,7 @@ func getMonsAndClusterID(ctx context.Context, options map[string]string) (monito return } -// isLegacyVolumeID checks if passed in volume ID string conforms to volume ID naming scheme used -// by the version 1.0.0 (legacy) of the plugin, and returns true if found to be conforming -func isLegacyVolumeID(volumeID string) bool { - // Version 1.0.0 volumeID format: "csi-rbd-vol-" + UUID string - // length: 12 ("csi-rbd-vol-") + 36 (UUID string) - - // length check - if len(volumeID) != 48 { - return false - } - - // Header check - if !strings.HasPrefix(volumeID, "csi-rbd-vol-") { - return false - } - - // Trailer UUID format check - if uuid.Parse(volumeID[12:]) == nil { - return false - } - - return true -} - -// upadateMons function is used to update the rbdVolume.Monitors for volumes that were provisioned -// using the 1.0.0 version (legacy) of the plugin. -func updateMons(rbdVol *rbdVolume, options, credentials map[string]string) error { - var ok bool - - // read monitors and MonValueFromSecret from options, else check passed in rbdVolume for - // MonValueFromSecret key in credentials - monInSecret := "" - if options != nil { - if rbdVol.Monitors, ok = options["monitors"]; !ok { - rbdVol.Monitors = "" - } - if monInSecret, ok = options["monValueFromSecret"]; !ok { - monInSecret = "" - } - } else { - monInSecret = rbdVol.MonValueFromSecret - } - - // if monitors are present in secrets and we have the credentials, use monitors from the - // credentials overriding monitors from other sources - if monInSecret != "" && credentials != nil { - monsFromSecret, ok := credentials[monInSecret] - if ok { - rbdVol.Monitors = monsFromSecret - } - } - - if rbdVol.Monitors == "" { - return errors.New("either monitors or monValueFromSecret must be set") - } - - return nil -} - -func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[string]string, disableInUseChecks, isLegacyVolume bool) (*rbdVolume, error) { +func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[string]string, disableInUseChecks bool) (*rbdVolume, error) { var ( ok bool err error @@ -776,16 +716,9 @@ func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[st rbdVol.NamePrefix = namePrefix } - if isLegacyVolume { - err = updateMons(rbdVol, volOptions, credentials) - if err != nil { - return nil, err - } - } else { - rbdVol.Monitors, rbdVol.ClusterID, err = getMonsAndClusterID(ctx, volOptions) - if err != nil { - return nil, err - } + rbdVol.Monitors, rbdVol.ClusterID, err = getMonsAndClusterID(ctx, volOptions) + if err != nil { + return nil, err } // if no image features is provided, it results in empty string diff --git a/internal/rbd/rbd_util_test.go b/internal/rbd/rbd_util_test.go index 8b458d617..a6637a9c5 100644 --- a/internal/rbd/rbd_util_test.go +++ b/internal/rbd/rbd_util_test.go @@ -23,24 +23,6 @@ import ( librbd "github.com/ceph/go-ceph/rbd" ) -func TestIsLegacyVolumeID(t *testing.T) { - tests := []struct { - volID string - isLegacy bool - }{ - {"prefix-bda37d42-9979-420f-9389-74362f3f98f6", false}, - {"csi-rbd-vo-f997e783-ff00-48b0-8cc7-30cb36c3df3d", false}, - {"csi-rbd-vol-this-is-clearly-not-a-valid-UUID----", false}, - {"csi-rbd-vol-b82f27de-3b3a-43f2-b5e7-9f8d0aad04e9", true}, - } - - for _, test := range tests { - if got := isLegacyVolumeID(test.volID); got != test.isLegacy { - t.Errorf("isLegacyVolumeID(%s) = %t, want %t", test.volID, got, test.isLegacy) - } - } -} - func TestHasSnapshotFeature(t *testing.T) { tests := []struct { features string diff --git a/internal/util/cachepersister.go b/internal/util/cachepersister.go deleted file mode 100644 index 6a27a7c8a..000000000 --- a/internal/util/cachepersister.go +++ /dev/null @@ -1,56 +0,0 @@ -/* -Copyright 2018 The Ceph-CSI Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "errors" -) - -// ForAllFunc is a unary predicate for visiting all cache entries -// matching the `pattern' in CachePersister's ForAll function. -type ForAllFunc func(identifier string) error - -// CacheEntryNotFound is an error type for "Not Found" cache errors -type CacheEntryNotFound struct { - error -} - -// CachePersister interface implemented for store -type CachePersister interface { - Create(identifier string, data interface{}) error - Get(identifier string, data interface{}) error - ForAll(pattern string, destObj interface{}, f ForAllFunc) error - Delete(identifier string) error -} - -// NewCachePersister returns CachePersister based on store -func NewCachePersister(metadataStore, pluginPath string) (CachePersister, error) { - if metadataStore == "k8s_configmap" { - DebugLogMsg("cache-perister: using kubernetes configmap as metadata cache persister") - k8scm := &K8sCMCache{} - k8scm.Client = NewK8sClient() - k8scm.Namespace = GetK8sNamespace() - return k8scm, nil - } else if metadataStore == "node" { - DebugLogMsg("cache-persister: using node as metadata cache persister") - nc := &NodeCache{} - nc.BasePath = pluginPath - nc.CacheDir = "controller" - return nc, nil - } - return nil, errors.New("cache-persister: couldn't parse metadatastorage flag") -} diff --git a/internal/util/k8s.go b/internal/util/k8s.go new file mode 100644 index 000000000..34b349978 --- /dev/null +++ b/internal/util/k8s.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 The CephCSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "os" + + k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + klog "k8s.io/klog/v2" +) + +// NewK8sClient create kubernetes client +func NewK8sClient() *k8s.Clientset { + var cfg *rest.Config + var err error + cPath := os.Getenv("KUBERNETES_CONFIG_PATH") + if cPath != "" { + cfg, err = clientcmd.BuildConfigFromFlags("", cPath) + if err != nil { + klog.Errorf("Failed to get cluster config with error: %v\n", err) + os.Exit(1) + } + } else { + cfg, err = rest.InClusterConfig() + if err != nil { + klog.Errorf("Failed to get cluster config with error: %v\n", err) + os.Exit(1) + } + } + client, err := k8s.NewForConfig(cfg) + if err != nil { + klog.Errorf("Failed to create client with error: %v\n", err) + os.Exit(1) + } + return client +} diff --git a/internal/util/k8scmcache.go b/internal/util/k8scmcache.go deleted file mode 100644 index 6c2a03b8e..000000000 --- a/internal/util/k8scmcache.go +++ /dev/null @@ -1,187 +0,0 @@ -/* -Copyright 2018 The Ceph-CSI Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "context" - "encoding/json" - "fmt" - "os" - "regexp" - - v1 "k8s.io/api/core/v1" - apierrs "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8s "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - klog "k8s.io/klog/v2" -) - -// K8sCMCache to store metadata -type K8sCMCache struct { - Client *k8s.Clientset - Namespace string -} - -const ( - defaultNamespace = "default" - - cmLabel = "csi-metadata" - cmDataKey = "content" - - csiMetadataLabelAttr = "com.ceph.ceph-csi/metadata" -) - -// GetK8sNamespace returns pod namespace. if pod namespace is empty -// it returns default namespace -func GetK8sNamespace() string { - namespace := os.Getenv("POD_NAMESPACE") - if namespace == "" { - return defaultNamespace - } - return namespace -} - -// NewK8sClient create kubernetes client -func NewK8sClient() *k8s.Clientset { - var cfg *rest.Config - var err error - cPath := os.Getenv("KUBERNETES_CONFIG_PATH") - if cPath != "" { - cfg, err = clientcmd.BuildConfigFromFlags("", cPath) - if err != nil { - klog.Errorf("Failed to get cluster config with error: %v\n", err) - os.Exit(1) - } - } else { - cfg, err = rest.InClusterConfig() - if err != nil { - klog.Errorf("Failed to get cluster config with error: %v\n", err) - os.Exit(1) - } - } - client, err := k8s.NewForConfig(cfg) - if err != nil { - klog.Errorf("Failed to create client with error: %v\n", err) - os.Exit(1) - } - return client -} - -func (k8scm *K8sCMCache) getMetadataCM(resourceID string) (*v1.ConfigMap, error) { - cm, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Get(context.TODO(), resourceID, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return cm, nil -} - -// ForAll list the metadata in configmaps and filters outs based on the pattern -func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error { - listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", csiMetadataLabelAttr, cmLabel)} - cms, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).List(context.TODO(), listOpts) - if err != nil { - return fmt.Errorf("k8s-cm-cache: failed to list metadata configmaps: %w", err) - } - - for i := range cms.Items { - data := cms.Items[i].Data[cmDataKey] - match, err := regexp.MatchString(pattern, cms.Items[i].ObjectMeta.Name) - if err != nil { - continue - } - if !match { - continue - } - if err = json.Unmarshal([]byte(data), destObj); err != nil { - return fmt.Errorf("k8s-cm-cache: JSON unmarshaling failed for configmap %s: %w", cms.Items[i].ObjectMeta.Name, err) - } - if err = f(cms.Items[i].ObjectMeta.Name); err != nil { - return err - } - } - return nil -} - -// Create stores the metadata in configmaps with identifier name -func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error { - cm, err := k8scm.getMetadataCM(identifier) - if cm != nil && err == nil { - DebugLogMsg("k8s-cm-cache: configmap %s already exists, skipping configmap creation", identifier) - return nil - } - dataJSON, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("k8s-cm-cache: JSON marshaling failed for configmap %s: %w", identifier, err) - } - cm = &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: identifier, - Namespace: k8scm.Namespace, - Labels: map[string]string{ - csiMetadataLabelAttr: cmLabel, - }, - }, - Data: map[string]string{}, - } - cm.Data[cmDataKey] = string(dataJSON) - - _, err = k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) - if err != nil { - if apierrs.IsAlreadyExists(err) { - DebugLogMsg("k8s-cm-cache: configmap %s already exists", identifier) - return nil - } - return fmt.Errorf("k8s-cm-cache: couldn't persist %s metadata as configmap: %w", identifier, err) - } - - DebugLogMsg("k8s-cm-cache: configmap %s successfully created", identifier) - return nil -} - -// Get retrieves the metadata in configmaps with identifier name -func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error { - cm, err := k8scm.getMetadataCM(identifier) - if err != nil { - if apierrs.IsNotFound(err) { - return &CacheEntryNotFound{err} - } - - return err - } - err = json.Unmarshal([]byte(cm.Data[cmDataKey]), data) - if err != nil { - return fmt.Errorf("k8s-cm-cache: JSON unmarshaling failed for configmap %s: %w", identifier, err) - } - return nil -} - -// Delete deletes the metadata in configmaps with identifier name -func (k8scm *K8sCMCache) Delete(identifier string) error { - err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Delete(context.TODO(), identifier, metav1.DeleteOptions{}) - if err != nil { - if apierrs.IsNotFound(err) { - DebugLogMsg("k8s-cm-cache: cannot delete missing metadata configmap %s, assuming it's already deleted", identifier) - return nil - } - - return fmt.Errorf("k8s-cm-cache: couldn't delete metadata configmap %s: %w", identifier, err) - } - DebugLogMsg("k8s-cm-cache: successfully deleted metadata configmap %s", identifier) - return nil -} diff --git a/internal/util/nodecache.go b/internal/util/nodecache.go deleted file mode 100644 index 3c5c588c2..000000000 --- a/internal/util/nodecache.go +++ /dev/null @@ -1,165 +0,0 @@ -/* -Copyright 2018 The Ceph-CSI Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "os" - "path" - "path/filepath" - "regexp" - "strings" - - klog "k8s.io/klog/v2" -) - -// NodeCache to store metadata -type NodeCache struct { - BasePath string - CacheDir string -} - -var errDec = errors.New("file not found") - -// EnsureCacheDirectory creates cache directory if not present -func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error { - fullPath := path.Join(nc.BasePath, cacheDir) - if _, err := os.Stat(fullPath); os.IsNotExist(err) { - // #nosec - if err := os.Mkdir(fullPath, 0755); err != nil { - return fmt.Errorf("node-cache: failed to create %s folder: %w", fullPath, err) - } - } - return nil -} - -// ForAll list the metadata in Nodecache and filters outs based on the pattern -func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error { - err := nc.EnsureCacheDirectory(nc.CacheDir) - if err != nil { - return fmt.Errorf("node-cache: couldn't ensure cache directory exists: %w", err) - } - files, err := ioutil.ReadDir(path.Join(nc.BasePath, nc.CacheDir)) - if err != nil { - return fmt.Errorf("node-cache: failed to read %s folder: %w", nc.BasePath, err) - } - cachePath := path.Join(nc.BasePath, nc.CacheDir) - for _, file := range files { - err = decodeObj(cachePath, pattern, file, destObj) - if errors.Is(err, errDec) { - continue - } else if err == nil { - if err = f(strings.TrimSuffix(file.Name(), filepath.Ext(file.Name()))); err != nil { - return err - } - } - return err - } - return nil -} - -func decodeObj(fpath, pattern string, file os.FileInfo, destObj interface{}) error { - match, err := regexp.MatchString(pattern, file.Name()) - if err != nil || !match { - return errDec - } - if !strings.HasSuffix(file.Name(), ".json") { - return errDec - } - // #nosec - fp, err := os.Open(path.Join(fpath, file.Name())) - if err != nil { - DebugLogMsg("node-cache: open file: %s err %v", file.Name(), err) - return errDec - } - decoder := json.NewDecoder(fp) - if err = decoder.Decode(destObj); err != nil { - if err = fp.Close(); err != nil { - return fmt.Errorf("failed to close file %s: %w", file.Name(), err) - } - return fmt.Errorf("node-cache: couldn't decode file %s: %w", file.Name(), err) - } - return nil -} - -// Create creates the metadata file in cache directory with identifier name -func (nc *NodeCache) Create(identifier string, data interface{}) error { - file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") - fp, err := os.Create(file) - if err != nil { - return fmt.Errorf("node-cache: failed to create metadata storage file %s: %w", file, err) - } - - defer func() { - if err = fp.Close(); err != nil { - klog.Warningf("failed to close file:%s %v", fp.Name(), err) - } - }() - - encoder := json.NewEncoder(fp) - if err = encoder.Encode(data); err != nil { - return fmt.Errorf("node-cache: failed to encode metadata for file: %s: %w", file, err) - } - DebugLogMsg("node-cache: successfully saved metadata into file: %s\n", file) - return nil -} - -// Get retrieves the metadata from cache directory with identifier name -func (nc *NodeCache) Get(identifier string, data interface{}) error { - file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") - // #nosec - fp, err := os.Open(file) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return &CacheEntryNotFound{err} - } - - return fmt.Errorf("node-cache: open error for %s: %w", file, err) - } - - defer func() { - if err = fp.Close(); err != nil { - klog.Warningf("failed to close file:%s %v", fp.Name(), err) - } - }() - - decoder := json.NewDecoder(fp) - if err = decoder.Decode(data); err != nil { - return fmt.Errorf("rbd: decode error: %w", err) - } - - return nil -} - -// Delete deletes the metadata file from cache directory with identifier name -func (nc *NodeCache) Delete(identifier string) error { - file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json") - err := os.Remove(file) - if err != nil { - if os.IsNotExist(err) { - DebugLogMsg("node-cache: cannot delete missing metadata storage file %s, assuming it's already deleted", file) - return nil - } - - return fmt.Errorf("node-cache: error removing file %s: %w", file, err) - } - DebugLogMsg("node-cache: successfully deleted metadata storage file at: %+v\n", file) - return nil -} diff --git a/internal/util/util.go b/internal/util/util.go index 26e6e170f..ce913fb51 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -22,7 +22,6 @@ import ( "fmt" "math" "os" - "path" "strconv" "strings" "time" @@ -79,17 +78,13 @@ var ( // Config holds the parameters list which can be configured type Config struct { - Vtype string // driver type [rbd|cephfs|liveness] - Endpoint string // CSI endpoint - DriverName string // name of the driver - NodeID string // node id - InstanceID string // unique ID distinguishing this instance of Ceph CSI - MetadataStorage string // metadata persistence method [node|k8s_configmap] - PluginPath string // location of cephcsi plugin - DomainLabels string // list of domain labels to read from the node - - // cephfs related flags - MountCacheDir string // mount info cache save dir + Vtype string // driver type [rbd|cephfs|liveness] + Endpoint string // CSI endpoint + DriverName string // name of the driver + NodeID string // node id + InstanceID string // unique ID distinguishing this instance of Ceph CSI + PluginPath string // location of cephcsi plugin + DomainLabels string // list of domain labels to read from the node // metrics related flags MetricsPath string // path of prometheus endpoint where metrics will be available @@ -124,27 +119,6 @@ type Config struct { MaxSnapshotsOnImage uint } -// CreatePersistanceStorage creates storage path and initializes new cache -func CreatePersistanceStorage(sPath, metaDataStore, pluginPath string) (CachePersister, error) { - var err error - if err = CreateMountPoint(path.Join(sPath, "controller")); err != nil { - klog.Errorf("failed to create persistent storage for controller: %v", err) - return nil, err - } - - if err = CreateMountPoint(path.Join(sPath, "node")); err != nil { - klog.Errorf("failed to create persistent storage for node: %v", err) - return nil, err - } - - cp, err := NewCachePersister(metaDataStore, pluginPath) - if err != nil { - klog.Errorf("failed to define cache persistence method: %v", err) - return nil, err - } - return cp, err -} - // ValidateDriverName validates the driver name func ValidateDriverName(driverName string) error { if driverName == "" {