Support mounting and deleting version 1.0.0 RBD volumes

This commit adds support for mounting and deleting volumes provisioned by older
plugin versions (1.0.0), in order to remain backward compatible with volumes
created by 1.0.0.

It restores the ability to specify where the older metadata was stored, via the
plugin's metadatastorage option, and then uses the provided metadata to mount
and delete those older volumes.

It also supports the various ways in which monitor information may have been
specified (in the StorageClass, or in the secret), so that the monitor
information for legacy volumes can be kept current.

Testing done:
- Mount/Delete of a volume created by the 1.0.0 plugin, with monitors in the
  StorageClass
- Mount/Delete of a volume created by the 1.0.0 plugin, with monitors in the
  secret under the key "monitors"
- Mount/Delete of a volume created by the 1.0.0 plugin, with monitors in the
  secret under a user-specified key
- PVC creation and deletion with the current version (to ensure, at a minimum,
  that no existing functionality is broken)
- Tested some negative cases, where monitor information is missing from secrets
  or present under a different key name, to verify that failure scenarios behave
  as expected

Updates #378

Follow-up work:
- Documentation on how to upgrade to the 1.1 plugin while retaining the above
  functionality for older volumes

Signed-off-by: ShyamsundarR <srangana@redhat.com>
Authored by ShyamsundarR on 2019-05-31 14:09:24 -04:00; committed by mergify[bot]
parent 09f126691c
commit fa68c35f3b
7 changed files with 234 additions and 52 deletions

diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go

@@ -44,6 +44,7 @@ var (
 	nodeID          = flag.String("nodeid", "", "node id")
 	instanceID      = flag.String("instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
 		" instances, when sharing Ceph clusters across CSI instances for provisioning")
+	metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")

 	// rbd related flags
 	containerized = flag.Bool("containerized", true, "whether run as containerized")
@@ -51,7 +52,6 @@ var (
 	// cephfs related flags
 	volumeMounter   = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
 	mountCacheDir   = flag.String("mountcachedir", "", "mount info cache save dir")
-	metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
 )

 func init() {
@@ -109,8 +109,15 @@ func main() {
 	switch driverType {
 	case rbdType:
 		rbd.PluginFolder += dname
+		if *metadataStorage != "" {
+			cp, err = util.CreatePersistanceStorage(
+				rbd.PluginFolder, *metadataStorage, dname)
+			if err != nil {
+				os.Exit(1)
+			}
+		}
 		driver := rbd.NewDriver()
-		driver.Run(dname, *nodeID, *endpoint, *instanceID, *containerized)
+		driver.Run(dname, *nodeID, *endpoint, *instanceID, *containerized, cp)

 	case cephfsType:
 		cephfs.PluginFolder += dname
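The cp handle created above is a util.CachePersister. That interface is not
part of this diff, but from the calls made in DeleteLegacyVolume below, its
relevant surface is plausibly along these lines (a sketch inferred from the
call sites, not the actual pkg/util definition):

    // Hypothetical sketch of the CachePersister surface this commit relies on.
    type CachePersister interface {
        // Get unmarshals the metadata stashed under identifier into data.
        Get(identifier string, data interface{}) error
        // Delete removes the metadata stashed under identifier.
        Delete(identifier string) error
    }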

diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md

@@ -27,13 +27,14 @@ make image-cephcsi
 **Available command line arguments:**

 | Option              | Default value         | Description |
-| ----------------- | --------------------- | ----------- |
+| ------------------- | --------------------- | ----------- |
 | `--endpoint`        | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket |
-| `--drivername`      | `rbd.csi.ceph.com`    | name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) |
+| `--drivername`      | `rbd.csi.ceph.com`    | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) |
 | `--nodeid`          | _empty_               | This node's ID |
-| `--type`            | _empty_               | driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` |
+| `--type`            | _empty_               | Driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` |
 | `--containerized`   | true                  | Whether running in containerized mode |
 | `--instanceid`      | "default"             | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning |
+| `--metadatastorage` | _empty_               | Points to where legacy (1.0.0 or older plugin versions) metadata about provisioned volumes is kept, as a file or as a k8s configmap (`node` or `k8s_configmap` respectively) |

 **Available environmental variables:**

diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go

@@ -38,6 +38,7 @@ const (
 // controller server spec.
 type ControllerServer struct {
 	*csicommon.DefaultControllerServer
+	MetadataStore util.CachePersister
 }

 func (cs *ControllerServer) validateVolumeReq(req *csi.CreateVolumeRequest) error {
@@ -83,7 +84,7 @@ func (cs *ControllerServer) parseVolCreateRequest(req *csi.CreateVolumeRequest)
 	}

 	// if it's NOT SINGLE_NODE_WRITER and it's BLOCK we'll set the parameter to ignore the in-use checks
-	rbdVol, err := genVolFromVolumeOptions(req.GetParameters(), (isMultiNode && isBlock))
+	rbdVol, err := genVolFromVolumeOptions(req.GetParameters(), nil, (isMultiNode && isBlock), false)
 	if err != nil {
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
@@ -226,6 +227,50 @@ func (cs *ControllerServer) checkSnapshot(req *csi.CreateVolumeRequest, rbdVol *
 	return nil
 }

+// DeleteLegacyVolume deletes a volume provisioned using version 1.0.0 of the plugin
+func (cs *ControllerServer) DeleteLegacyVolume(req *csi.DeleteVolumeRequest, cr *util.Credentials) (*csi.DeleteVolumeResponse, error) {
+	volumeID := req.GetVolumeId()
+
+	if cs.MetadataStore == nil {
+		return nil, status.Errorf(codes.InvalidArgument, "missing metadata store configuration to"+
+			" proceed with deleting legacy volume ID (%s)", volumeID)
+	}
+
+	idLk := legacyVolumeIDLocker.Lock(volumeID)
+	defer legacyVolumeIDLocker.Unlock(idLk, volumeID)
+
+	rbdVol := &rbdVolume{}
+	if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
+		if err, ok := err.(*util.CacheEntryNotFound); ok {
+			klog.V(3).Infof("metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)", volumeID, err)
+			return &csi.DeleteVolumeResponse{}, nil
+		}
+
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	// Fill up Monitors
+	if err := updateMons(rbdVol, nil, req.GetSecrets()); err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	// Update rbdImageName as the VolName when dealing with version 1 volumes
+	rbdVol.RbdImageName = rbdVol.VolName
+
+	klog.V(4).Infof("deleting legacy volume %s", rbdVol.VolName)
+	if err := deleteImage(rbdVol, cr); err != nil {
+		// TODO: can we detect "already deleted" situations here and proceed?
+		klog.V(3).Infof("failed to delete legacy rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err)
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	if err := cs.MetadataStore.Delete(volumeID); err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	return &csi.DeleteVolumeResponse{}, nil
+}
+
 // DeleteVolume deletes the volume in backend and removes the volume metadata
 // from store
 func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
@@ -247,6 +292,18 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 	rbdVol := &rbdVolume{}
 	if err := genVolFromVolID(rbdVol, volumeID, cr); err != nil {
+		// If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt
+		// to process it as such
+		if _, ok := err.(ErrInvalidVolID); ok {
+			if isLegacyVolumeID(volumeID) {
+				klog.V(2).Infof("attempting deletion of potential legacy volume (%s)", volumeID)
+				return cs.DeleteLegacyVolume(req, cr)
+			}
+
+			// Consider unknown volumeID as a successfully deleted volume
+			return &csi.DeleteVolumeResponse{}, nil
+		}
+
 		// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
 		// or partially complete (image and imageOMap are garbage collected already), hence return
 		// success as deletion is complete
@@ -284,6 +341,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		return nil, status.Error(codes.Internal, err.Error())
 	}

+	if err := undoVolReservation(rbdVol, cr); err != nil {
+		klog.Errorf("failed to remove reservation for volume (%s) with backing image (%s) (%s)",
+			rbdVol.RequestName, rbdVol.RbdImageName, err)
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
 	return &csi.DeleteVolumeResponse{}, nil
 }
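To make the MetadataStore.Get call above concrete: the 1.0.0 plugin stashed
each volume's rbdVolume as JSON (node-local files or a Kubernetes configmap,
per the metadatastorage flag). Reconstructed from the JSON tags this commit
adds to rbdVolume in rbd_util.go, a stashed entry would look roughly like the
following; every value here is invented:

    // Illustrative 1.0.0 metadata blob; the keys follow the JSON tags added
    // to rbdVolume in this commit, the values are made up for the example.
    const legacyVolumeJSON = `{
        "volID":              "csi-rbd-vol-6a9d48e7-8e43-11e9-b666-0242ac110003",
        "monValueFromSecret": "monitors",
        "pool":               "rbd",
        "imageFormat":        "2",
        "imageFeatures":      "layering",
        "volSize":            1073741824,
        "adminId":            "admin",
        "userId":             "kubernetes",
        "volName":            "csi-rbd-vol-6a9d48e7-8e43-11e9-b666-0242ac110003"
    }`

DeleteLegacyVolume then copies VolName into RbdImageName and refreshes
Monitors from the request secrets via updateMons before deleting the image.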

diff --git a/pkg/rbd/errors.go b/pkg/rbd/errors.go

@@ -47,3 +47,13 @@ type ErrVolNameConflict struct {
 func (e ErrVolNameConflict) Error() string {
 	return e.err.Error()
 }
+
+// ErrInvalidVolID is returned when a CSI-passed VolumeID does not conform to any known volume ID
+// format
+type ErrInvalidVolID struct {
+	err error
+}
+
+func (e ErrInvalidVolID) Error() string {
+	return e.err.Error()
+}
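Callers select the legacy code path with a plain type assertion on the value
type, mirroring how the error is returned by value as ErrInvalidVolID{err}
elsewhere in this commit:

    // Caller-side pattern used by DeleteVolume and NodePublishVolume:
    if err := genVolFromVolID(rbdVol, volumeID, cr); err != nil {
        if _, ok := err.(ErrInvalidVolID); ok {
            // not a current-scheme volume ID; try the legacy (1.0.0) path
        }
    }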

diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go

@@ -67,6 +67,22 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	disableInUseChecks := false
+	isLegacyVolume := false
+
+	volName, err := getVolumeName(req)
+	if err != nil {
+		// error ErrInvalidVolID may mean this is a 1.0.0 version volume, check for name
+		// pattern match in addition to the error to ensure this is a likely v1.0.0 volume
+		if _, ok := err.(ErrInvalidVolID); !ok || !isLegacyVolumeID(req.GetVolumeId()) {
+			return nil, status.Error(codes.InvalidArgument, err.Error())
+		}
+
+		volName, err = getLegacyVolumeName(req)
+		if err != nil {
+			return nil, status.Error(codes.InvalidArgument, err.Error())
+		}
+		isLegacyVolume = true
+	}

 	isBlock := req.GetVolumeCapability().GetBlock() != nil
 	// Check if that target path exists properly
 	notMnt, err := ns.createTargetPath(targetPath, isBlock)
@@ -88,12 +104,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 		}
 	}

-	volOptions, err := genVolFromVolumeOptions(req.GetVolumeContext(), disableInUseChecks)
-	if err != nil {
-		return nil, err
-	}
-
-	volName, err := ns.getVolumeName(req)
+	volOptions, err := genVolFromVolumeOptions(req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks, isLegacyVolume)
 	if err != nil {
 		return nil, err
 	}
@@ -118,18 +129,40 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

 	return &csi.NodePublishVolumeResponse{}, nil
 }

-func (ns *NodeServer) getVolumeName(req *csi.NodePublishVolumeRequest) (string, error) {
+func getVolumeName(req *csi.NodePublishVolumeRequest) (string, error) {
 	var vi util.CSIIdentifier

 	err := vi.DecomposeCSIID(req.GetVolumeId())
 	if err != nil {
-		klog.Errorf("error decoding volume ID (%s) (%s)", err, req.GetVolumeId())
-		return "", status.Error(codes.InvalidArgument, err.Error())
+		err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, req.GetVolumeId())
+		return "", ErrInvalidVolID{err}
 	}

 	return volJournal.NamingPrefix() + vi.ObjectUUID, nil
 }

+func getLegacyVolumeName(req *csi.NodePublishVolumeRequest) (string, error) {
+	var volName string
+
+	isBlock := req.GetVolumeCapability().GetBlock() != nil
+	targetPath := req.GetTargetPath()
+
+	if isBlock {
+		// Get volName from targetPath
+		s := strings.Split(targetPath, "/")
+		volName = s[len(s)-1]
+	} else {
+		// Get volName from targetPath
+		if !strings.HasSuffix(targetPath, "/mount") {
+			return "", fmt.Errorf("rbd: malformed value of target path: %s", targetPath)
+		}
+		s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
+		volName = s[len(s)-1]
+	}
+
+	return volName, nil
+}
+
 func (ns *NodeServer) mountVolume(req *csi.NodePublishVolumeRequest, devicePath string) error {
 	// Publish Path
 	fsType := req.GetVolumeCapability().GetMount().GetFsType()
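As a worked example of the string handling in getLegacyVolumeName: the
kubelet-style target paths below are invented; only the "/mount" suffix rule
and the last-path-element rule come from the code above.

    // Minimal demonstration of getLegacyVolumeName's path parsing; the
    // target paths are invented examples.
    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Filesystem volume: the name is the element before the trailing "/mount".
        fsTarget := "/var/lib/kubelet/pods/uid0/volumes/kubernetes.io~csi/csi-rbd-vol-6a9d48e7-8e43-11e9-b666-0242ac110003/mount"
        s := strings.Split(strings.TrimSuffix(fsTarget, "/mount"), "/")
        fmt.Println(s[len(s)-1]) // csi-rbd-vol-6a9d48e7-...

        // Block volume: the last path element is used as-is.
        blkTarget := "/var/lib/kubelet/plugins/publish/csi-rbd-vol-6a9d48e7-8e43-11e9-b666-0242ac110003"
        b := strings.Split(blkTarget, "/")
        fmt.Println(b[len(b)-1]) // csi-rbd-vol-6a9d48e7-...
    }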

diff --git a/pkg/rbd/rbd.go b/pkg/rbd/rbd.go

@@ -74,9 +74,10 @@ func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
 }

 // NewControllerServer initialize a controller server for rbd CSI driver
-func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
+func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
 	return &ControllerServer{
 		DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
+		MetadataStore:           cachePersister,
 	}
 }
@@ -98,7 +99,7 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, err

 // Run start a non-blocking grpc controller,node and identityserver for
 // rbd CSI driver which can serve multiple parallel requests
-func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containerized bool) {
+func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containerized bool, cachePersister util.CachePersister) {
 	var err error

 	klog.Infof("Driver: %v version: %v", driverName, version)
@@ -147,7 +148,7 @@ func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containeri
 		klog.Fatalf("failed to start node server, err %v\n", err)
 	}

-	r.cs = NewControllerServer(r.cd)
+	r.cs = NewControllerServer(r.cd, cachePersister)

 	s := csicommon.NewNonBlockingGRPCServer()
 	s.Start(endpoint, r.ids, r.cs, r.ns)
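Condensing the plumbing this file completes, the metadata store travels as
follows (a summary of this commit's hunks, not new behavior):

    // main():        cp, err = util.CreatePersistanceStorage(...)  // only when --metadatastorage is set
    // main():        driver.Run(dname, ..., cp)
    // Run():         r.cs = NewControllerServer(r.cd, cachePersister)
    //                -> ControllerServer{MetadataStore: cachePersister}
    // DeleteVolume(): on ErrInvalidVolID + a legacy-looking ID
    //                -> DeleteLegacyVolume() -> cs.MetadataStore.Get()/Delete()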

diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go

@@ -27,6 +27,7 @@ import (
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/timestamp"
 	"github.com/pborman/uuid"
+	"github.com/pkg/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog"
@@ -45,20 +46,28 @@ const (
 // rbdVolume represents a CSI volume and its RBD image specifics
 type rbdVolume struct {
-	// RbdImageName is the name of the RBD image backing this rbdVolume
+	// RbdImageName is the name of the RBD image backing this rbdVolume. This does not have a
+	// JSON tag as it is not stashed in JSON encoded config maps in v1.0.0
 	// VolID is the volume ID that is exchanged with CSI drivers, identifying this rbdVol
-	// RequestName is the CSI generated volume name for the rbdVolume
+	// RequestName is the CSI generated volume name for the rbdVolume. This does not have a
+	// JSON tag as it is not stashed in JSON encoded config maps in v1.0.0
+	// VolName and MonValueFromSecret are retained from older plugin versions (<= 1.0.0)
+	// for backward compatibility reasons
 	RbdImageName       string
-	VolID              string
-	Monitors           string
-	Pool               string
-	ImageFormat        string
-	ImageFeatures      string
-	VolSize            int64
-	Mounter            string
-	DisableInUseChecks bool
-	ClusterID          string
+	VolID              string `json:"volID"`
+	Monitors           string `json:"monitors"`
+	Pool               string `json:"pool"`
+	ImageFormat        string `json:"imageFormat"`
+	ImageFeatures      string `json:"imageFeatures"`
+	VolSize            int64  `json:"volSize"`
+	AdminID            string `json:"adminId"`
+	UserID             string `json:"userId"`
+	Mounter            string `json:"mounter"`
+	DisableInUseChecks bool   `json:"disableInUseChecks"`
+	ClusterID          string `json:"clusterId"`
 	RequestName        string
+	VolName            string `json:"volName"`
+	MonValueFromSecret string `json:"monValueFromSecret"`
 }

 // rbdSnapshot represents a CSI snapshot and its RBD snapshot specifics
@@ -89,6 +98,8 @@ var (
 	snapshotNameLocker = util.NewIDLocker()
 	// serializes operations based on "mount target path" as key
 	targetPathLocker = util.NewIDLocker()
+	// serializes delete operations on legacy volumes
+	legacyVolumeIDLocker = util.NewIDLocker()

 	supportedFeatures = sets.NewString("layering")
 )
@@ -172,14 +183,6 @@ func deleteImage(pOpts *rbdVolume, cr *util.Credentials) error {
 	output, err = execCommand("rbd", args)
 	if err != nil {
 		klog.Errorf("failed to delete rbd image: %v, command output: %s", err, string(output))
-		return err
 	}

-	err = undoVolReservation(pOpts, cr)
-	if err != nil {
-		klog.Errorf("failed to remove reservation for volume (%s) with backing image (%s) (%s)",
-			pOpts.RequestName, pOpts.RbdImageName, err)
-		err = nil
-	}
-
 	return err
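Design note: this hunk moves undoVolReservation out of deleteImage, and
DeleteVolume (earlier in this commit) now calls it explicitly after a
successful image delete. Plausibly this is because legacy (1.0.0) volumes
predate the volume journal and have no reservation to undo, so
DeleteLegacyVolume can reuse deleteImage without attempting journal cleanup.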
@@ -283,9 +286,8 @@ func genVolFromVolID(rbdVol *rbdVolume, volumeID string, cr *util.Credentials) e
 	err := vi.DecomposeCSIID(rbdVol.VolID)
 	if err != nil {
-		klog.V(4).Infof("error decoding volume ID (%s) (%s)", err, rbdVol.VolID)
-		return err
+		err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, rbdVol.VolID)
+		return ErrInvalidVolID{err}
 	}

 	rbdVol.ClusterID = vi.ClusterID
@@ -336,7 +338,66 @@ func getMonsAndClusterID(options map[string]string) (monitors, clusterID string,
 	return
 }

-func genVolFromVolumeOptions(volOptions map[string]string, disableInUseChecks bool) (*rbdVolume, error) {
+// isLegacyVolumeID checks if the passed-in volume ID string conforms to the volume ID naming
+// scheme used by version 1.0.0 (legacy) of the plugin, and returns true if it conforms
+func isLegacyVolumeID(volumeID string) bool {
+	// Version 1.0.0 volumeID format: "csi-rbd-vol-" + UUID string
+	//    length: 12 ("csi-rbd-vol-") + 36 (UUID string)
+
+	// length check
+	if len(volumeID) != 48 {
+		return false
+	}
+
+	// Header check
+	if !strings.HasPrefix(volumeID, "csi-rbd-vol-") {
+		return false
+	}
+
+	// Trailer UUID format check
+	if uuid.Parse(volumeID[12:]) == nil {
+		return false
+	}
+
+	return true
+}
+
+// updateMons updates rbdVolume.Monitors for volumes that were provisioned
+// using the 1.0.0 version (legacy) of the plugin.
+func updateMons(rbdVol *rbdVolume, options, credentials map[string]string) error {
+	var ok bool
+
+	// read monitors and MonValueFromSecret from options, else check passed in rbdVolume for
+	// MonValueFromSecret key in credentials
+	monInSecret := ""
+	if options != nil {
+		if rbdVol.Monitors, ok = options["monitors"]; !ok {
+			rbdVol.Monitors = ""
+		}
+		if monInSecret, ok = options["monValueFromSecret"]; !ok {
+			monInSecret = ""
+		}
+	} else {
+		monInSecret = rbdVol.MonValueFromSecret
+	}
+
+	// if monitors are present in secrets and we have the credentials, use monitors from the
+	// credentials, overriding monitors from other sources
+	if monInSecret != "" && credentials != nil {
+		monsFromSecret, ok := credentials[monInSecret]
+		if ok {
+			rbdVol.Monitors = monsFromSecret
+		}
+	}
+
+	if rbdVol.Monitors == "" {
+		return errors.New("either monitors or monValueFromSecret must be set")
+	}
+
+	return nil
+}
+
+func genVolFromVolumeOptions(volOptions, credentials map[string]string, disableInUseChecks, isLegacyVolume bool) (*rbdVolume, error) {
 	var (
 		ok  bool
 		err error
@@ -348,10 +409,17 @@ func genVolFromVolumeOptions(volOptions map[string]string, disableInUseChecks bo
 		return nil, errors.New("missing required parameter pool")
 	}

+	if isLegacyVolume {
+		err = updateMons(rbdVol, volOptions, credentials)
+		if err != nil {
+			return nil, err
+		}
+	} else {
 		rbdVol.Monitors, rbdVol.ClusterID, err = getMonsAndClusterID(volOptions)
 		if err != nil {
 			return nil, err
 		}
+	}

 	rbdVol.ImageFormat, ok = volOptions["imageFormat"]
 	if !ok {
@@ -372,7 +440,6 @@ func genVolFromVolumeOptions(volOptions map[string]string, disableInUseChecks bo
 			}
 			rbdVol.ImageFeatures = imageFeatures
 		}
-	}

 	klog.V(3).Infof("setting disableInUseChecks on rbd volume to: %v", disableInUseChecks)
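Finally, a quick sanity check of isLegacyVolumeID's three rules; the legacy ID
below is generated, the non-legacy one is an arbitrary counterexample, and
uuid is github.com/pborman/uuid as imported by this file:

    // Standalone check of the legacy volume-ID rules: 12-byte "csi-rbd-vol-"
    // prefix + 36-byte UUID = 48 bytes total. Inputs are invented.
    package main

    import (
        "fmt"
        "strings"

        "github.com/pborman/uuid"
    )

    func isLegacyVolumeID(volumeID string) bool {
        return len(volumeID) == 48 &&
            strings.HasPrefix(volumeID, "csi-rbd-vol-") &&
            uuid.Parse(volumeID[12:]) != nil
    }

    func main() {
        legacy := "csi-rbd-vol-" + uuid.NewUUID().String() // 12 + 36 characters
        fmt.Println(isLegacyVolumeID(legacy))              // true
        fmt.Println(isLegacyVolumeID("pvc-0a1b2c3d"))      // false: wrong length and prefix
    }

Note also the precedence implemented by updateMons: when the secret names a
monitor entry, it overrides monitors recorded in the stashed metadata or the
StorageClass, which is what keeps legacy volumes mountable and deletable after
mon addresses change.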