mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-17 11:50:18 +00:00
cleanup: Remove support for Delete and Unmounting v1.1.0 PVC
as v1.0.0 is deprecated we need to remove the support for it in the Next coming (v3.0.0) release. This PR removes the support for the same. closes #882 Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
parent
a0fd805a8b
commit
d15ded88f5
@ -82,7 +82,6 @@ spec:
|
||||
- "--endpoint=$(CSI_ENDPOINT)"
|
||||
- "--v=5"
|
||||
- "--drivername=$(DRIVER_NAME)"
|
||||
- "--metadatastorage=k8s_configmap"
|
||||
{{- if .Values.topology.enabled }}
|
||||
- "--domainlabels={{ .Values.topology.domainLabels | join "," }}"
|
||||
{{- end }}
|
||||
@ -97,10 +96,6 @@ spec:
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: spec.nodeName
|
||||
- name: POD_NAMESPACE
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.namespace
|
||||
- name: CSI_ENDPOINT
|
||||
value: "unix:///csi/{{ .Values.pluginSocketFile }}"
|
||||
securityContext:
|
||||
|
@ -98,7 +98,6 @@ spec:
|
||||
- "--endpoint=$(CSI_ENDPOINT)"
|
||||
- "--v=5"
|
||||
- "--drivername=$(DRIVER_NAME)"
|
||||
- "--metadatastorage=k8s_configmap"
|
||||
env:
|
||||
- name: POD_IP
|
||||
valueFrom:
|
||||
@ -110,10 +109,6 @@ spec:
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: spec.nodeName
|
||||
- name: POD_NAMESPACE
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.namespace
|
||||
- name: CSI_ENDPOINT
|
||||
value: "unix:///csi/{{ .Values.provisionerSocketFile }}"
|
||||
securityContext:
|
||||
|
@ -90,10 +90,6 @@ spec:
|
||||
fieldPath: status.podIP
|
||||
- name: DRIVER_NAME
|
||||
value: {{ .Values.driverName }}
|
||||
- name: POD_NAMESPACE
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.namespace
|
||||
- name: NODE_ID
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
|
@ -131,10 +131,6 @@ spec:
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: spec.nodeName
|
||||
- name: POD_NAMESPACE
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.namespace
|
||||
- name: CSI_ENDPOINT
|
||||
value: "unix:///csi/{{ .Values.provisionerSocketFile }}"
|
||||
securityContext:
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
@ -54,8 +53,6 @@ func init() {
|
||||
flag.StringVar(&conf.NodeID, "nodeid", "", "node id")
|
||||
flag.StringVar(&conf.InstanceID, "instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
|
||||
" instances, when sharing Ceph clusters across CSI instances for provisioning")
|
||||
flag.StringVar(&conf.MetadataStorage, "metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
|
||||
flag.StringVar(&conf.PluginPath, "pluginpath", "/var/lib/kubelet/plugins/", "the location of cephcsi plugin")
|
||||
flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups")
|
||||
flag.BoolVar(&conf.IsControllerServer, "controllerserver", false, "start cephcsi controller server")
|
||||
flag.BoolVar(&conf.IsNodeServer, "nodeserver", false, "start cephcsi node server")
|
||||
@ -63,8 +60,6 @@ func init() {
|
||||
" domain the node belongs to, separated by ','")
|
||||
|
||||
// cephfs related flags
|
||||
// marking this as deprecated, remove it in next major release
|
||||
flag.StringVar(&conf.MountCacheDir, "mountcachedir", "", "mount info cache save dir")
|
||||
flag.BoolVar(&conf.ForceKernelCephFS, "forcecephkernelclient", false, "enable Ceph Kernel clients on kernel < 4.17 which support quotas")
|
||||
|
||||
// liveness/grpc metrics related flags
|
||||
@ -123,7 +118,6 @@ func main() {
|
||||
os.Exit(0)
|
||||
}
|
||||
util.DefaultLog("Driver version: %s and Git version: %s", util.DriverVersion, util.GitCommit)
|
||||
var cp util.CachePersister
|
||||
|
||||
if conf.Vtype == "" {
|
||||
klog.Fatalln("driver type not specified")
|
||||
@ -134,14 +128,6 @@ func main() {
|
||||
if err != nil {
|
||||
klog.Fatalln(err) // calls exit
|
||||
}
|
||||
csipluginPath := filepath.Join(conf.PluginPath, dname)
|
||||
if conf.MetadataStorage != "" {
|
||||
cp, err = util.CreatePersistanceStorage(
|
||||
csipluginPath, conf.MetadataStorage, conf.PluginPath)
|
||||
if err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// the driver may need a higher PID limit for handling all concurrent requests
|
||||
if conf.PidLimit != 0 {
|
||||
@ -183,14 +169,11 @@ func main() {
|
||||
validateCloneDepthFlag(&conf)
|
||||
validateMaxSnaphostFlag(&conf)
|
||||
driver := rbd.NewDriver()
|
||||
driver.Run(&conf, cp)
|
||||
driver.Run(&conf)
|
||||
|
||||
case cephfsType:
|
||||
if conf.MountCacheDir != "" {
|
||||
klog.Warning("mountcachedir option is deprecated")
|
||||
}
|
||||
driver := cephfs.NewDriver()
|
||||
driver.Run(&conf, cp)
|
||||
driver.Run(&conf)
|
||||
|
||||
case livenessType:
|
||||
liveness.Run(&conf)
|
||||
|
@ -91,7 +91,6 @@ spec:
|
||||
- "--endpoint=$(CSI_ENDPOINT)"
|
||||
- "--v=5"
|
||||
- "--drivername=cephfs.csi.ceph.com"
|
||||
- "--metadatastorage=k8s_configmap"
|
||||
- "--pidlimit=-1"
|
||||
env:
|
||||
- name: POD_IP
|
||||
@ -102,10 +101,6 @@ spec:
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: spec.nodeName
|
||||
- name: POD_NAMESPACE
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.namespace
|
||||
- name: CSI_ENDPOINT
|
||||
value: unix:///csi/csi-provisioner.sock
|
||||
imagePullPolicy: "IfNotPresent"
|
||||
|
@ -62,7 +62,6 @@ spec:
|
||||
- "--endpoint=$(CSI_ENDPOINT)"
|
||||
- "--v=5"
|
||||
- "--drivername=cephfs.csi.ceph.com"
|
||||
- "--metadatastorage=k8s_configmap"
|
||||
# If topology based provisioning is desired, configure required
|
||||
# node labels representing the nodes topology domain
|
||||
# and pass the label names below, for CSI to consume and advertize
|
||||
@ -77,10 +76,6 @@ spec:
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: spec.nodeName
|
||||
- name: POD_NAMESPACE
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.namespace
|
||||
- name: CSI_ENDPOINT
|
||||
value: unix:///csi/csi.sock
|
||||
imagePullPolicy: "IfNotPresent"
|
||||
|
@ -74,10 +74,6 @@ spec:
|
||||
env:
|
||||
- name: ADDRESS
|
||||
value: /csi/csi-provisioner.sock
|
||||
- name: POD_NAMESPACE
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.namespace
|
||||
imagePullPolicy: "IfNotPresent"
|
||||
volumeMounts:
|
||||
- name: socket-dir
|
||||
|
@ -77,10 +77,6 @@ spec:
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: spec.nodeName
|
||||
- name: POD_NAMESPACE
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.namespace
|
||||
- name: CSI_ENDPOINT
|
||||
value: unix:///csi/csi.sock
|
||||
imagePullPolicy: "IfNotPresent"
|
||||
|
@ -44,14 +44,13 @@ that should be resolved in v14.2.3.
|
||||
**Available command line arguments:**
|
||||
|
||||
| Option | Default value | Description |
|
||||
|---------------------------|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| ------------------------- | --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
|
||||
| `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket |
|
||||
| `--drivername` | `cephfs.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) |
|
||||
| `--nodeid` | _empty_ | This node's ID |
|
||||
| `--type` | _empty_ | Driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` |
|
||||
| `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning |
|
||||
| `--pluginpath` | "/var/lib/kubelet/plugins/" | The location of cephcsi plugin on host |
|
||||
| `--metadatastorage` | _empty_ | Points to where older (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) |
|
||||
| `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. |
|
||||
| `--metricsport` | `8080` | TCP port for liveness metrics requests |
|
||||
| `--metricspath` | `/metrics` | Path of prometheus endpoint where metrics will be available |
|
||||
@ -72,9 +71,6 @@ CephFS mounter on kernels < 4.17.
|
||||
the path of your k8s config file (if not specified, the plugin will assume
|
||||
you're running it inside a k8s cluster and find the config itself).
|
||||
|
||||
`POD_NAMESPACE`: if you use `k8s_configmap` as metadata store, `POD_NAMESPACE`
|
||||
is used to define in which namespace you want the configmaps to be stored
|
||||
|
||||
**Available volume parameters:**
|
||||
|
||||
| Parameter | Required | Description |
|
||||
@ -83,8 +79,8 @@ is used to define in which namespace you want the configmaps to be stored
|
||||
| `fsName` | yes | CephFS filesystem name into which the volume shall be created |
|
||||
| `mounter` | no | Mount method to be used for this volume. Available options are `kernel` for Ceph kernel client and `fuse` for Ceph FUSE driver. Defaults to "default mounter". |
|
||||
| `pool` | no | Ceph pool into which volume data shall be stored |
|
||||
| `volumeNamePrefix` | no | Prefix to use for naming subvolumes (defaults to `csi-vol-`). |
|
||||
| `snapshotNamePrefix` | no | Prefix to use for naming snapshots (defaults to `csi-snap-`)
|
||||
| `volumeNamePrefix` | no | Prefix to use for naming subvolumes (defaults to `csi-vol-`). |
|
||||
| `snapshotNamePrefix` | no | Prefix to use for naming snapshots (defaults to `csi-snap-`) |
|
||||
| `kernelMountOptions` | no | Comma separated string of mount options accepted by cephfs kernel mounter, by default no options are passed. Check man mount.ceph for options. |
|
||||
| `fuseMountOptions` | no | Comma separated string of mount options accepted by ceph-fuse mounter, by default no options are passed. |
|
||||
| `csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | for Kubernetes | Name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value |
|
||||
|
@ -33,7 +33,6 @@ make image-cephcsi
|
||||
| `--nodeid` | _empty_ | This node's ID |
|
||||
| `--type` | _empty_ | Driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` |
|
||||
| `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning |
|
||||
| `--metadatastorage` | _empty_ | Points to where legacy (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) |
|
||||
| `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. |
|
||||
| `--metricsport` | `8080` | TCP port for liveness metrics requests |
|
||||
| `--metricspath` | `"/metrics"` | Path of prometheus endpoint where metrics will be available |
|
||||
|
@ -1,49 +0,0 @@
|
||||
/*
|
||||
Copyright 2018 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
)
|
||||
|
||||
const (
|
||||
cephUserPrefix = "user-"
|
||||
cephEntityClientPrefix = "client."
|
||||
)
|
||||
|
||||
func genUserIDs(adminCr *util.Credentials, volID volumeID) (adminID, userID string) {
|
||||
return cephEntityClientPrefix + adminCr.ID, cephEntityClientPrefix + getCephUserName(volID)
|
||||
}
|
||||
|
||||
func getCephUserName(volID volumeID) string {
|
||||
return cephUserPrefix + string(volID)
|
||||
}
|
||||
|
||||
func deleteCephUserDeprecated(ctx context.Context, volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) error {
|
||||
adminID, userID := genUserIDs(adminCr, volID)
|
||||
|
||||
// TODO: Need to return success if userID is not found
|
||||
return execCommandErr(ctx, "ceph",
|
||||
"-m", volOptions.Monitors,
|
||||
"-n", adminID,
|
||||
"--keyfile="+adminCr.KeyFile,
|
||||
"-c", util.CephConfigPath,
|
||||
"auth", "rm", userID,
|
||||
)
|
||||
}
|
@ -33,17 +33,11 @@ import (
|
||||
// controller server spec.
|
||||
type ControllerServer struct {
|
||||
*csicommon.DefaultControllerServer
|
||||
MetadataStore util.CachePersister
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same volume (as defined by VolumeID/volume name) return an Aborted error
|
||||
VolumeLocks *util.VolumeLocks
|
||||
}
|
||||
|
||||
type controllerCacheEntry struct {
|
||||
VolOptions volumeOptions
|
||||
VolumeID volumeID
|
||||
}
|
||||
|
||||
// createBackingVolume creates the backing subvolume and on any error cleans up any created entities
|
||||
func (cs *ControllerServer) createBackingVolume(ctx context.Context, volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error {
|
||||
cr, err := util.NewAdminCredentials(secret)
|
||||
@ -156,72 +150,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
return &csi.CreateVolumeResponse{Volume: volume}, nil
|
||||
}
|
||||
|
||||
// deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin,
|
||||
// that have state information stored in files or kubernetes config maps
|
||||
func (cs *ControllerServer) deleteVolumeDeprecated(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||
var (
|
||||
volID = volumeID(req.GetVolumeId())
|
||||
secrets = req.GetSecrets()
|
||||
)
|
||||
|
||||
ce := &controllerCacheEntry{}
|
||||
if err := cs.MetadataStore.Get(string(volID), ce); err != nil {
|
||||
if err, ok := err.(*util.CacheEntryNotFound); ok {
|
||||
klog.Warningf(util.Log(ctx, "cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)"), volID, err)
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if !ce.VolOptions.ProvisionVolume {
|
||||
// DeleteVolume() is forbidden for statically provisioned volumes!
|
||||
|
||||
klog.Warningf(util.Log(ctx, "volume %s is provisioned statically, aborting delete"), volID)
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// mons may have changed since create volume,
|
||||
// retrieve the latest mons and override old mons
|
||||
if mon, secretsErr := util.GetMonValFromSecret(secrets); secretsErr == nil && len(mon) > 0 {
|
||||
util.ExtendedLog(ctx, "overriding monitors [%q] with [%q] for volume %s", ce.VolOptions.Monitors, mon, volID)
|
||||
ce.VolOptions.Monitors = mon
|
||||
}
|
||||
|
||||
// Deleting a volume requires admin credentials
|
||||
|
||||
cr, err := util.NewAdminCredentials(secrets)
|
||||
if err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to retrieve admin credentials: %v"), err)
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
if acquired := cs.VolumeLocks.TryAcquire(string(volID)); !acquired {
|
||||
klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, string(volID))
|
||||
}
|
||||
defer cs.VolumeLocks.Release(string(volID))
|
||||
|
||||
if err = purgeVolumeDeprecated(ctx, volID, cr, &ce.VolOptions); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err = deleteCephUserDeprecated(ctx, &ce.VolOptions, cr, volID); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to delete ceph user for volume %s: %v"), volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err = cs.MetadataStore.Delete(string(volID)); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
util.DebugLog(ctx, "cephfs: successfully deleted volume %s", volID)
|
||||
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// DeleteVolume deletes the volume in backend and its reservation
|
||||
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||
if err := cs.validateDeleteVolumeRequest(); err != nil {
|
||||
@ -257,12 +185,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// ErrInvalidVolID may mean this is an 1.0.0 version volume
|
||||
var eivi ErrInvalidVolID
|
||||
if errors.As(err, &eivi) && cs.MetadataStore != nil {
|
||||
return cs.deleteVolumeDeprecated(ctx, req)
|
||||
}
|
||||
|
||||
// All errors other than ErrVolumeNotFound should return an error back to the caller
|
||||
var evnf ErrVolumeNotFound
|
||||
if !errors.As(err, &evnf) {
|
||||
|
@ -37,9 +37,6 @@ const (
|
||||
radosNamespace = "csi"
|
||||
)
|
||||
|
||||
// PluginFolder defines the location of ceph plugin
|
||||
var PluginFolder = ""
|
||||
|
||||
// Driver contains the default identity,node and controller struct
|
||||
type Driver struct {
|
||||
cd *csicommon.CSIDriver
|
||||
@ -72,10 +69,9 @@ func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
|
||||
}
|
||||
|
||||
// NewControllerServer initialize a controller server for ceph CSI driver
|
||||
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
|
||||
func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
|
||||
return &ControllerServer{
|
||||
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
|
||||
MetadataStore: cachePersister,
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
}
|
||||
}
|
||||
@ -90,13 +86,11 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string)
|
||||
|
||||
// Run start a non-blocking grpc controller,node and identityserver for
|
||||
// ceph CSI driver which can serve multiple parallel requests
|
||||
func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
|
||||
func (fs *Driver) Run(conf *util.Config) {
|
||||
var err error
|
||||
var topology map[string]string
|
||||
|
||||
// Configuration
|
||||
PluginFolder = conf.PluginPath
|
||||
|
||||
if err = loadAvailableMounters(conf); err != nil {
|
||||
klog.Fatalf("cephfs: failed to load ceph mounters: %v", err)
|
||||
}
|
||||
@ -142,7 +136,7 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
|
||||
}
|
||||
|
||||
if conf.IsControllerServer {
|
||||
fs.cs = NewControllerServer(fs.cd, cachePersister)
|
||||
fs.cs = NewControllerServer(fs.cd)
|
||||
}
|
||||
if !conf.IsControllerServer && !conf.IsNodeServer {
|
||||
topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)
|
||||
@ -150,7 +144,7 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
|
||||
klog.Fatalln(err)
|
||||
}
|
||||
fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology)
|
||||
fs.cs = NewControllerServer(fs.cd, cachePersister)
|
||||
fs.cs = NewControllerServer(fs.cd)
|
||||
}
|
||||
|
||||
server := csicommon.NewNonBlockingGRPCServer()
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
@ -77,11 +76,6 @@ func execCommandJSON(ctx context.Context, v interface{}, program string, args ..
|
||||
return nil
|
||||
}
|
||||
|
||||
func pathExists(p string) bool {
|
||||
_, err := os.Stat(p)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// Controller service request validation
|
||||
func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
|
||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
||||
|
@ -19,7 +19,6 @@ package cephfs
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -40,18 +39,14 @@ var (
|
||||
inValidCommmand = "no valid command found"
|
||||
)
|
||||
|
||||
func getCephRootVolumePathLocalDeprecated(volID volumeID) string {
|
||||
return path.Join(getCephRootPathLocalDeprecated(volID), "csi-volumes", string(volID))
|
||||
}
|
||||
const (
|
||||
cephEntityClientPrefix = "client."
|
||||
)
|
||||
|
||||
func getVolumeRootPathCephDeprecated(volID volumeID) string {
|
||||
return path.Join("/", "csi-volumes", string(volID))
|
||||
}
|
||||
|
||||
func getCephRootPathLocalDeprecated(volID volumeID) string {
|
||||
return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID))
|
||||
}
|
||||
|
||||
func getVolumeNotFoundErrorString(volID volumeID) string {
|
||||
return fmt.Sprintf("Error ENOENT: Subvolume '%s' not found", string(volID))
|
||||
}
|
||||
@ -201,70 +196,6 @@ func resizeVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede
|
||||
return createVolume(ctx, volOptions, cr, volID, bytesQuota)
|
||||
}
|
||||
|
||||
func mountCephRoot(ctx context.Context, volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error {
|
||||
cephRoot := getCephRootPathLocalDeprecated(volID)
|
||||
|
||||
// Root path is not set for dynamically provisioned volumes
|
||||
// Access to cephfs's / is required
|
||||
volOptions.RootPath = "/"
|
||||
|
||||
if err := util.CreateMountPoint(cephRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m, err := newMounter(volOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create mounter: %v", err)
|
||||
}
|
||||
|
||||
if err = m.mount(ctx, cephRoot, adminCr, volOptions); err != nil {
|
||||
return fmt.Errorf("error mounting ceph root: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func unmountCephRoot(ctx context.Context, volID volumeID) {
|
||||
cephRoot := getCephRootPathLocalDeprecated(volID)
|
||||
|
||||
if err := unmountVolume(ctx, cephRoot); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to unmount %s with error %s"), cephRoot, err)
|
||||
} else {
|
||||
if err := os.Remove(cephRoot); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to remove %s with error %s"), cephRoot, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func purgeVolumeDeprecated(ctx context.Context, volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error {
|
||||
if err := mountCephRoot(ctx, volID, volOptions, adminCr); err != nil {
|
||||
return err
|
||||
}
|
||||
defer unmountCephRoot(ctx, volID)
|
||||
|
||||
var (
|
||||
volRoot = getCephRootVolumePathLocalDeprecated(volID)
|
||||
volRootDeleting = volRoot + "-deleting"
|
||||
)
|
||||
|
||||
if pathExists(volRoot) {
|
||||
if err := os.Rename(volRoot, volRootDeleting); err != nil {
|
||||
return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
|
||||
}
|
||||
} else {
|
||||
if !pathExists(volRootDeleting) {
|
||||
util.DebugLog(ctx, "cephfs: volume %s not found, assuming it to be already deleted", volID)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if err := os.RemoveAll(volRootDeleting); err != nil {
|
||||
return fmt.Errorf("failed to delete volume %s: %v", volID, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func purgeVolume(ctx context.Context, volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
err := execCommandErr(
|
||||
ctx,
|
||||
|
@ -40,7 +40,6 @@ const (
|
||||
// controller server spec.
|
||||
type ControllerServer struct {
|
||||
*csicommon.DefaultControllerServer
|
||||
MetadataStore util.CachePersister
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same volume (as defined by VolumeID/volume name) return an Aborted error
|
||||
VolumeLocks *util.VolumeLocks
|
||||
@ -100,7 +99,7 @@ func (cs *ControllerServer) parseVolCreateRequest(ctx context.Context, req *csi.
|
||||
}
|
||||
|
||||
// if it's NOT SINGLE_NODE_WRITER and it's BLOCK we'll set the parameter to ignore the in-use checks
|
||||
rbdVol, err := genVolFromVolumeOptions(ctx, req.GetParameters(), req.GetSecrets(), (isMultiNode && isBlock), false)
|
||||
rbdVol, err := genVolFromVolumeOptions(ctx, req.GetParameters(), req.GetSecrets(), (isMultiNode && isBlock))
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
@ -461,60 +460,6 @@ func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *u
|
||||
return nil, nil, status.Errorf(codes.InvalidArgument, "not a proper volume source")
|
||||
}
|
||||
|
||||
// DeleteLegacyVolume deletes a volume provisioned using version 1.0.0 of the plugin
|
||||
func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.DeleteVolumeRequest, cr *util.Credentials) (*csi.DeleteVolumeResponse, error) {
|
||||
volumeID := req.GetVolumeId()
|
||||
|
||||
if cs.MetadataStore == nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "missing metadata store configuration to"+
|
||||
" proceed with deleting legacy volume ID (%s)", volumeID)
|
||||
}
|
||||
|
||||
if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
||||
klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volumeID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
||||
}
|
||||
defer cs.VolumeLocks.Release(volumeID)
|
||||
|
||||
rbdVol := &rbdVolume{}
|
||||
if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
|
||||
if err, ok := err.(*util.CacheEntryNotFound); ok {
|
||||
klog.Warningf(util.Log(ctx, "metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)"), volumeID, err)
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// Fill up Monitors
|
||||
if err := updateMons(rbdVol, nil, req.GetSecrets()); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
defer rbdVol.Destroy()
|
||||
|
||||
err := rbdVol.Connect(cr)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// Update rbdImageName as the VolName when dealing with version 1 volumes
|
||||
rbdVol.RbdImageName = rbdVol.VolName
|
||||
|
||||
util.DebugLog(ctx, "deleting legacy volume %s", rbdVol.VolName)
|
||||
if err := deleteImage(ctx, rbdVol, cr); err != nil {
|
||||
// TODO: can we detect "already deleted" situations here and proceed?
|
||||
klog.Errorf(util.Log(ctx, "failed to delete legacy rbd image: %s/%s with error: %v"), rbdVol.Pool, rbdVol.VolName, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err := cs.MetadataStore.Delete(volumeID); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// DeleteVolume deletes the volume in backend and removes the volume metadata
|
||||
// from store
|
||||
// TODO: make this function less complex
|
||||
@ -554,19 +499,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt
|
||||
// to process it as such
|
||||
var eivi ErrInvalidVolID
|
||||
if errors.As(err, &eivi) {
|
||||
if isLegacyVolumeID(volumeID) {
|
||||
util.UsefulLog(ctx, "attempting deletion of potential legacy volume (%s)", volumeID)
|
||||
return cs.DeleteLegacyVolume(ctx, req, cr)
|
||||
}
|
||||
|
||||
// Consider unknown volumeID as a successfully deleted volume
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
|
||||
// or partially complete (image and imageOMap are garbage collected already), hence return
|
||||
// success as deletion is complete
|
||||
|
@ -75,10 +75,9 @@ func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
|
||||
}
|
||||
|
||||
// NewControllerServer initialize a controller server for rbd CSI driver
|
||||
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
|
||||
func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
|
||||
return &ControllerServer{
|
||||
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
|
||||
MetadataStore: cachePersister,
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
SnapshotLocks: util.NewVolumeLocks(),
|
||||
}
|
||||
@ -96,7 +95,7 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string)
|
||||
|
||||
// Run start a non-blocking grpc controller,node and identityserver for
|
||||
// rbd CSI driver which can serve multiple parallel requests
|
||||
func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
|
||||
func (r *Driver) Run(conf *util.Config) {
|
||||
var err error
|
||||
var topology map[string]string
|
||||
|
||||
@ -155,7 +154,7 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
|
||||
}
|
||||
|
||||
if conf.IsControllerServer {
|
||||
r.cs = NewControllerServer(r.cd, cachePersister)
|
||||
r.cs = NewControllerServer(r.cd)
|
||||
}
|
||||
if !conf.IsControllerServer && !conf.IsNodeServer {
|
||||
topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)
|
||||
@ -166,7 +165,7 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
|
||||
if err != nil {
|
||||
klog.Fatalf("failed to start node server, err %v\n", err)
|
||||
}
|
||||
r.cs = NewControllerServer(r.cd, cachePersister)
|
||||
r.cs = NewControllerServer(r.cd)
|
||||
}
|
||||
|
||||
s := csicommon.NewNonBlockingGRPCServer()
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
|
||||
"github.com/ceph/ceph-csi/internal/journal"
|
||||
@ -155,23 +154,16 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
isLegacyVolume := isLegacyVolumeID(volID)
|
||||
volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks, isLegacyVolume)
|
||||
volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// get rbd image name from the volume journal
|
||||
// for static volumes, the image name is actually the volume ID itself
|
||||
// for legacy volumes (v1.0.0), the image name can be found in the staging path
|
||||
switch {
|
||||
case staticVol:
|
||||
volOptions.RbdImageName = volID
|
||||
case isLegacyVolume:
|
||||
volOptions.RbdImageName, err = getLegacyVolumeName(stagingTargetPath)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
default:
|
||||
var vi util.CSIIdentifier
|
||||
var imageAttributes *journal.ImageAttributes
|
||||
@ -421,30 +413,6 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
return &csi.NodePublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func getLegacyVolumeName(mountPath string) (string, error) {
|
||||
var volName string
|
||||
|
||||
if strings.HasSuffix(mountPath, "/globalmount") {
|
||||
s := strings.Split(strings.TrimSuffix(mountPath, "/globalmount"), "/")
|
||||
volName = s[len(s)-1]
|
||||
return volName, nil
|
||||
}
|
||||
|
||||
if strings.HasSuffix(mountPath, "/mount") {
|
||||
s := strings.Split(strings.TrimSuffix(mountPath, "/mount"), "/")
|
||||
volName = s[len(s)-1]
|
||||
return volName, nil
|
||||
}
|
||||
|
||||
// get volume name for block volume
|
||||
s := strings.Split(mountPath, "/")
|
||||
if len(s) == 0 {
|
||||
return "", fmt.Errorf("rbd: malformed value of stage target path: %s", mountPath)
|
||||
}
|
||||
volName = s[len(s)-1]
|
||||
return volName, nil
|
||||
}
|
||||
|
||||
func (ns *NodeServer) mountVolumeToStagePath(ctx context.Context, req *csi.NodeStageVolumeRequest, staticVol bool, stagingPath, devicePath string) (bool, error) {
|
||||
readOnly := false
|
||||
fsType := req.GetVolumeCapability().GetMount().GetFsType()
|
||||
|
@ -22,25 +22,6 @@ import (
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
)
|
||||
|
||||
func TestGetLegacyVolumeName(t *testing.T) {
|
||||
tests := []struct {
|
||||
mountPath string
|
||||
volName string
|
||||
}{
|
||||
{"csi/vol/56a0cc34-a5c9-44ab-ad33-ed53dd2bd5ea/globalmount", "56a0cc34-a5c9-44ab-ad33-ed53dd2bd5ea"},
|
||||
{"csi/vol/9fdb7491-3469-4414-8fe2-ea96be6f7f72/mount", "9fdb7491-3469-4414-8fe2-ea96be6f7f72"},
|
||||
{"csi/vol/82cd91c4-4582-47b3-bb08-a84f8c5716d6", "82cd91c4-4582-47b3-bb08-a84f8c5716d6"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
if got, err := getLegacyVolumeName(test.mountPath); err != nil {
|
||||
t.Errorf("getLegacyVolumeName(%s) returned error when it shouldn't: %s", test.mountPath, err.Error())
|
||||
} else if got != test.volName {
|
||||
t.Errorf("getLegacyVolumeName(%s) = %s, want %s", test.mountPath, got, test.volName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetStagingPath(t *testing.T) {
|
||||
var stagingPath string
|
||||
// test with nodestagevolumerequest
|
||||
|
@ -36,7 +36,6 @@ import (
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
"github.com/pborman/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/cloud-provider/volume/helpers"
|
||||
klog "k8s.io/klog/v2"
|
||||
@ -698,66 +697,7 @@ func getMonsAndClusterID(ctx context.Context, options map[string]string) (monito
|
||||
return
|
||||
}
|
||||
|
||||
// isLegacyVolumeID checks if passed in volume ID string conforms to volume ID naming scheme used
|
||||
// by the version 1.0.0 (legacy) of the plugin, and returns true if found to be conforming
|
||||
func isLegacyVolumeID(volumeID string) bool {
|
||||
// Version 1.0.0 volumeID format: "csi-rbd-vol-" + UUID string
|
||||
// length: 12 ("csi-rbd-vol-") + 36 (UUID string)
|
||||
|
||||
// length check
|
||||
if len(volumeID) != 48 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Header check
|
||||
if !strings.HasPrefix(volumeID, "csi-rbd-vol-") {
|
||||
return false
|
||||
}
|
||||
|
||||
// Trailer UUID format check
|
||||
if uuid.Parse(volumeID[12:]) == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// upadateMons function is used to update the rbdVolume.Monitors for volumes that were provisioned
|
||||
// using the 1.0.0 version (legacy) of the plugin.
|
||||
func updateMons(rbdVol *rbdVolume, options, credentials map[string]string) error {
|
||||
var ok bool
|
||||
|
||||
// read monitors and MonValueFromSecret from options, else check passed in rbdVolume for
|
||||
// MonValueFromSecret key in credentials
|
||||
monInSecret := ""
|
||||
if options != nil {
|
||||
if rbdVol.Monitors, ok = options["monitors"]; !ok {
|
||||
rbdVol.Monitors = ""
|
||||
}
|
||||
if monInSecret, ok = options["monValueFromSecret"]; !ok {
|
||||
monInSecret = ""
|
||||
}
|
||||
} else {
|
||||
monInSecret = rbdVol.MonValueFromSecret
|
||||
}
|
||||
|
||||
// if monitors are present in secrets and we have the credentials, use monitors from the
|
||||
// credentials overriding monitors from other sources
|
||||
if monInSecret != "" && credentials != nil {
|
||||
monsFromSecret, ok := credentials[monInSecret]
|
||||
if ok {
|
||||
rbdVol.Monitors = monsFromSecret
|
||||
}
|
||||
}
|
||||
|
||||
if rbdVol.Monitors == "" {
|
||||
return errors.New("either monitors or monValueFromSecret must be set")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[string]string, disableInUseChecks, isLegacyVolume bool) (*rbdVolume, error) {
|
||||
func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[string]string, disableInUseChecks bool) (*rbdVolume, error) {
|
||||
var (
|
||||
ok bool
|
||||
err error
|
||||
@ -776,16 +716,9 @@ func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[st
|
||||
rbdVol.NamePrefix = namePrefix
|
||||
}
|
||||
|
||||
if isLegacyVolume {
|
||||
err = updateMons(rbdVol, volOptions, credentials)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
rbdVol.Monitors, rbdVol.ClusterID, err = getMonsAndClusterID(ctx, volOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rbdVol.Monitors, rbdVol.ClusterID, err = getMonsAndClusterID(ctx, volOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if no image features is provided, it results in empty string
|
||||
|
@ -23,24 +23,6 @@ import (
|
||||
librbd "github.com/ceph/go-ceph/rbd"
|
||||
)
|
||||
|
||||
func TestIsLegacyVolumeID(t *testing.T) {
|
||||
tests := []struct {
|
||||
volID string
|
||||
isLegacy bool
|
||||
}{
|
||||
{"prefix-bda37d42-9979-420f-9389-74362f3f98f6", false},
|
||||
{"csi-rbd-vo-f997e783-ff00-48b0-8cc7-30cb36c3df3d", false},
|
||||
{"csi-rbd-vol-this-is-clearly-not-a-valid-UUID----", false},
|
||||
{"csi-rbd-vol-b82f27de-3b3a-43f2-b5e7-9f8d0aad04e9", true},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
if got := isLegacyVolumeID(test.volID); got != test.isLegacy {
|
||||
t.Errorf("isLegacyVolumeID(%s) = %t, want %t", test.volID, got, test.isLegacy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHasSnapshotFeature(t *testing.T) {
|
||||
tests := []struct {
|
||||
features string
|
||||
|
@ -1,56 +0,0 @@
|
||||
/*
|
||||
Copyright 2018 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
// ForAllFunc is a unary predicate for visiting all cache entries
|
||||
// matching the `pattern' in CachePersister's ForAll function.
|
||||
type ForAllFunc func(identifier string) error
|
||||
|
||||
// CacheEntryNotFound is an error type for "Not Found" cache errors
|
||||
type CacheEntryNotFound struct {
|
||||
error
|
||||
}
|
||||
|
||||
// CachePersister interface implemented for store
|
||||
type CachePersister interface {
|
||||
Create(identifier string, data interface{}) error
|
||||
Get(identifier string, data interface{}) error
|
||||
ForAll(pattern string, destObj interface{}, f ForAllFunc) error
|
||||
Delete(identifier string) error
|
||||
}
|
||||
|
||||
// NewCachePersister returns CachePersister based on store
|
||||
func NewCachePersister(metadataStore, pluginPath string) (CachePersister, error) {
|
||||
if metadataStore == "k8s_configmap" {
|
||||
DebugLogMsg("cache-perister: using kubernetes configmap as metadata cache persister")
|
||||
k8scm := &K8sCMCache{}
|
||||
k8scm.Client = NewK8sClient()
|
||||
k8scm.Namespace = GetK8sNamespace()
|
||||
return k8scm, nil
|
||||
} else if metadataStore == "node" {
|
||||
DebugLogMsg("cache-persister: using node as metadata cache persister")
|
||||
nc := &NodeCache{}
|
||||
nc.BasePath = pluginPath
|
||||
nc.CacheDir = "controller"
|
||||
return nc, nil
|
||||
}
|
||||
return nil, errors.New("cache-persister: couldn't parse metadatastorage flag")
|
||||
}
|
52
internal/util/k8s.go
Normal file
52
internal/util/k8s.go
Normal file
@ -0,0 +1,52 @@
|
||||
/*
|
||||
Copyright 2020 The CephCSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
k8s "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
klog "k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// NewK8sClient create kubernetes client
|
||||
func NewK8sClient() *k8s.Clientset {
|
||||
var cfg *rest.Config
|
||||
var err error
|
||||
cPath := os.Getenv("KUBERNETES_CONFIG_PATH")
|
||||
if cPath != "" {
|
||||
cfg, err = clientcmd.BuildConfigFromFlags("", cPath)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster config with error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
} else {
|
||||
cfg, err = rest.InClusterConfig()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster config with error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
client, err := k8s.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create client with error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
return client
|
||||
}
|
@ -1,187 +0,0 @@
|
||||
/*
|
||||
Copyright 2018 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
k8s "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
klog "k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// K8sCMCache to store metadata
|
||||
type K8sCMCache struct {
|
||||
Client *k8s.Clientset
|
||||
Namespace string
|
||||
}
|
||||
|
||||
const (
|
||||
defaultNamespace = "default"
|
||||
|
||||
cmLabel = "csi-metadata"
|
||||
cmDataKey = "content"
|
||||
|
||||
csiMetadataLabelAttr = "com.ceph.ceph-csi/metadata"
|
||||
)
|
||||
|
||||
// GetK8sNamespace returns pod namespace. if pod namespace is empty
|
||||
// it returns default namespace
|
||||
func GetK8sNamespace() string {
|
||||
namespace := os.Getenv("POD_NAMESPACE")
|
||||
if namespace == "" {
|
||||
return defaultNamespace
|
||||
}
|
||||
return namespace
|
||||
}
|
||||
|
||||
// NewK8sClient create kubernetes client
|
||||
func NewK8sClient() *k8s.Clientset {
|
||||
var cfg *rest.Config
|
||||
var err error
|
||||
cPath := os.Getenv("KUBERNETES_CONFIG_PATH")
|
||||
if cPath != "" {
|
||||
cfg, err = clientcmd.BuildConfigFromFlags("", cPath)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster config with error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
} else {
|
||||
cfg, err = rest.InClusterConfig()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster config with error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
client, err := k8s.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create client with error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
func (k8scm *K8sCMCache) getMetadataCM(resourceID string) (*v1.ConfigMap, error) {
|
||||
cm, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Get(context.TODO(), resourceID, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
// ForAll list the metadata in configmaps and filters outs based on the pattern
|
||||
func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
|
||||
listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", csiMetadataLabelAttr, cmLabel)}
|
||||
cms, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).List(context.TODO(), listOpts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("k8s-cm-cache: failed to list metadata configmaps: %w", err)
|
||||
}
|
||||
|
||||
for i := range cms.Items {
|
||||
data := cms.Items[i].Data[cmDataKey]
|
||||
match, err := regexp.MatchString(pattern, cms.Items[i].ObjectMeta.Name)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if !match {
|
||||
continue
|
||||
}
|
||||
if err = json.Unmarshal([]byte(data), destObj); err != nil {
|
||||
return fmt.Errorf("k8s-cm-cache: JSON unmarshaling failed for configmap %s: %w", cms.Items[i].ObjectMeta.Name, err)
|
||||
}
|
||||
if err = f(cms.Items[i].ObjectMeta.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create stores the metadata in configmaps with identifier name
|
||||
func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
|
||||
cm, err := k8scm.getMetadataCM(identifier)
|
||||
if cm != nil && err == nil {
|
||||
DebugLogMsg("k8s-cm-cache: configmap %s already exists, skipping configmap creation", identifier)
|
||||
return nil
|
||||
}
|
||||
dataJSON, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("k8s-cm-cache: JSON marshaling failed for configmap %s: %w", identifier, err)
|
||||
}
|
||||
cm = &v1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: identifier,
|
||||
Namespace: k8scm.Namespace,
|
||||
Labels: map[string]string{
|
||||
csiMetadataLabelAttr: cmLabel,
|
||||
},
|
||||
},
|
||||
Data: map[string]string{},
|
||||
}
|
||||
cm.Data[cmDataKey] = string(dataJSON)
|
||||
|
||||
_, err = k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
if apierrs.IsAlreadyExists(err) {
|
||||
DebugLogMsg("k8s-cm-cache: configmap %s already exists", identifier)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("k8s-cm-cache: couldn't persist %s metadata as configmap: %w", identifier, err)
|
||||
}
|
||||
|
||||
DebugLogMsg("k8s-cm-cache: configmap %s successfully created", identifier)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves the metadata in configmaps with identifier name
|
||||
func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
|
||||
cm, err := k8scm.getMetadataCM(identifier)
|
||||
if err != nil {
|
||||
if apierrs.IsNotFound(err) {
|
||||
return &CacheEntryNotFound{err}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
err = json.Unmarshal([]byte(cm.Data[cmDataKey]), data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("k8s-cm-cache: JSON unmarshaling failed for configmap %s: %w", identifier, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes the metadata in configmaps with identifier name
|
||||
func (k8scm *K8sCMCache) Delete(identifier string) error {
|
||||
err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Delete(context.TODO(), identifier, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
if apierrs.IsNotFound(err) {
|
||||
DebugLogMsg("k8s-cm-cache: cannot delete missing metadata configmap %s, assuming it's already deleted", identifier)
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("k8s-cm-cache: couldn't delete metadata configmap %s: %w", identifier, err)
|
||||
}
|
||||
DebugLogMsg("k8s-cm-cache: successfully deleted metadata configmap %s", identifier)
|
||||
return nil
|
||||
}
|
@ -1,165 +0,0 @@
|
||||
/*
|
||||
Copyright 2018 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
klog "k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// NodeCache to store metadata
|
||||
type NodeCache struct {
|
||||
BasePath string
|
||||
CacheDir string
|
||||
}
|
||||
|
||||
var errDec = errors.New("file not found")
|
||||
|
||||
// EnsureCacheDirectory creates cache directory if not present
|
||||
func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
|
||||
fullPath := path.Join(nc.BasePath, cacheDir)
|
||||
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
|
||||
// #nosec
|
||||
if err := os.Mkdir(fullPath, 0755); err != nil {
|
||||
return fmt.Errorf("node-cache: failed to create %s folder: %w", fullPath, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ForAll list the metadata in Nodecache and filters outs based on the pattern
|
||||
func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
|
||||
err := nc.EnsureCacheDirectory(nc.CacheDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("node-cache: couldn't ensure cache directory exists: %w", err)
|
||||
}
|
||||
files, err := ioutil.ReadDir(path.Join(nc.BasePath, nc.CacheDir))
|
||||
if err != nil {
|
||||
return fmt.Errorf("node-cache: failed to read %s folder: %w", nc.BasePath, err)
|
||||
}
|
||||
cachePath := path.Join(nc.BasePath, nc.CacheDir)
|
||||
for _, file := range files {
|
||||
err = decodeObj(cachePath, pattern, file, destObj)
|
||||
if errors.Is(err, errDec) {
|
||||
continue
|
||||
} else if err == nil {
|
||||
if err = f(strings.TrimSuffix(file.Name(), filepath.Ext(file.Name()))); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func decodeObj(fpath, pattern string, file os.FileInfo, destObj interface{}) error {
|
||||
match, err := regexp.MatchString(pattern, file.Name())
|
||||
if err != nil || !match {
|
||||
return errDec
|
||||
}
|
||||
if !strings.HasSuffix(file.Name(), ".json") {
|
||||
return errDec
|
||||
}
|
||||
// #nosec
|
||||
fp, err := os.Open(path.Join(fpath, file.Name()))
|
||||
if err != nil {
|
||||
DebugLogMsg("node-cache: open file: %s err %v", file.Name(), err)
|
||||
return errDec
|
||||
}
|
||||
decoder := json.NewDecoder(fp)
|
||||
if err = decoder.Decode(destObj); err != nil {
|
||||
if err = fp.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close file %s: %w", file.Name(), err)
|
||||
}
|
||||
return fmt.Errorf("node-cache: couldn't decode file %s: %w", file.Name(), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create creates the metadata file in cache directory with identifier name
|
||||
func (nc *NodeCache) Create(identifier string, data interface{}) error {
|
||||
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
|
||||
fp, err := os.Create(file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("node-cache: failed to create metadata storage file %s: %w", file, err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err = fp.Close(); err != nil {
|
||||
klog.Warningf("failed to close file:%s %v", fp.Name(), err)
|
||||
}
|
||||
}()
|
||||
|
||||
encoder := json.NewEncoder(fp)
|
||||
if err = encoder.Encode(data); err != nil {
|
||||
return fmt.Errorf("node-cache: failed to encode metadata for file: %s: %w", file, err)
|
||||
}
|
||||
DebugLogMsg("node-cache: successfully saved metadata into file: %s\n", file)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves the metadata from cache directory with identifier name
|
||||
func (nc *NodeCache) Get(identifier string, data interface{}) error {
|
||||
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
|
||||
// #nosec
|
||||
fp, err := os.Open(file)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return &CacheEntryNotFound{err}
|
||||
}
|
||||
|
||||
return fmt.Errorf("node-cache: open error for %s: %w", file, err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err = fp.Close(); err != nil {
|
||||
klog.Warningf("failed to close file:%s %v", fp.Name(), err)
|
||||
}
|
||||
}()
|
||||
|
||||
decoder := json.NewDecoder(fp)
|
||||
if err = decoder.Decode(data); err != nil {
|
||||
return fmt.Errorf("rbd: decode error: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes the metadata file from cache directory with identifier name
|
||||
func (nc *NodeCache) Delete(identifier string) error {
|
||||
file := path.Join(nc.BasePath, nc.CacheDir, identifier+".json")
|
||||
err := os.Remove(file)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
DebugLogMsg("node-cache: cannot delete missing metadata storage file %s, assuming it's already deleted", file)
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("node-cache: error removing file %s: %w", file, err)
|
||||
}
|
||||
DebugLogMsg("node-cache: successfully deleted metadata storage file at: %+v\n", file)
|
||||
return nil
|
||||
}
|
@ -22,7 +22,6 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -79,17 +78,13 @@ var (
|
||||
|
||||
// Config holds the parameters list which can be configured
|
||||
type Config struct {
|
||||
Vtype string // driver type [rbd|cephfs|liveness]
|
||||
Endpoint string // CSI endpoint
|
||||
DriverName string // name of the driver
|
||||
NodeID string // node id
|
||||
InstanceID string // unique ID distinguishing this instance of Ceph CSI
|
||||
MetadataStorage string // metadata persistence method [node|k8s_configmap]
|
||||
PluginPath string // location of cephcsi plugin
|
||||
DomainLabels string // list of domain labels to read from the node
|
||||
|
||||
// cephfs related flags
|
||||
MountCacheDir string // mount info cache save dir
|
||||
Vtype string // driver type [rbd|cephfs|liveness]
|
||||
Endpoint string // CSI endpoint
|
||||
DriverName string // name of the driver
|
||||
NodeID string // node id
|
||||
InstanceID string // unique ID distinguishing this instance of Ceph CSI
|
||||
PluginPath string // location of cephcsi plugin
|
||||
DomainLabels string // list of domain labels to read from the node
|
||||
|
||||
// metrics related flags
|
||||
MetricsPath string // path of prometheus endpoint where metrics will be available
|
||||
@ -124,27 +119,6 @@ type Config struct {
|
||||
MaxSnapshotsOnImage uint
|
||||
}
|
||||
|
||||
// CreatePersistanceStorage creates storage path and initializes new cache
|
||||
func CreatePersistanceStorage(sPath, metaDataStore, pluginPath string) (CachePersister, error) {
|
||||
var err error
|
||||
if err = CreateMountPoint(path.Join(sPath, "controller")); err != nil {
|
||||
klog.Errorf("failed to create persistent storage for controller: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = CreateMountPoint(path.Join(sPath, "node")); err != nil {
|
||||
klog.Errorf("failed to create persistent storage for node: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cp, err := NewCachePersister(metaDataStore, pluginPath)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to define cache persistence method: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return cp, err
|
||||
}
|
||||
|
||||
// ValidateDriverName validates the driver name
|
||||
func ValidateDriverName(driverName string) error {
|
||||
if driverName == "" {
|
||||
|
Loading…
Reference in New Issue
Block a user