mirror of https://github.com/ceph/ceph-csi.git, synced 2025-06-13 10:33:35 +00:00

Removed config maps and replaced with rados omaps

Existing config maps are now replaced with rados omaps that store information regarding the requested volume names and the RBD image names backing them. Further, to detect the cluster, pool, and image that a volume ID refers to, the volume ID encoding has been changed as per the design specification provided in the stateless ceph-csi proposal.

Additional changes and updates:
- Updated documentation
- Updated manifests
- Updated Helm chart
- Addressed a few csi-test failures

Signed-off-by: ShyamsundarR <srangana@redhat.com>

This commit is contained in:
committed by mergify[bot]
parent f60a07ae82
commit d02e50aa9b
@@ -18,20 +18,12 @@ package rbd

import (
    "fmt"
    "os/exec"
    "sort"
    "strconv"
    "syscall"

    csicommon "github.com/ceph/ceph-csi/pkg/csi-common"
    "github.com/ceph/ceph-csi/pkg/util"

    "github.com/container-storage-interface/spec/lib/go/csi"
    "github.com/golang/protobuf/ptypes"
    "github.com/golang/protobuf/ptypes/timestamp"
    "github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
    "github.com/pborman/uuid"
    "github.com/pkg/errors"
    "golang.org/x/net/context"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
@@ -46,33 +38,6 @@ const (

// controller server spec.
type ControllerServer struct {
    *csicommon.DefaultControllerServer
    MetadataStore util.CachePersister
}

var (
    rbdVolumes   = map[string]rbdVolume{}
    rbdSnapshots = map[string]rbdSnapshot{}
)

// LoadExDataFromMetadataStore loads the rbd volume and snapshot
// info from metadata store
func (cs *ControllerServer) LoadExDataFromMetadataStore() error {
    vol := &rbdVolume{}
    // nolint
    cs.MetadataStore.ForAll("csi-rbd-vol-", vol, func(identifier string) error {
        rbdVolumes[identifier] = *vol
        return nil
    })

    snap := &rbdSnapshot{}
    // nolint
    cs.MetadataStore.ForAll("csi-rbd-(.*)-snap-", snap, func(identifier string) error {
        rbdSnapshots[identifier] = *snap
        return nil
    })

    klog.Infof("Loaded %d volumes and %d snapshots from metadata store", len(rbdVolumes), len(rbdSnapshots))
    return nil
}

func (cs *ControllerServer) validateVolumeReq(req *csi.CreateVolumeRequest) error {
@@ -87,10 +52,17 @@ func (cs *ControllerServer) validateVolumeReq(req *csi.CreateVolumeRequest) error {
    if req.VolumeCapabilities == nil {
        return status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
    }
    options := req.GetParameters()
    if value, ok := options["clusterID"]; !ok || len(value) == 0 {
        return status.Error(codes.InvalidArgument, "Missing or empty cluster ID to provision volume from")
    }
    if value, ok := options["pool"]; !ok || len(value) == 0 {
        return status.Error(codes.InvalidArgument, "Missing or empty pool name to provision volume from")
    }
    return nil
}

func parseVolCreateRequest(req *csi.CreateVolumeRequest) (*rbdVolume, error) {
func (cs *ControllerServer) parseVolCreateRequest(req *csi.CreateVolumeRequest) (*rbdVolume, error) {
    // TODO (sbezverk) Last check for not exceeding total storage capacity

    isMultiNode := false
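For illustration, the two parameter checks above boil down to requiring non-empty "clusterID" and "pool" entries in the request parameters. A minimal standalone sketch of that validation, not part of this commit; the key names come from the code above, the helper and example values are hypothetical:

package main

import "fmt"

// validateParams mirrors the checks in validateVolumeReq: both keys must
// be present and non-empty before provisioning proceeds.
func validateParams(options map[string]string) error {
    for _, k := range []string{"clusterID", "pool"} {
        if v, ok := options[k]; !ok || len(v) == 0 {
            return fmt.Errorf("missing or empty %s to provision volume from", k)
        }
    }
    return nil
}

func main() {
    params := map[string]string{"clusterID": "cluster-1", "pool": "rbd"} // example values only
    fmt.Println(validateParams(params))                                  // <nil>
}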
@@ -111,38 +83,28 @@ func parseVolCreateRequest(req *csi.CreateVolumeRequest) (*rbdVolume, error) {
    }

    // if it's NOT SINGLE_NODE_WRITER and it's BLOCK we'll set the parameter to ignore the in-use checks
    rbdVol, err := getRBDVolumeOptions(req.GetParameters(), (isMultiNode && isBlock))
    rbdVol, err := genVolFromVolumeOptions(req.GetParameters(), (isMultiNode && isBlock))
    if err != nil {
        return nil, status.Error(codes.InvalidArgument, err.Error())
    }

    // Generating Volume Name and Volume ID, as according to CSI spec they MUST be different
    volName := req.GetName()
    uniqueID := uuid.NewUUID().String()
    rbdVol.VolName = volName
    volumeID := "csi-rbd-vol-" + uniqueID
    rbdVol.VolID = volumeID
    rbdVol.RequestName = req.GetName()

    // Volume Size - Default is 1 GiB
    volSizeBytes := int64(oneGB)
    if req.GetCapacityRange() != nil {
        volSizeBytes = req.GetCapacityRange().GetRequiredBytes()
    }

    rbdVol.VolSize = util.RoundUpToMiB(volSizeBytes)
    // always round up the request size in bytes to the nearest MiB
    rbdVol.VolSize = util.MiB * util.RoundUpToMiB(volSizeBytes)

    // NOTE: rbdVol does not contain VolID and RbdImageName populated, everything
    // else is populated post create request parsing
    return rbdVol, nil
}

func storeVolumeMetadata(vol *rbdVolume, cp util.CachePersister) error {
    if err := cp.Create(vol.VolID, vol); err != nil {
        klog.Errorf("failed to store metadata for volume %s: %v", vol.VolID, err)
        return err
    }

    return nil
}

// CreateVolume creates the volume in backend and store the volume metadata
// CreateVolume creates the volume in backend
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {

    if err := cs.validateVolumeReq(req); err != nil {
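The size handling above rounds the requested byte count up to whole MiBs and stores the result back in bytes. A small standalone sketch of that arithmetic, assuming util.RoundUpToMiB is a ceiling division by MiB (the helper below is a stand-in, not the library function):

package main

import "fmt"

const MiB = 1024 * 1024 // bytes per MiB, mirroring util.MiB

// roundUpToMiB mimics the arithmetic implied by
// "rbdVol.VolSize = util.MiB * util.RoundUpToMiB(volSizeBytes)":
// round the byte count up to whole MiBs, then convert back to bytes.
func roundUpToMiB(sizeBytes int64) int64 {
    return (sizeBytes + MiB - 1) / MiB
}

func main() {
    req := int64(1000000)                // a 1 MB request, example value
    fmt.Println(MiB * roundUpToMiB(req)) // 1048576: rounded up to 1 MiB, in bytes
}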
@@ -155,50 +117,46 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
        }
    }()

    // Need to check for already existing volume name, and if found
    // check for the requested capacity and already allocated capacity
    if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
        // Since err is nil, it means the volume with the same name already exists
        // need to check if the size of existing volume is the same as in new
        // request
        if exVol.VolSize >= req.GetCapacityRange().GetRequiredBytes() {
            // existing volume is compatible with new request and should be reused.
    rbdVol, err := cs.parseVolCreateRequest(req)
    if err != nil {
        return nil, err
    }

            if err = storeVolumeMetadata(exVol, cs.MetadataStore); err != nil {
                return nil, status.Error(codes.Internal, err.Error())
            }

            // TODO (sbezverk) Do I need to make sure that RBD volume still exists?
            return &csi.CreateVolumeResponse{
                Volume: &csi.Volume{
                    VolumeId:      exVol.VolID,
                    CapacityBytes: exVol.VolSize,
                    VolumeContext: req.GetParameters(),
                },
            }, nil
    found, err := checkVolExists(rbdVol, req.GetSecrets())
    if err != nil {
        if _, ok := err.(ErrVolNameConflict); ok {
            return nil, status.Error(codes.AlreadyExists, err.Error())
        }
        return nil, status.Errorf(codes.AlreadyExists, "Volume with the same name: %s but with different size already exist", req.GetName())
    }

    rbdVol, err := parseVolCreateRequest(req)
    if err != nil {
        return nil, err
    }

    // Check if there is already RBD image with requested name
    err = cs.checkRBDStatus(rbdVol, req, int(rbdVol.VolSize))
    if err != nil {
        return nil, err
    }
    // store volume size in bytes (snapshot and check existing volume needs volume
    // size in bytes)
    rbdVol.VolSize = rbdVol.VolSize * util.MiB

    rbdVolumes[rbdVol.VolID] = *rbdVol

    if err = storeVolumeMetadata(rbdVol, cs.MetadataStore); err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    if found {
        return &csi.CreateVolumeResponse{
            Volume: &csi.Volume{
                VolumeId:      rbdVol.VolID,
                CapacityBytes: rbdVol.VolSize,
                VolumeContext: req.GetParameters(),
            },
        }, nil
    }

    err = reserveVol(rbdVol, req.GetSecrets())
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    defer func() {
        if err != nil {
            errDefer := unreserveVol(rbdVol, req.GetSecrets())
            if errDefer != nil {
                klog.Warningf("failed undoing reservation of volume: %s (%s)", req.GetName(), errDefer)
            }
        }
    }()

    err = cs.createBackingImage(rbdVol, req, util.RoundUpToMiB(rbdVol.VolSize))
    if err != nil {
        return nil, err
    }

    return &csi.CreateVolumeResponse{
        Volume: &csi.Volume{
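The reserveVol / deferred unreserveVol sequence above is what keeps failed creates retryable: the omap reservation is undone only if a later step fails. A condensed sketch of that pattern, with hypothetical reserve, unreserve, and create helpers standing in for reserveVol, unreserveVol, and createBackingImage:

package main

import "fmt"

func createFlow(name string) (err error) {
    if err = reserve(name); err != nil {
        return err
    }
    // On any later failure, undo the omap reservation so a retry
    // starts from a clean slate.
    defer func() {
        if err != nil {
            if errDefer := unreserve(name); errDefer != nil {
                fmt.Printf("failed undoing reservation of %s (%v)\n", name, errDefer)
            }
        }
    }()
    return create(name)
}

func reserve(name string) error   { fmt.Println("reserve", name); return nil }
func unreserve(name string) error { fmt.Println("unreserve", name); return nil }
func create(name string) error    { fmt.Println("create", name); return nil }

func main() { _ = createFlow("pvc-1234") } // example request name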
@@ -209,27 +167,24 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
    }, nil
}

func (cs *ControllerServer) checkRBDStatus(rbdVol *rbdVolume, req *csi.CreateVolumeRequest, volSizeMiB int) error {
func (cs *ControllerServer) createBackingImage(rbdVol *rbdVolume, req *csi.CreateVolumeRequest, volSizeMiB int64) error {
    var err error
    // Check if there is already RBD image with requested name
    //nolint
    found, _, _ := rbdStatus(rbdVol, rbdVol.UserID, req.GetSecrets())
    if !found {
        // if VolumeContentSource is not nil, this request is for snapshot
        if req.VolumeContentSource != nil {
            if err = cs.checkSnapshot(req, rbdVol); err != nil {
                return err
            }
        } else {
            err = createRBDImage(rbdVol, volSizeMiB, rbdVol.AdminID, req.GetSecrets())
            if err != nil {
                klog.Warningf("failed to create volume: %v", err)
                return status.Error(codes.Internal, err.Error())
            }

            klog.V(4).Infof("create volume %s", rbdVol.VolName)
    // if VolumeContentSource is not nil, this request is for snapshot
    if req.VolumeContentSource != nil {
        if err = cs.checkSnapshot(req, rbdVol); err != nil {
            return err
        }
    } else {
        err = createImage(rbdVol, volSizeMiB, rbdVol.AdminID, req.GetSecrets())
        if err != nil {
            klog.Warningf("failed to create volume: %v", err)
            return status.Error(codes.Internal, err.Error())
        }

        klog.V(4).Infof("created image %s", rbdVol.RbdImageName)
    }

    return nil
}
func (cs *ControllerServer) checkSnapshot(req *csi.CreateVolumeRequest, rbdVol *rbdVolume) error {
@@ -244,15 +199,18 @@ func (cs *ControllerServer) checkSnapshot(req *csi.CreateVolumeRequest, rbdVol *rbdVolume) error {
    }

    rbdSnap := &rbdSnapshot{}
    if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil {
        return status.Error(codes.NotFound, err.Error())
    if err := genSnapFromSnapID(rbdSnap, snapshotID, req.GetSecrets()); err != nil {
        if _, ok := err.(ErrSnapNotFound); !ok {
            return status.Error(codes.Internal, err.Error())
        }
        return status.Error(codes.InvalidArgument, "Missing requested Snapshot ID")
    }

    err := restoreSnapshot(rbdVol, rbdSnap, rbdVol.AdminID, req.GetSecrets())
    if err != nil {
        return status.Error(codes.Internal, err.Error())
    }
    klog.V(4).Infof("create volume %s from snapshot %s", req.GetName(), rbdSnap.SnapName)
    klog.V(4).Infof("create volume %s from snapshot %s", req.GetName(), rbdSnap.RbdSnapName)
    return nil
}

@@ -265,8 +223,10 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
    }
    // For now the image get unconditionally deleted, but here retention policy can be checked
    volumeID := req.GetVolumeId()
    if volumeID == "" {
        return nil, status.Error(codes.InvalidArgument, "Empty volume ID in request")
    }
    volumeIDMutex.LockKey(volumeID)

    defer func() {
        if err := volumeIDMutex.UnlockKey(volumeID); err != nil {
            klog.Warningf("failed to unlock mutex volume:%s %v", volumeID, err)
@@ -274,84 +234,66 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
    }()

    rbdVol := &rbdVolume{}
    if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
        if err, ok := err.(*util.CacheEntryNotFound); ok {
            klog.V(3).Infof("metadata for volume %s not found, assuming the volume to be already deleted (%v)", volumeID, err)
    if err := genVolFromVolID(rbdVol, volumeID, req.GetSecrets()); err != nil {
        // if error is ErrKeyNotFound, then a previous attempt at deletion was complete
        // or partially complete (image and imageOMap are garbage collected already), hence return
        // success as deletion is complete
        if _, ok := err.(util.ErrKeyNotFound); ok {
            return &csi.DeleteVolumeResponse{}, nil
        }

        return nil, err
        // All errors other than ErrImageNotFound should return an error back to the caller
        if _, ok := err.(ErrImageNotFound); !ok {
            return nil, status.Error(codes.Internal, err.Error())
        }

        // If error is ErrImageNotFound then we failed to find the image, but found the imageOMap
        // to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
        // unreserve for the same
        volumeNameMutex.LockKey(rbdVol.RequestName)
        defer func() {
            if err := volumeNameMutex.UnlockKey(rbdVol.RequestName); err != nil {
                klog.Warningf("failed to unlock mutex volume:%s %v", rbdVol.RequestName, err)
            }
        }()

        if err := unreserveVol(rbdVol, req.GetSecrets()); err != nil {
            return nil, status.Error(codes.Internal, err.Error())
        }
        return &csi.DeleteVolumeResponse{}, nil
    }

    volName := rbdVol.VolName
    // lock out parallel create requests against the same volume name as we
    // cleanup the image and associated omaps for the same
    volumeNameMutex.LockKey(rbdVol.RequestName)
    defer func() {
        if err := volumeNameMutex.UnlockKey(rbdVol.RequestName); err != nil {
            klog.Warningf("failed to unlock mutex volume:%s %v", rbdVol.RequestName, err)
        }
    }()

    // Deleting rbd image
    klog.V(4).Infof("deleting volume %s", volName)
    if err := deleteRBDImage(rbdVol, rbdVol.AdminID, req.GetSecrets()); err != nil {
        // TODO: can we detect "already deleted" situations here and proceed?
        klog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, volName, err)
    klog.V(4).Infof("deleting image %s", rbdVol.RbdImageName)
    if err := deleteImage(rbdVol, rbdVol.AdminID, req.GetSecrets()); err != nil {
        klog.Errorf("failed to delete rbd image: %s/%s with error: %v",
            rbdVol.Pool, rbdVol.RbdImageName, err)
        return nil, status.Error(codes.Internal, err.Error())
    }

    if err := cs.MetadataStore.Delete(volumeID); err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    delete(rbdVolumes, volumeID)
    return &csi.DeleteVolumeResponse{}, nil
}

// ListVolumes returns a list of volumes stored in memory
func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
    var startToken int
    if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_VOLUMES); err != nil {
        klog.Warningf("invalid list volume req: %v", req)
        return nil, err
    }

    //validate starting token if present
    if len(req.GetStartingToken()) > 0 {
        i, parseErr := strconv.ParseUint(req.StartingToken, 10, 32)
        if parseErr != nil {
            return nil, status.Errorf(codes.Aborted, "invalid starting token %s", parseErr.Error())
        }
        //check starting Token is greater than list of rbd volumes
        if len(rbdVolumes) < int(i) {
            return nil, status.Errorf(codes.Aborted, "invalid starting token %s", parseErr.Error())
        }
        startToken = int(i)
    }

    var entries []*csi.ListVolumesResponse_Entry

    keys := make([]string, 0)
    for k := range rbdVolumes {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    for index, k := range keys {
        if index < startToken {
            continue
        }
        entries = append(entries, &csi.ListVolumesResponse_Entry{
            Volume: &csi.Volume{
                VolumeId:      rbdVolumes[k].VolID,
                CapacityBytes: rbdVolumes[k].VolSize,
                VolumeContext: extractStoredVolOpt(rbdVolumes[k]),
            },
        })
    }

    resp := &csi.ListVolumesResponse{
        Entries: entries,
    }

    return resp, nil
}

// ValidateVolumeCapabilities checks whether the volume capabilities requested
// are supported.
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
    if req.GetVolumeId() == "" {
        return nil, status.Error(codes.InvalidArgument, "Empty volume ID in request")
    }

    if len(req.VolumeCapabilities) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Empty volume capabilities in request")
    }

    for _, cap := range req.VolumeCapabilities {
        if cap.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
            return &csi.ValidateVolumeCapabilitiesResponse{Message: ""}, nil
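ListVolumes above treats the starting token as a numeric index into the sorted key list and skips entries before it. A standalone sketch of that pagination logic, not part of the commit, with example keys:

package main

import (
    "fmt"
    "sort"
    "strconv"
)

// page mirrors the token handling in ListVolumes: parse the token as an
// unsigned index, reject it if it exceeds the number of volumes, then
// return the sorted keys from that index onward.
func page(keys []string, token string) ([]string, error) {
    start := 0
    if len(token) > 0 {
        i, err := strconv.ParseUint(token, 10, 32)
        if err != nil || int(i) > len(keys) {
            return nil, fmt.Errorf("invalid starting token %s", token)
        }
        start = int(i)
    }
    sort.Strings(keys)
    return keys[start:], nil
}

func main() {
    out, _ := page([]string{"vol-b", "vol-a", "vol-c"}, "1") // example keys
    fmt.Println(out)                                         // [vol-b vol-c]
}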
@@ -368,100 +310,90 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
// in store
// nolint: gocyclo
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {

    if err := cs.validateSnapshotReq(req); err != nil {
        return nil, err
    }
    snapshotNameMutex.LockKey(req.GetName())

    snapshotNameMutex.LockKey(req.GetName())
    defer func() {
        if err := snapshotNameMutex.UnlockKey(req.GetName()); err != nil {
            klog.Warningf("failed to unlock mutex snapshot:%s %v", req.GetName(), err)
        }
    }()

    // Need to check for already existing snapshot name, and if found
    // check for the requested source volume id and already allocated source volume id
    if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
        if req.SourceVolumeId == exSnap.SourceVolumeID {
            if err = storeSnapshotMetadata(exSnap, cs.MetadataStore); err != nil {
                return nil, status.Error(codes.Internal, err.Error())
            }

            return &csi.CreateSnapshotResponse{
                Snapshot: &csi.Snapshot{
                    SizeBytes:      exSnap.SizeBytes,
                    SnapshotId:     exSnap.SnapID,
                    SourceVolumeId: exSnap.SourceVolumeID,
                    CreationTime: &timestamp.Timestamp{
                        Seconds: exSnap.CreatedAt,
                    },
                    ReadyToUse: true,
                },
            }, nil
    // Fetch source volume information
    rbdVol := new(rbdVolume)
    err := genVolFromVolID(rbdVol, req.GetSourceVolumeId(), req.GetSecrets())
    if err != nil {
        if _, ok := err.(ErrImageNotFound); ok {
            return nil, status.Errorf(codes.NotFound, "Source Volume ID %s not found", req.GetSourceVolumeId())
        }
        return nil, status.Errorf(codes.AlreadyExists, "Snapshot with the same name: %s but with different source volume id already exist", req.GetName())
        return nil, status.Errorf(codes.Internal, err.Error())
    }

    rbdSnap, err := getRBDSnapshotOptions(req.GetParameters())
    if err != nil {
        return nil, status.Error(codes.InvalidArgument, err.Error())
    }

    // Generating Snapshot Name and Snapshot ID, as according to CSI spec they MUST be different
    snapName := req.GetName()
    uniqueID := uuid.NewUUID().String()
    rbdVolume, err := getRBDVolumeByID(req.GetSourceVolumeId())
    if err != nil {
        return nil, status.Errorf(codes.NotFound, "Source Volume ID %s cannot found", req.GetSourceVolumeId())
    }
    if !hasSnapshotFeature(rbdVolume.ImageFeatures) {
    // Check if source volume was created with required image features for snaps
    if !hasSnapshotFeature(rbdVol.ImageFeatures) {
        return nil, status.Errorf(codes.InvalidArgument, "volume(%s) has not snapshot feature(layering)", req.GetSourceVolumeId())
    }

    rbdSnap.VolName = rbdVolume.VolName
    rbdSnap.SnapName = snapName
    snapshotID := "csi-rbd-" + rbdVolume.VolName + "-snap-" + uniqueID
    rbdSnap.SnapID = snapshotID
    // Create snap volume
    rbdSnap := genSnapFromOptions(rbdVol, req.GetParameters())
    rbdSnap.RbdImageName = rbdVol.RbdImageName
    rbdSnap.SizeBytes = rbdVol.VolSize
    rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
    rbdSnap.SizeBytes = rbdVolume.VolSize
    rbdSnap.RequestName = req.GetName()

    err = cs.doSnapshot(rbdSnap, req.GetSecrets())
    // if we already have the snapshot, return the snapshot
    // Need to check for already existing snapshot name, and if found
    // check for the requested source volume id and already allocated source volume id
    found, err := checkSnapExists(rbdSnap, req.GetSecrets())
    if err != nil {
        if _, ok := err.(ErrSnapNameConflict); ok {
            return nil, status.Error(codes.AlreadyExists, err.Error())
        }

        return nil, status.Errorf(codes.Internal, err.Error())
    }
    if found {
        return &csi.CreateSnapshotResponse{
            Snapshot: &csi.Snapshot{
                SizeBytes:      rbdSnap.SizeBytes,
                SnapshotId:     rbdSnap.SnapID,
                SourceVolumeId: rbdSnap.SourceVolumeID,
                CreationTime:   rbdSnap.CreatedAt,
                ReadyToUse:     true,
            },
        }, nil
    }

    err = reserveSnap(rbdSnap, req.GetSecrets())
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    defer func() {
        if err != nil {
            errDefer := unreserveSnap(rbdSnap, req.GetSecrets())
            if errDefer != nil {
                klog.Warningf("failed undoing reservation of snapshot: %s %v", req.GetName(), errDefer)
            }
        }
    }()

    rbdSnap.CreatedAt = ptypes.TimestampNow().GetSeconds()

    rbdSnapshots[snapshotID] = *rbdSnap

    if err = storeSnapshotMetadata(rbdSnap, cs.MetadataStore); err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    err = cs.doSnapshot(rbdSnap, req.GetSecrets())
    if err != nil {
        return nil, err
    }

    return &csi.CreateSnapshotResponse{
        Snapshot: &csi.Snapshot{
            SizeBytes:      rbdSnap.SizeBytes,
            SnapshotId:     snapshotID,
            SnapshotId:     rbdSnap.SnapID,
            SourceVolumeId: req.GetSourceVolumeId(),
            CreationTime: &timestamp.Timestamp{
                Seconds: rbdSnap.CreatedAt,
            },
            ReadyToUse: true,
            CreationTime: rbdSnap.CreatedAt,
            ReadyToUse:   true,
        },
    }, nil
}

func storeSnapshotMetadata(rbdSnap *rbdSnapshot, cp util.CachePersister) error {
    if err := cp.Create(rbdSnap.SnapID, rbdSnap); err != nil {
        klog.Errorf("failed to store metadata for snapshot %s: %v", rbdSnap.SnapID, err)
        return err
    }

    return nil
}

func (cs *ControllerServer) validateSnapshotReq(req *csi.CreateSnapshotRequest) error {
    if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
        klog.Warningf("invalid create snapshot req: %v", protosanitizer.StripSecrets(req))
@@ -475,41 +407,53 @@ func (cs *ControllerServer) validateSnapshotReq(req *csi.CreateSnapshotRequest) error {
    if len(req.SourceVolumeId) == 0 {
        return status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
    }

    return nil
}

func (cs *ControllerServer) doSnapshot(rbdSnap *rbdSnapshot, secret map[string]string) error {
    err := createSnapshot(rbdSnap, rbdSnap.AdminID, secret)
    // if we already have the snapshot, return the snapshot
func (cs *ControllerServer) doSnapshot(rbdSnap *rbdSnapshot, secret map[string]string) (err error) {
    err = createSnapshot(rbdSnap, rbdSnap.AdminID, secret)
    // If snap creation fails, even due to snapname already used, fail, next attempt will get a new
    // uuid for use as the snap name
    if err != nil {
        if exitErr, ok := err.(*exec.ExitError); ok {
            if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
                if status.ExitStatus() == int(syscall.EEXIST) {
                    klog.Warningf("Snapshot with the same name: %s, we return this.", rbdSnap.SnapName)
                } else {
                    klog.Warningf("failed to create snapshot: %v", err)
                    return err
                }
            } else {
                klog.Warningf("failed to create snapshot: %v", err)
                return err
            }
        } else {
            klog.Warningf("failed to create snapshot: %v", err)
            return err
        }
    } else {
        klog.V(4).Infof("create snapshot %s", rbdSnap.SnapName)
        err = protectSnapshot(rbdSnap, rbdSnap.AdminID, secret)

        if err != nil {
            err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, secret)
            if err != nil {
                return fmt.Errorf("snapshot is created but failed to protect and delete snapshot: %v", err)
            }
            return errors.New("snapshot is created but failed to protect snapshot")
        }
        klog.Errorf("failed to create snapshot: %v", err)
        return status.Error(codes.Internal, err.Error())
    }
    defer func() {
        if err != nil {
            errDefer := deleteSnapshot(rbdSnap, rbdSnap.AdminID, secret)
            if errDefer != nil {
                klog.Errorf("failed to delete snapshot: %v", errDefer)
                err = fmt.Errorf("snapshot created but failed to delete snapshot due to"+
                    " other failures: %v", err)
            }
            err = status.Error(codes.Internal, err.Error())
        }
    }()

    err = protectSnapshot(rbdSnap, rbdSnap.AdminID, secret)
    if err != nil {
        klog.Errorf("failed to protect snapshot: %v", err)
        return status.Error(codes.Internal, err.Error())
    }
    defer func() {
        if err != nil {
            errDefer := unprotectSnapshot(rbdSnap, rbdSnap.AdminID, secret)
            if errDefer != nil {
                klog.Errorf("failed to unprotect snapshot: %v", errDefer)
                err = fmt.Errorf("snapshot created but failed to unprotect snapshot due to"+
                    " other failures: %v", err)
            }
            err = status.Error(codes.Internal, err.Error())
        }
    }()

    err = getSnapshotMetadata(rbdSnap, rbdSnap.AdminID, secret)
    if err != nil {
        klog.Errorf("failed to fetch snapshot metadata: %v", err)
        return status.Error(codes.Internal, err.Error())
    }

    return nil
}

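The new doSnapshot chains create, protect, and metadata fetch, arming a deferred undo after each successful step so partially created snapshots are rolled back. A condensed sketch of that rollback pattern, with a hypothetical step helper in place of the real snapshot calls:

package main

import "fmt"

func doSnapshotFlow() (err error) {
    if err = step("create snapshot"); err != nil {
        return err
    }
    defer func() {
        if err != nil {
            _ = step("delete snapshot") // undo create if a later step failed
        }
    }()

    if err = step("protect snapshot"); err != nil {
        return err
    }
    defer func() {
        if err != nil {
            _ = step("unprotect snapshot") // undo protect if a later step failed
        }
    }()

    return step("get snapshot metadata")
}

func step(name string) error { fmt.Println(name); return nil }

func main() { _ = doSnapshotFlow() }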
@@ -525,8 +469,8 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
    if len(snapshotID) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
    }
    snapshotIDMutex.LockKey(snapshotID)

    snapshotIDMutex.LockKey(snapshotID)
    defer func() {
        if err := snapshotIDMutex.UnlockKey(snapshotID); err != nil {
            klog.Warningf("failed to unlock mutex snapshot:%s %v", snapshotID, err)
@@ -534,95 +478,41 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
    }()

    rbdSnap := &rbdSnapshot{}
    if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil {
        if err, ok := err.(*util.CacheEntryNotFound); ok {
            klog.V(3).Infof("metadata for snapshot %s not found, assuming the snapshot to be already deleted (%v)", snapshotID, err)
            return &csi.DeleteSnapshotResponse{}, nil
    if err := genSnapFromSnapID(rbdSnap, snapshotID, req.GetSecrets()); err != nil {
        // Consider missing snap as already deleted, and proceed to remove the omap values
        if _, ok := err.(ErrSnapNotFound); !ok {
            return nil, status.Error(codes.Internal, err.Error())
        }

        return nil, err
        if err := unreserveSnap(rbdSnap, req.GetSecrets()); err != nil {
            return nil, status.Error(codes.Internal, err.Error())
        }
        return &csi.DeleteSnapshotResponse{}, nil
    }

    // lock out parallel create requests against the same snap name as we
    // cleanup the image and associated omaps for the same
    snapshotNameMutex.LockKey(rbdSnap.RequestName)
    defer func() {
        if err := snapshotNameMutex.UnlockKey(rbdSnap.RequestName); err != nil {
            klog.Warningf("failed to unlock mutex snapshot:%s %v", rbdSnap.RequestName, err)
        }
    }()

    // Unprotect snapshot
    err := unprotectSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
    if err != nil {
        return nil, status.Errorf(codes.FailedPrecondition, "failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
        return nil, status.Errorf(codes.FailedPrecondition,
            "failed to unprotect snapshot: %s/%s with error: %v",
            rbdSnap.Pool, rbdSnap.RbdSnapName, err)
    }

    // Deleting snapshot
    klog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName)
    klog.V(4).Infof("deleting Snaphot %s", rbdSnap.RbdSnapName)
    if err := deleteSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets()); err != nil {
        return nil, status.Errorf(codes.FailedPrecondition, "failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
        return nil, status.Errorf(codes.FailedPrecondition,
            "failed to delete snapshot: %s/%s with error: %v",
            rbdSnap.Pool, rbdSnap.RbdSnapName, err)
    }

    if err := cs.MetadataStore.Delete(snapshotID); err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    delete(rbdSnapshots, snapshotID)

    return &csi.DeleteSnapshotResponse{}, nil
}

// ListSnapshots lists the snapshots in the store
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
    if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS); err != nil {
        klog.Warningf("invalid list snapshot req: %v", req)
        return nil, err
    }

    sourceVolumeID := req.GetSourceVolumeId()

    // TODO (sngchlko) list with token
    // TODO (#94) protect concurrent access to global data structures

    // list only a specific snapshot which has snapshot ID
    if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 {
        if rbdSnap, ok := rbdSnapshots[snapshotID]; ok {
            // if source volume ID also set, check source volume id on the cache.
            if len(sourceVolumeID) != 0 && rbdSnap.SourceVolumeID != sourceVolumeID {
                return nil, status.Errorf(codes.Unknown, "Requested Source Volume ID %s is different from %s", sourceVolumeID, rbdSnap.SourceVolumeID)
            }
            return &csi.ListSnapshotsResponse{
                Entries: []*csi.ListSnapshotsResponse_Entry{
                    {
                        Snapshot: &csi.Snapshot{
                            SizeBytes:      rbdSnap.SizeBytes,
                            SnapshotId:     rbdSnap.SnapID,
                            SourceVolumeId: rbdSnap.SourceVolumeID,
                            CreationTime: &timestamp.Timestamp{
                                Seconds: rbdSnap.CreatedAt,
                            },
                            ReadyToUse: true,
                        },
                    },
                },
            }, nil
        }
        return nil, status.Errorf(codes.NotFound, "Snapshot ID %s cannot found", snapshotID)
    }

    entries := []*csi.ListSnapshotsResponse_Entry{}
    for _, rbdSnap := range rbdSnapshots {
        // if source volume ID also set, check source volume id on the cache.
        if len(sourceVolumeID) != 0 && rbdSnap.SourceVolumeID != sourceVolumeID {
            continue
        }
        entries = append(entries, &csi.ListSnapshotsResponse_Entry{
            Snapshot: &csi.Snapshot{
                SizeBytes:      rbdSnap.SizeBytes,
                SnapshotId:     rbdSnap.SnapID,
                SourceVolumeId: rbdSnap.SourceVolumeID,
                CreationTime: &timestamp.Timestamp{
                    Seconds: rbdSnap.CreatedAt,
                },
                ReadyToUse: true,
            },
        })
    }

    return &csi.ListSnapshotsResponse{
        Entries: entries,
    }, nil
}
@@ -1,81 +0,0 @@
package rbd

import (
    "testing"

    "github.com/ceph/ceph-csi/pkg/util"
)

type testCachePersister struct {
    volumes   map[string]rbdVolume
    snapshots map[string]rbdSnapshot
}

func (t *testCachePersister) Create(identifier string, data interface{}) error {
    return nil
}

func (t *testCachePersister) Get(identifier string, data interface{}) error {
    return nil
}

func (t *testCachePersister) ForAll(pattern string, destObj interface{}, f util.ForAllFunc) error {

    switch pattern {
    case "csi-rbd-vol-":
        for identifier, vol := range t.volumes {
            *destObj.(*rbdVolume) = vol
            if err := f(identifier); err != nil {
                return err
            }
        }
    case "csi-rbd-(.*)-snap-":
        for identifier, snap := range t.snapshots {
            *destObj.(*rbdSnapshot) = snap
            if err := f(identifier); err != nil {
                return err
            }
        }
    }

    return nil
}

func (t *testCachePersister) Delete(identifier string) error {
    return nil
}

func TestLoadExDataFromMetadataStore(t *testing.T) {
    cs := &ControllerServer{
        MetadataStore: &testCachePersister{
            volumes: map[string]rbdVolume{
                "item1": {
                    VolID: "1",
                },
                "item2": {
                    VolID: "2",
                },
            },
            snapshots: map[string]rbdSnapshot{
                "item1": {
                    SnapID: "1",
                },
                "item2": {
                    SnapID: "2",
                },
            },
        },
    }

    if err := cs.LoadExDataFromMetadataStore(); err != nil {
        t.Error(err)
    }

    if rbdVolumes["item1"] == rbdVolumes["item2"] {
        t.Error("rbd volume entries contain pointer to same volume")
    }

    if rbdSnapshots["item1"] == rbdSnapshots["item2"] {
        t.Error("rbd snapshot entries contain pointer to same snapshot")
    }
}
pkg/rbd/errors.go (new file, 60 lines)
@@ -0,0 +1,60 @@
/*
Copyright 2019 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rbd

// ErrImageNotFound is returned when image name is not found in the cluster on the given pool
type ErrImageNotFound struct {
    imageName string
    err       error
}

func (e ErrImageNotFound) Error() string {
    return e.err.Error()
}

// ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the
// given image
type ErrSnapNotFound struct {
    snapName string
    err      error
}

func (e ErrSnapNotFound) Error() string {
    return e.err.Error()
}

// ErrSnapNameConflict is generated when a requested CSI snap name already exists on RBD but with
// different properties, and hence is in conflict with the passed in CSI volume name
type ErrSnapNameConflict struct {
    requestName string
    err         error
}

func (e ErrSnapNameConflict) Error() string {
    return e.err.Error()
}

// ErrVolNameConflict is generated when a requested CSI volume name already exists on RBD but with
// different properties, and hence is in conflict with the passed in CSI volume name
type ErrVolNameConflict struct {
    requestName string
    err         error
}

func (e ErrVolNameConflict) Error() string {
    return e.err.Error()
}
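The controller code shown earlier (DeleteSnapshot, checkSnapshot, CreateVolume) branches on these error types via type assertions. A minimal sketch of that usage pattern, in the same package, with made-up error values:

package rbd

import (
    "errors"
    "fmt"
)

// classify shows the assertion style used by the callers: a value of
// ErrSnapNotFound means the backing snapshot is gone and only the omap
// bookkeeping remains to be cleaned up.
func classify(err error) string {
    if _, ok := err.(ErrSnapNotFound); ok {
        return "treat as already deleted, clean up omaps"
    }
    return "internal error"
}

func example() {
    err := ErrSnapNotFound{snapName: "csi-snap-1234", err: errors.New("snapshot not found")} // hypothetical values
    fmt.Println(classify(err))
}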
@@ -24,6 +24,7 @@ import (
    "strings"

    "github.com/ceph/ceph-csi/pkg/csi-common"
    "github.com/ceph/ceph-csi/pkg/util"

    "github.com/container-storage-interface/spec/lib/go/csi"
    "golang.org/x/net/context"
@@ -47,14 +48,25 @@ type NodeServer struct {
// path
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
    targetPath := req.GetTargetPath()
    targetPathMutex.LockKey(targetPath)
    disableInUseChecks := false
    if targetPath == "" {
        return nil, status.Error(codes.InvalidArgument, "Empty target path in request")
    }

    if req.GetVolumeCapability() == nil {
        return nil, status.Error(codes.InvalidArgument, "Empty volume capability in request")
    }

    if req.GetVolumeId() == "" {
        return nil, status.Error(codes.InvalidArgument, "Empty volume ID in request")
    }

    targetPathMutex.LockKey(targetPath)
    defer func() {
        if err := targetPathMutex.UnlockKey(targetPath); err != nil {
            klog.Warningf("failed to unlock mutex targetpath:%s %v", targetPath, err)
        }
    }()
    disableInUseChecks := false

    volName, err := ns.getVolumeName(req)
    if err != nil {
@@ -82,11 +94,11 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
        }
    }

    volOptions, err := getRBDVolumeOptions(req.GetVolumeContext(), disableInUseChecks)
    volOptions, err := genVolFromVolumeOptions(req.GetVolumeContext(), disableInUseChecks)
    if err != nil {
        return nil, err
    }
    volOptions.VolName = volName
    volOptions.RbdImageName = volName
    // Mapping RBD image
    devicePath, err := attachRBDImage(volOptions, volOptions.UserID, req.GetSecrets())
    if err != nil {
@@ -103,22 +115,15 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
}

func (ns *NodeServer) getVolumeName(req *csi.NodePublishVolumeRequest) (string, error) {
    var volName string
    isBlock := req.GetVolumeCapability().GetBlock() != nil
    targetPath := req.GetTargetPath()
    if isBlock {
        // Get volName from targetPath
        s := strings.Split(targetPath, "/")
        volName = s[len(s)-1]
    } else {
        // Get volName from targetPath
        if !strings.HasSuffix(targetPath, "/mount") {
            return "", fmt.Errorf("rbd: malformed the value of target path: %s", targetPath)
        }
        s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
        volName = s[len(s)-1]
    var vi util.CSIIdentifier

    err := vi.DecomposeCSIID(req.GetVolumeId())
    if err != nil {
        klog.Errorf("error decoding volume ID (%s) (%s)", err, req.GetVolumeId())
        return "", status.Error(codes.InvalidArgument, err.Error())
    }
    return volName, nil

    return rbdImgNamePrefix + vi.ObjectUUID, nil
}

func (ns *NodeServer) mountVolume(req *csi.NodePublishVolumeRequest, devicePath string) error {
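With this change the node server no longer parses the image name out of the target path; it rebuilds the name from the volume ID. A small sketch of the resulting name, assuming only the ObjectUUID field shown above (the uuid value is illustrative, and the full ID layout lives in util.CSIIdentifier, which is not part of this hunk):

package main

import "fmt"

const rbdImgNamePrefix = "csi-vol-" // matches the constant in pkg/rbd

func main() {
    objectUUID := "36e66df2-8f99-11e9-b398-0242ac110003" // assumed example uuid from a decomposed volume ID
    fmt.Println(rbdImgNamePrefix + objectUUID)           // csi-vol-36e66df2-8f99-11e9-b398-0242ac110003
}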
@@ -187,6 +192,14 @@ func (ns *NodeServer) createTargetPath(targetPath string, isBlock bool) (bool, error) {
// NodeUnpublishVolume unmounts the volume from the target path
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
    targetPath := req.GetTargetPath()
    if targetPath == "" {
        return nil, status.Error(codes.InvalidArgument, "Empty target path in request")
    }

    if req.GetVolumeId() == "" {
        return nil, status.Error(codes.InvalidArgument, "Empty volume ID in request")
    }

    targetPathMutex.LockKey(targetPath)

    defer func() {
pkg/rbd/rbd.go (135 lines changed)
@@ -27,10 +27,101 @@ import (
    "k8s.io/utils/nsenter"
)

// PluginFolder defines the location of rbdplugin
/*
RADOS omaps usage:

This note details how we preserve idempotent nature of create requests and retain the relationship
between orchestrator (CO) generated Names and plugin generated names for images and snapshots

The implementation uses Ceph RADOS omaps to preserve the relationship between request name and
generated image (or snapshot) name. There are 4 types of omaps in use,
- A "csi.volumes.[csi-id]" (or "csi.volumes"+.+CSIInstanceID), we call this the csiVolsDirectory
  - stores keys named using the CO generated names for volume requests
  - keys are named "csi.volume."+[CO generated VolName]
  - Key value contains the RBD image uuid that is created or will be created, for the CO provided
    name

- A "csi.snaps.[csi-id]" (or "csi.snaps"+.+CSIInstanceID), we refer to this as the csiSnapsDirectory
  - stores keys named using the CO generated names for snapshot requests
  - keys are named "csi.snap."+[CO generated SnapName]
  - Key value contains the RBD snapshot uuid that is created or will be created, for the CO
    provided name

- A per image omap named "rbd.csi.volume."+[RBD image uuid], we refer to this as the rbdImageOMap
  - stores a single key named "csi.volname", that has the value of the CO generated VolName that
    this image refers to

- A per snapshot omap named "rbd.csi.snap."+[RBD snapshot uuid], we refer to this as the snapOMap
  - stores a key named "csi.snapname", that has the value of the CO generated SnapName that this
    snapshot refers to
  - also stores another key named "csi.source", that has the value of the image name that is the
    source of the snapshot

Creation of omaps:
When a volume create request is received (or a snapshot create, the snapshot is not detailed in this
comment further as the process is similar),
- The csiVolsDirectory is consulted to find if there is already a key with the CO VolName, and if present,
  it is used to read its references to reach the RBD image that backs this VolName, to check if the
  RBD image can satisfy the requirements for the request
- If during the process of checking the same, it is found that some linking information is stale
  or missing, the corresponding keys upto the key in the csiVolsDirectory is cleaned up, to start afresh
- If the key with the CO VolName is not found, or was cleaned up, the request is treated as a
  new create request, and an rbdImageOMap is created first with a generated uuid, this ensures that we
  do not use a uuid that is already in use
- Next, a key with the VolName is created in the csiVolsDirectory, and its value is updated to store the
  generated uuid
- This is followed by updating the rbdImageOMap with the VolName in the rbdImageCSIVolNameKey
- Finally, the image is created (or promoted from a snapshot, if content source was provided) using
  the uuid and a corresponding image name prefix (rbdImgNamePrefix or rbdSnapNamePrefix)

The entire operation is locked based on VolName hash, to ensure there is only ever a single entity
modifying the related omaps for a given VolName.

This ensures idempotent nature of creates, as the same CO generated VolName would attempt to use
the same RBD image name to serve the request, as the relations are saved in the respective omaps.

Deletion of omaps:
Delete requests would not contain the VolName, hence deletion uses the volume ID, which is encoded
with the image name in it, to find the image and the rbdImageOMap. The rbdImageOMap is read to get
the VolName that this image points to. This VolName can be further used to read and delete the key
from the csiVolsDirectory.

As we trace back and find the VolName, we also take a hash based lock on the VolName before
proceeding with deleting the image and the related omap entries, to ensure there is only ever a
single entity modifying the related omaps for a given VolName.
*/

const (
    rbdDefaultAdminID = "admin"
    rbdDefaultUserID  = rbdDefaultAdminID
    // volIDVersion is the version number of volume ID encoding scheme
    volIDVersion      uint16 = 1
    rbdDefaultAdminID        = "admin"
    rbdDefaultUserID         = rbdDefaultAdminID

    // csiConfigFile is the location of the CSI config file
    csiConfigFile = "/etc/ceph-csi-config/config.json"

    // CSI volume-name keyname prefix, for key in csiVolsDirectory, suffix is the CSI passed volume name
    csiVolNameKeyPrefix = "csi.volume."
    // Per RBD image object map name prefix, suffix is the RBD image uuid
    rbdImageOMapPrefix = "csi.volume."
    // CSI volume-name key in per RBD image object map, containing CSI volume-name for which the
    // image was created
    rbdImageCSIVolNameKey = "csi.volname"
    // RBD image name prefix, suffix is a uuid generated per image
    rbdImgNamePrefix = "csi-vol-"

    //CSI snap-name keyname prefix, for key in csiSnapsDirectory, suffix is the CSI passed snapshot name
    csiSnapNameKeyPrefix = "csi.snap."
    // Per RBD snapshot object map name prefix, suffix is the RBD image uuid
    rbdSnapOMapPrefix = "csi.snap."
    // CSI snap-name key in per RBD snapshot object map, containing CSI snapshot-name for which the
    // snapshot was created
    rbdSnapCSISnapNameKey = "csi.snapname"
    // source image name key in per RBD snapshot object map, containing RBD source image name for
    // which the snapshot was created
    rbdSnapSourceImageKey = "csi.source"
    // RBD snapshot name prefix, suffix is a uuid generated per snapshot
    rbdSnapNamePrefix = "csi-snap-"
)

// PluginFolder defines the location of ceph plugin
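A worked example, not from the commit, of the names these constants produce for a hypothetical CO volume name and generated uuid:

package main

import "fmt"

const (
    csiVolNameKeyPrefix = "csi.volume." // key prefix in the volumes directory omap
    rbdImageOMapPrefix  = "csi.volume." // per-image omap name prefix
    rbdImgNamePrefix    = "csi-vol-"    // backing RBD image name prefix
)

func main() {
    csiVolsDirectory := "csi.volumes" + "." + "default"  // suffixed with CSIInstanceID at startup
    volName := "pvc-1f2e3d4c"                            // example CO generated volume name
    uuid := "36e66df2-8f99-11e9-b398-0242ac110003"       // example generated uuid

    fmt.Println(csiVolsDirectory)              // csi.volumes.default
    fmt.Println(csiVolNameKeyPrefix + volName) // key for this request in the directory omap
    fmt.Println(rbdImageOMapPrefix + uuid)     // per-image omap holding the reverse mapping
    fmt.Println(rbdImgNamePrefix + uuid)       // name of the backing RBD image
}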
@@ -47,8 +138,14 @@ type Driver struct {

var (
    version = "1.0.0"
    // confStore is the global config store
    confStore *util.ConfigStore
    // CSIInstanceID is the instance ID that is unique to an instance of CSI, used when sharing
    // ceph clusters across CSI instances, to differentiate omap names per CSI instance
    CSIInstanceID = "default"
    // csiVolsDirectory is the name of the CSI volumes object map that contains CSI volume-name
    // based keys
    csiVolsDirectory = "csi.volumes"
    // csiSnapsDirectory is the name of the CSI snapshots object map that contains CSI snapshot-name based keys
    csiSnapsDirectory = "csi.snaps"
)

// NewDriver returns new rbd driver
@@ -64,10 +161,9 @@ func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
}

// NewControllerServer initialize a controller server for rbd CSI driver
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
    return &ControllerServer{
        DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
        MetadataStore:           cachePersister,
    }
}

@@ -89,16 +185,23 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, error) {

// Run start a non-blocking grpc controller,node and identityserver for
// rbd CSI driver which can serve multiple parallel requests
func (r *Driver) Run(driverName, nodeID, endpoint, configRoot string, containerized bool, cachePersister util.CachePersister) {
func (r *Driver) Run(driverName, nodeID, endpoint, instanceID string, containerized bool) {
    var err error

    klog.Infof("Driver: %v version: %v", driverName, version)

    // Initialize config store
    confStore, err = util.NewConfigStore(configRoot)
    if err != nil {
        klog.Fatalln("Failed to initialize config store.")
    // Create ceph.conf for use with CLI commands
    if err = util.WriteCephConfig(); err != nil {
        klog.Fatalf("failed to write ceph configuration file (%v)", err)
    }

    // Use passed in instance ID, if provided for omap suffix naming
    if instanceID != "" {
        CSIInstanceID = instanceID
    }
    csiVolsDirectory = csiVolsDirectory + "." + CSIInstanceID
    csiSnapsDirectory = csiSnapsDirectory + "." + CSIInstanceID

    // Initialize default library driver
    r.cd = csicommon.NewCSIDriver(driverName, version, nodeID)
    if r.cd == nil {
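The instance ID suffixing above is what keeps the omap directories of multiple CSI instances on one Ceph cluster separate. A tiny standalone sketch, with hypothetical instance IDs:

package main

import "fmt"

// dirsFor mirrors the suffixing done in Run: an empty instance ID falls
// back to "default", and both directory omap names get the suffix.
func dirsFor(instanceID string) (string, string) {
    if instanceID == "" {
        instanceID = "default"
    }
    return "csi.volumes." + instanceID, "csi.snaps." + instanceID
}

func main() {
    fmt.Println(dirsFor(""))         // csi.volumes.default csi.snaps.default
    fmt.Println(dirsFor("clusterA")) // csi.volumes.clusterA csi.snaps.clusterA
}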
@@ -106,9 +209,7 @@ func (r *Driver) Run(driverName, nodeID, endpoint, configRoot string, containerized bool, cachePersister util.CachePersister) {
    }
    r.cd.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
        csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
        csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
        csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
        csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
        csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
    })

@@ -127,11 +228,7 @@ func (r *Driver) Run(driverName, nodeID, endpoint, configRoot string, containerized bool, cachePersister util.CachePersister) {
        klog.Fatalf("failed to start node server, err %v\n", err)
    }

    r.cs = NewControllerServer(r.cd, cachePersister)

    if err = r.cs.LoadExDataFromMetadataStore(); err != nil {
        klog.Fatalf("failed to load metadata from store, err %v\n", err)
    }
    r.cs = NewControllerServer(r.cd)

    s := csicommon.NewNonBlockingGRPCServer()
    s.Start(endpoint, r.ids, r.cs, r.ns)
@@ -227,7 +227,7 @@ func checkRbdNbdTools() bool {
func attachRBDImage(volOptions *rbdVolume, userID string, credentials map[string]string) (string, error) {
    var err error

    image := volOptions.VolName
    image := volOptions.RbdImageName
    imagePath := fmt.Sprintf("%s/%s", volOptions.Pool, image)

    useNBD := false
@@ -271,16 +271,11 @@ func attachRBDImage(volOptions *rbdVolume, userID string, credentials map[string]string) (string, error) {
}

func createPath(volOpt *rbdVolume, userID string, creds map[string]string) (string, error) {
    image := volOpt.VolName
    image := volOpt.RbdImageName
    imagePath := fmt.Sprintf("%s/%s", volOpt.Pool, image)

    mon, err := getMon(volOpt, creds)
    if err != nil {
        return "", err
    }

    klog.V(5).Infof("rbd: map mon %s", mon)
    key, err := getRBDKey(volOpt.ClusterID, userID, creds)
    klog.V(5).Infof("rbd: map mon %s", volOpt.Monitors)
    key, err := getKey(userID, creds)
    if err != nil {
        return "", err
    }
@@ -293,7 +288,7 @@ func createPath(volOpt *rbdVolume, userID string, creds map[string]string) (string, error) {
    }

    output, err := execCommand(cmdName, []string{
        "map", imagePath, "--id", userID, "-m", mon, "--key=" + key})
        "map", imagePath, "--id", userID, "-m", volOpt.Monitors, "--key=" + key})
    if err != nil {
        klog.Warningf("rbd: map error %v, rbd output: %s", err, string(output))
        return "", fmt.Errorf("rbd: map failed %v, rbd output: %s", err, string(output))
@@ -306,7 +301,7 @@ func createPath(volOpt *rbdVolume, userID string, creds map[string]string) (string, error) {
}

func waitForrbdImage(backoff wait.Backoff, volOptions *rbdVolume, userID string, credentials map[string]string) error {
    image := volOptions.VolName
    image := volOptions.RbdImageName
    imagePath := fmt.Sprintf("%s/%s", volOptions.Pool, image)

    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
@@ -17,11 +17,16 @@ limitations under the License.
package rbd

import (
    "encoding/json"
    "fmt"
    "os/exec"
    "strings"
    "time"

    "github.com/ceph/ceph-csi/pkg/util"

    "github.com/golang/protobuf/ptypes"
    "github.com/golang/protobuf/ptypes/timestamp"
    "github.com/pkg/errors"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/klog"
@@ -39,35 +44,45 @@ const (
    rbdDefaultMounter = "rbd"
)

// rbdVolume represents a CSI volume and its RBD image specifics
type rbdVolume struct {
    VolName            string `json:"volName"`
    VolID              string `json:"volID"`
    Monitors           string `json:"monitors"`
    MonValueFromSecret string `json:"monValueFromSecret"`
    Pool               string `json:"pool"`
    ImageFormat        string `json:"imageFormat"`
    ImageFeatures      string `json:"imageFeatures"`
    VolSize            int64  `json:"volSize"`
    AdminID            string `json:"adminId"`
    UserID             string `json:"userId"`
    Mounter            string `json:"mounter"`
    DisableInUseChecks bool   `json:"disableInUseChecks"`
    ClusterID          string `json:"clusterId"`
    // RbdImageName is the name of the RBD image backing this rbdVolume
    // VolID is the volume ID that is exchanged with CSI drivers, identifying this rbdVol
    // RequestName is the CSI generated volume name for the rbdVolume
    RbdImageName       string
    VolID              string
    Monitors           string
    Pool               string
    ImageFormat        string
    ImageFeatures      string
    VolSize            int64
    AdminID            string
    UserID             string
    Mounter            string
    DisableInUseChecks bool
    ClusterID          string
    RequestName        string
}

// rbdSnapshot represents a CSI snapshot and its RBD snapshot specifics
type rbdSnapshot struct {
    SourceVolumeID     string `json:"sourceVolumeID"`
    VolName            string `json:"volName"`
    SnapName           string `json:"snapName"`
    SnapID             string `json:"sanpID"`
    Monitors           string `json:"monitors"`
    MonValueFromSecret string `json:"monValueFromSecret"`
    Pool               string `json:"pool"`
    CreatedAt          int64  `json:"createdAt"`
    SizeBytes          int64  `json:"sizeBytes"`
    AdminID            string `json:"adminId"`
    UserID             string `json:"userId"`
    ClusterID          string `json:"clusterId"`
    // SourceVolumeID is the volume ID of RbdImageName, that is exchanged with CSI drivers
    // RbdImageName is the name of the RBD image, that is this rbdSnapshot's source image
    // RbdSnapName is the name of the RBD snapshot backing this rbdSnapshot
    // SnapID is the snapshot ID that is exchanged with CSI drivers, identifying this rbdSnapshot
    // RequestName is the CSI generated snapshot name for the rbdSnapshot
    SourceVolumeID string
    RbdImageName   string
    RbdSnapName    string
    SnapID         string
    Monitors       string
    Pool           string
    CreatedAt      *timestamp.Timestamp
    SizeBytes      int64
    AdminID        string
    UserID         string
    ClusterID      string
    RequestName    string
}

var (
@ -87,67 +102,36 @@ var (
|
||||
supportedFeatures = sets.NewString("layering")
|
||||
)
|
||||
|
||||
func getRBDKey(clusterid, id string, credentials map[string]string) (string, error) {
|
||||
func getKey(id string, credentials map[string]string) (string, error) {
|
||||
var (
|
||||
ok bool
|
||||
err error
|
||||
key string
|
||||
ok bool
|
||||
)
|
||||
|
||||
if key, ok = credentials[id]; !ok {
|
||||
if clusterid != "" {
|
||||
key, err = confStore.KeyForUser(clusterid, id)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("RBD key for ID: %s not found in config store of clusterID (%s)", id, clusterid)
|
||||
}
|
||||
} else {
|
||||
return "", fmt.Errorf("RBD key for ID: %s not found", id)
|
||||
}
|
||||
return "", fmt.Errorf("RBD key for ID: %s not found", id)
|
||||
}
|
||||
|
||||
return key, nil
|
||||
}

func getMon(pOpts *rbdVolume, credentials map[string]string) (string, error) {
	mon := pOpts.Monitors
	if len(mon) == 0 {
		// if mons are set in secret, retrieve them
		if len(pOpts.MonValueFromSecret) == 0 {
			// yet another sanity check
			return "", errors.New("either monitors or monValueFromSecret must be set")
		}
		val, ok := credentials[pOpts.MonValueFromSecret]
		if !ok {
			return "", fmt.Errorf("mon data %s is not set in secret", pOpts.MonValueFromSecret)
		}
		mon = val
	}
	return mon, nil
}

// CreateImage creates a new ceph image with provision and volume options.
func createRBDImage(pOpts *rbdVolume, volSz int, adminID string, credentials map[string]string) error {
// createImage creates a new ceph image with provision and volume options.
func createImage(pOpts *rbdVolume, volSz int64, adminID string, credentials map[string]string) error {
	var output []byte

	mon, err := getMon(pOpts, credentials)
	if err != nil {
		return err
	}

	image := pOpts.VolName
	image := pOpts.RbdImageName
	volSzMiB := fmt.Sprintf("%dM", volSz)

	key, err := getRBDKey(pOpts.ClusterID, adminID, credentials)
	key, err := getKey(adminID, credentials)
	if err != nil {
		return err
	}
	if pOpts.ImageFormat == rbdImageFormat2 {
		klog.V(4).Infof("rbd: create %s size %s format %s (features: %s) using mon %s, pool %s ", image, volSzMiB, pOpts.ImageFormat, pOpts.ImageFeatures, mon, pOpts.Pool)
		klog.V(4).Infof("rbd: create %s size %s format %s (features: %s) using mon %s, pool %s ", image, volSzMiB, pOpts.ImageFormat, pOpts.ImageFeatures, pOpts.Monitors, pOpts.Pool)
	} else {
		klog.V(4).Infof("rbd: create %s size %s format %s using mon %s, pool %s", image, volSzMiB, pOpts.ImageFormat, mon, pOpts.Pool)
		klog.V(4).Infof("rbd: create %s size %s format %s using mon %s, pool %s", image, volSzMiB, pOpts.ImageFormat, pOpts.Monitors, pOpts.Pool)
	}
	args := []string{"create", image, "--size", volSzMiB, "--pool", pOpts.Pool, "--id", adminID, "-m", mon, "--key=" + key, "--image-format", pOpts.ImageFormat}
	args := []string{"create", image, "--size", volSzMiB, "--pool", pOpts.Pool, "--id", adminID, "-m", pOpts.Monitors, "--key=" + key, "--image-format", pOpts.ImageFormat}
	if pOpts.ImageFormat == rbdImageFormat2 {
		args = append(args, "--image-feature", pOpts.ImageFeatures)
	}
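
For reference (editor's illustration, not part of the commit), the args assembled above correspond to an rbd CLI invocation of roughly this shape; the image name, monitor and key are placeholders:

	rbd create csi-vol-<uuid> --size 1024M --pool rbd --id admin \
		-m 10.0.0.1:6789 --key=<cephx-key> --image-format 2 \
		--image-feature layering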

@ -166,21 +150,16 @@ func rbdStatus(pOpts *rbdVolume, userID string, credentials map[string]string) (
	var output string
	var cmd []byte

	image := pOpts.VolName
	image := pOpts.RbdImageName
	// If we don't have admin id/secret (e.g. attaching), fallback to user id/secret.

	key, err := getRBDKey(pOpts.ClusterID, userID, credentials)
	key, err := getKey(userID, credentials)
	if err != nil {
		return false, "", err
	}

	mon, err := getMon(pOpts, credentials)
	if err != nil {
		return false, "", err
	}

	klog.V(4).Infof("rbd: status %s using mon %s, pool %s", image, mon, pOpts.Pool)
	args := []string{"status", image, "--pool", pOpts.Pool, "-m", mon, "--id", userID, "--key=" + key}
	klog.V(4).Infof("rbd: status %s using mon %s, pool %s", image, pOpts.Monitors, pOpts.Pool)
	args := []string{"status", image, "--pool", pOpts.Pool, "-m", pOpts.Monitors, "--id", userID, "--key=" + key}
	cmd, err = execCommand("rbd", args)
	output = string(cmd)

@ -205,10 +184,11 @@ func rbdStatus(pOpts *rbdVolume, userID string, credentials map[string]string) (
	return false, output, nil
}

// DeleteImage deletes a ceph image with provision and volume options.
func deleteRBDImage(pOpts *rbdVolume, adminID string, credentials map[string]string) error {
// deleteImage deletes a ceph image with provision and volume options.
func deleteImage(pOpts *rbdVolume, adminID string, credentials map[string]string) error {
	var output []byte
	image := pOpts.VolName

	image := pOpts.RbdImageName
	found, _, err := rbdStatus(pOpts, adminID, credentials)
	if err != nil {
		return err
@ -217,22 +197,190 @@ func deleteRBDImage(pOpts *rbdVolume, adminID string, credentials map[string]str
		klog.Info("rbd is still being used ", image)
		return fmt.Errorf("rbd %s is still being used", image)
	}
	key, err := getRBDKey(pOpts.ClusterID, adminID, credentials)
	if err != nil {
		return err
	}
	mon, err := getMon(pOpts, credentials)
	key, err := getKey(adminID, credentials)
	if err != nil {
		return err
	}

	klog.V(4).Infof("rbd: rm %s using mon %s, pool %s", image, mon, pOpts.Pool)
	args := []string{"rm", image, "--pool", pOpts.Pool, "--id", adminID, "-m", mon, "--key=" + key}
	klog.V(4).Infof("rbd: rm %s using mon %s, pool %s", image, pOpts.Monitors, pOpts.Pool)
	args := []string{"rm", image, "--pool", pOpts.Pool, "--id", adminID, "-m", pOpts.Monitors,
		"--key=" + key}
	output, err = execCommand("rbd", args)
	if err == nil {
		return nil
	if err != nil {
		klog.Errorf("failed to delete rbd image: %v, command output: %s", err, string(output))
		return err
	}
	klog.Errorf("failed to delete rbd image: %v, command output: %s", err, string(output))

	err = unreserveVol(pOpts, credentials)
	if err != nil {
		klog.Errorf("failed to remove reservation for volume (%s) with backing image (%s) (%s)",
			pOpts.RequestName, pOpts.RbdImageName, err)
		err = nil
	}

	return err
}

// updateSnapWithImageInfo updates the provided rbdSnapshot with information
// from its on-disk counterpart
func updateSnapWithImageInfo(rbdSnap *rbdSnapshot, credentials map[string]string) error {
	key, err := getKey(rbdSnap.AdminID, credentials)
	if err != nil {
		return err
	}

	snapInfo, err := getSnapInfo(rbdSnap.Monitors, rbdSnap.AdminID, key,
		rbdSnap.Pool, rbdSnap.RbdImageName, rbdSnap.RbdSnapName)
	if err != nil {
		return err
	}

	rbdSnap.SizeBytes = snapInfo.Size

	tm, err := time.Parse(time.ANSIC, snapInfo.Timestamp)
	if err != nil {
		return err
	}

	rbdSnap.CreatedAt, err = ptypes.TimestampProto(tm)

	return err
}

// updateVolWithImageInfo updates the provided rbdVolume with information
// from its on-disk counterpart
func updateVolWithImageInfo(rbdVol *rbdVolume, credentials map[string]string) error {
	key, err := getKey(rbdVol.AdminID, credentials)
	if err != nil {
		return err
	}

	imageInfo, err := getImageInfo(rbdVol.Monitors, rbdVol.AdminID, key,
		rbdVol.Pool, rbdVol.RbdImageName)
	if err != nil {
		return err
	}

	if imageInfo.Format != 2 {
		return fmt.Errorf("unknown or unsupported image format (%d) returned for image (%s)",
			imageInfo.Format, rbdVol.RbdImageName)
	}
	rbdVol.ImageFormat = rbdImageFormat2

	rbdVol.VolSize = imageInfo.Size
	rbdVol.ImageFeatures = strings.Join(imageInfo.Features, ",")

	return nil
}
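
A standalone sketch (not part of this commit) of the timestamp conversion used above: rbd reports snapshot creation time in Go's ANSIC layout, which is parsed and then converted to the protobuf Timestamp the CSI spec expects. The raw timestamp is an example value.

	package main

	import (
		"fmt"
		"time"

		"github.com/golang/protobuf/ptypes"
	)

	func main() {
		// Example ANSIC-formatted timestamp, as returned in snapInfo.Timestamp
		raw := "Mon Apr  1 10:13:05 2019"
		tm, err := time.Parse(time.ANSIC, raw)
		if err != nil {
			panic(err)
		}
		ts, err := ptypes.TimestampProto(tm)
		if err != nil {
			panic(err)
		}
		fmt.Println(ts.Seconds, ts.Nanos)
	}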

// genSnapFromSnapID generates a rbdSnapshot structure from the provided identifier, updating
// the structure with elements from on-disk snapshot metadata as well
func genSnapFromSnapID(rbdSnap *rbdSnapshot, snapshotID string, credentials map[string]string) error {
	var (
		options map[string]string
		vi      util.CSIIdentifier
	)
	options = make(map[string]string)

	rbdSnap.SnapID = snapshotID

	err := vi.DecomposeCSIID(rbdSnap.SnapID)
	if err != nil {
		klog.Errorf("error decoding snapshot ID (%s) (%s)", err, rbdSnap.SnapID)
		return err
	}

	rbdSnap.ClusterID = vi.ClusterID
	options["clusterID"] = rbdSnap.ClusterID
	rbdSnap.RbdSnapName = rbdSnapNamePrefix + vi.ObjectUUID

	rbdSnap.Monitors, _, err = getMonsAndClusterID(options)
	if err != nil {
		return err
	}

	rbdSnap.AdminID, rbdSnap.UserID = getIDs(options)

	key, err := getKey(rbdSnap.AdminID, credentials)
	if err != nil {
		return err
	}

	rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, rbdSnap.AdminID, key, vi.PoolID)
	if err != nil {
		return err
	}

	// TODO: fetch all omap values in one call, rather than issuing multiple listomapvals
	snapUUID := strings.TrimPrefix(rbdSnap.RbdSnapName, rbdSnapNamePrefix)
	rbdSnap.RequestName, err = util.GetOMapValue(rbdSnap.Monitors, rbdSnap.AdminID,
		key, rbdSnap.Pool, rbdSnapOMapPrefix+snapUUID, rbdSnapCSISnapNameKey)
	if err != nil {
		return err
	}

	rbdSnap.RbdImageName, err = util.GetOMapValue(rbdSnap.Monitors, rbdSnap.AdminID,
		key, rbdSnap.Pool, rbdSnapOMapPrefix+snapUUID, rbdSnapSourceImageKey)
	if err != nil {
		return err
	}

	err = updateSnapWithImageInfo(rbdSnap, credentials)

	return err
}

// genVolFromVolID generates a rbdVolume structure from the provided identifier, updating
// the structure with elements from on-disk image metadata as well
func genVolFromVolID(rbdVol *rbdVolume, volumeID string, credentials map[string]string) error {
	var (
		options map[string]string
		vi      util.CSIIdentifier
	)
	options = make(map[string]string)

	// rbdVolume fields that are not filled up in this function are:
	// Mounter, MultiNodeWritable
	rbdVol.VolID = volumeID

	err := vi.DecomposeCSIID(rbdVol.VolID)
	if err != nil {
		klog.V(4).Infof("error decoding volume ID (%s) (%s)", err, rbdVol.VolID)
		return err
	}

	rbdVol.ClusterID = vi.ClusterID
	options["clusterID"] = rbdVol.ClusterID
	rbdVol.RbdImageName = rbdImgNamePrefix + vi.ObjectUUID

	rbdVol.Monitors, _, err = getMonsAndClusterID(options)
	if err != nil {
		return err
	}

	rbdVol.AdminID, rbdVol.UserID = getIDs(options)

	key, err := getKey(rbdVol.AdminID, credentials)
	if err != nil {
		return err
	}

	rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, rbdVol.AdminID, key,
		vi.PoolID)
	if err != nil {
		return err
	}

	imageUUID := strings.TrimPrefix(rbdVol.RbdImageName, rbdImgNamePrefix)
	rbdVol.RequestName, err = util.GetOMapValue(rbdVol.Monitors, rbdVol.AdminID,
		key, rbdVol.Pool, rbdImageOMapPrefix+imageUUID, rbdImageCSIVolNameKey)
	if err != nil {
		return err
	}

	err = updateVolWithImageInfo(rbdVol, credentials)

	return err
}
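
A sketch (editor's illustration, not from this commit) of the ID round trip that makes the driver stateless: the CSI volume ID embeds everything needed to re-derive the cluster, pool, and image name, using the CSIIdentifier fields seen elsewhere in this change. The encoded ID below is a placeholder.

	var vi util.CSIIdentifier
	if err := vi.DecomposeCSIID("<encoded-volume-id>"); err != nil {
		return err
	}
	// vi.ClusterID  -> which cluster's mons and credentials to use
	// vi.PoolID     -> resolved to a pool name via util.GetPoolName
	// vi.ObjectUUID -> image name is rbdImgNamePrefix + vi.ObjectUUID
	imageName := rbdImgNamePrefix + vi.ObjectUUID
	_ = imageName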

@ -242,41 +390,29 @@ func execCommand(command string, args []string) ([]byte, error) {
	return cmd.CombinedOutput()
}

func getMonsAndClusterID(options map[string]string) (monitors, clusterID, monInSecret string, err error) {
func getMonsAndClusterID(options map[string]string) (monitors, clusterID string, err error) {
	var ok bool

	monitors, ok = options["monitors"]
	if !ok {
		// if mons are not set in options, check if they are set in secret
		if monInSecret, ok = options["monValueFromSecret"]; !ok {
			// if mons are not in secret, check if we have a cluster-id
			if clusterID, ok = options["clusterID"]; !ok {
				err = errors.New("either monitors or monValueFromSecret or clusterID must be set")
				return
			}
	if clusterID, ok = options["clusterID"]; !ok {
		err = errors.New("clusterID must be set")
		return
	}

			if monitors, err = confStore.Mons(clusterID); err != nil {
				klog.Errorf("failed getting mons (%s)", err)
				err = fmt.Errorf("failed to fetch monitor list using clusterID (%s)", clusterID)
				return
			}
		}
	if monitors, err = util.Mons(csiConfigFile, clusterID); err != nil {
		klog.Errorf("failed getting mons (%s)", err)
		err = fmt.Errorf("failed to fetch monitor list using clusterID (%s)", clusterID)
		return
	}

	return
}

func getIDs(options map[string]string, clusterID string) (adminID, userID string, err error) {
func getIDs(options map[string]string) (adminID, userID string) {
	var ok bool

	adminID, ok = options["adminid"]
	switch {
	case ok:
	case clusterID != "":
		if adminID, err = confStore.AdminID(clusterID); err != nil {
			klog.Errorf("failed getting adminID (%s)", err)
			return "", "", fmt.Errorf("failed to fetch adminID for clusterID (%s)", clusterID)
		}
	default:
		adminID = rbdDefaultAdminID
	}
@ -284,19 +420,14 @@ func getIDs(options map[string]string, clusterID string) (adminID, userID string
	userID, ok = options["userid"]
	switch {
	case ok:
	case clusterID != "":
		if userID, err = confStore.UserID(clusterID); err != nil {
			klog.Errorf("failed getting userID (%s)", err)
			return "", "", fmt.Errorf("failed to fetch userID using clusterID (%s)", clusterID)
		}
	default:
		userID = rbdDefaultUserID
	}

	return adminID, userID, err
	return adminID, userID
}

func getRBDVolumeOptions(volOptions map[string]string, disableInUseChecks bool) (*rbdVolume, error) {
func genVolFromVolumeOptions(volOptions map[string]string, disableInUseChecks bool) (*rbdVolume, error) {
	var (
		ok  bool
		err error
@ -308,7 +439,7 @@ func getRBDVolumeOptions(volOptions map[string]string, disableInUseChecks bool)
		return nil, errors.New("missing required parameter pool")
	}

	rbdVol.Monitors, rbdVol.ClusterID, rbdVol.MonValueFromSecret, err = getMonsAndClusterID(volOptions)
	rbdVol.Monitors, rbdVol.ClusterID, err = getMonsAndClusterID(volOptions)
	if err != nil {
		return nil, err
	}
@ -326,7 +457,8 @@ func getRBDVolumeOptions(volOptions map[string]string, disableInUseChecks bool)
	arr := strings.Split(imageFeatures, ",")
	for _, f := range arr {
		if !supportedFeatures.Has(f) {
			return nil, fmt.Errorf("invalid feature %q for volume csi-rbdplugin, supported features are: %v", f, supportedFeatures)
			return nil, fmt.Errorf("invalid feature %q for volume csi-rbdplugin, supported"+
				" features are: %v", f, supportedFeatures)
		}
	}
	rbdVol.ImageFeatures = imageFeatures
@ -337,55 +469,43 @@ func getRBDVolumeOptions(volOptions map[string]string, disableInUseChecks bool)
	klog.V(3).Infof("setting disableInUseChecks on rbd volume to: %v", disableInUseChecks)
	rbdVol.DisableInUseChecks = disableInUseChecks

	err = getCredsFromVol(rbdVol, volOptions)
	if err != nil {
		return nil, err
	}
	getCredsFromVol(rbdVol, volOptions)

	return rbdVol, nil
}

func getCredsFromVol(rbdVol *rbdVolume, volOptions map[string]string) error {
	var (
		ok  bool
		err error
	)
func getCredsFromVol(rbdVol *rbdVolume, volOptions map[string]string) {
	var ok bool

	rbdVol.AdminID, rbdVol.UserID, err = getIDs(volOptions, rbdVol.ClusterID)
	if err != nil {
		return err
	}
	rbdVol.AdminID, rbdVol.UserID = getIDs(volOptions)

	rbdVol.Mounter, ok = volOptions["mounter"]
	if !ok {
		rbdVol.Mounter = rbdDefaultMounter
	}

	return err
}
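
For context (editor's illustration, not from this commit), a parameters map that satisfies genVolFromVolumeOptions after this change carries at least the pool and the clusterID, with credentials and mounter optional; all values are examples.

	volOptions := map[string]string{
		"pool":          "rbd",
		"clusterID":     "ceph-cluster-1",
		"imageFeatures": "layering",
		"adminid":       "admin",
		"mounter":       "rbd",
	}
	vol, err := genVolFromVolumeOptions(volOptions, false)
	if err != nil {
		return err
	}
	_ = vol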

func getRBDSnapshotOptions(snapOptions map[string]string) (*rbdSnapshot, error) {
func genSnapFromOptions(rbdVol *rbdVolume, snapOptions map[string]string) *rbdSnapshot {
	var (
		ok  bool
		err error
		ok  bool
	)

	rbdSnap := &rbdSnapshot{}
	rbdSnap.Pool, ok = snapOptions["pool"]
	if !ok {
		return nil, errors.New("missing required parameter pool")
		rbdSnap.Pool = rbdVol.Pool
	}

	rbdSnap.Monitors, rbdSnap.ClusterID, rbdSnap.MonValueFromSecret, err = getMonsAndClusterID(snapOptions)
	rbdSnap.Monitors, rbdSnap.ClusterID, err = getMonsAndClusterID(snapOptions)
	if err != nil {
		return nil, err
		rbdSnap.Monitors = rbdVol.Monitors
		rbdSnap.ClusterID = rbdVol.ClusterID
	}

	rbdSnap.AdminID, rbdSnap.UserID, err = getIDs(snapOptions, rbdSnap.ClusterID)
	if err != nil {
		return nil, err
	}
	return rbdSnap, nil
	rbdSnap.AdminID, rbdSnap.UserID = getIDs(snapOptions)

	return rbdSnap
}

func hasSnapshotFeature(imageFeatures string) bool {
@ -398,67 +518,20 @@ func hasSnapshotFeature(imageFeatures string) bool {
	return false
}

func getRBDVolumeByID(volumeID string) (*rbdVolume, error) {
	if rbdVol, ok := rbdVolumes[volumeID]; ok {
		return &rbdVol, nil
	}
	return nil, fmt.Errorf("volume id %s does not exist in the volumes list", volumeID)
}

func getRBDVolumeByName(volName string) (*rbdVolume, error) {
	for _, rbdVol := range rbdVolumes {
		if rbdVol.VolName == volName {
			v := rbdVol
			return &v, nil
		}
	}
	return nil, fmt.Errorf("volume name %s does not exist in the volumes list", volName)
}

func getRBDSnapshotByName(snapName string) (*rbdSnapshot, error) {
	for _, rbdSnap := range rbdSnapshots {
		if rbdSnap.SnapName == snapName {
			s := rbdSnap
			return &s, nil
		}
	}
	return nil, fmt.Errorf("snapshot name %s does not exist in the snapshots list", snapName)
}

func getSnapMon(pOpts *rbdSnapshot, credentials map[string]string) (string, error) {
	mon := pOpts.Monitors
	if len(mon) == 0 {
		// if mons are set in secret, retrieve them
		if len(pOpts.MonValueFromSecret) == 0 {
			// yet another sanity check
			return "", errors.New("either monitors or monValueFromSecret must be set")
		}
		val, ok := credentials[pOpts.MonValueFromSecret]
		if !ok {
			return "", fmt.Errorf("mon data %s is not set in secret", pOpts.MonValueFromSecret)
		}
		mon = val
	}
	return mon, nil
}

func protectSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]string) error {
	var output []byte

	image := pOpts.VolName
	snapID := pOpts.SnapID
	image := pOpts.RbdImageName
	snapName := pOpts.RbdSnapName

	key, err := getRBDKey(pOpts.ClusterID, adminID, credentials)
	if err != nil {
		return err
	}
	mon, err := getSnapMon(pOpts, credentials)
	key, err := getKey(adminID, credentials)
	if err != nil {
		return err
	}

	klog.V(4).Infof("rbd: snap protect %s using mon %s, pool %s ", image, mon, pOpts.Pool)
	args := []string{"snap", "protect", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminID, "-m", mon, "--key=" + key}
	klog.V(4).Infof("rbd: snap protect %s using mon %s, pool %s ", image, pOpts.Monitors, pOpts.Pool)
	args := []string{"snap", "protect", "--pool", pOpts.Pool, "--snap", snapName, image, "--id",
		adminID, "-m", pOpts.Monitors, "--key=" + key}

	output, err = execCommand("rbd", args)

@ -469,54 +542,19 @@ func protectSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]
	return nil
}

func extractStoredVolOpt(r rbdVolume) map[string]string {
	volOptions := make(map[string]string)
	volOptions["pool"] = r.Pool

	if len(r.Monitors) > 0 {
		volOptions["monitors"] = r.Monitors
	}

	if len(r.MonValueFromSecret) > 0 {
		volOptions["monValueFromSecret"] = r.MonValueFromSecret
	}

	volOptions["imageFormat"] = r.ImageFormat

	if len(r.ImageFeatures) > 0 {
		volOptions["imageFeatures"] = r.ImageFeatures
	}

	if len(r.AdminID) > 0 {
		volOptions["adminId"] = r.AdminID
	}

	if len(r.UserID) > 0 {
		volOptions["userId"] = r.UserID
	}
	if len(r.Mounter) > 0 {
		volOptions["mounter"] = r.Mounter
	}
	return volOptions
}

func createSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]string) error {
	var output []byte

	mon, err := getSnapMon(pOpts, credentials)
	image := pOpts.RbdImageName
	snapName := pOpts.RbdSnapName

	key, err := getKey(adminID, credentials)
	if err != nil {
		return err
	}

	image := pOpts.VolName
	snapID := pOpts.SnapID

	key, err := getRBDKey(pOpts.ClusterID, adminID, credentials)
	if err != nil {
		return err
	}
	klog.V(4).Infof("rbd: snap create %s using mon %s, pool %s", image, mon, pOpts.Pool)
	args := []string{"snap", "create", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminID, "-m", mon, "--key=" + key}
	klog.V(4).Infof("rbd: snap create %s using mon %s, pool %s", image, pOpts.Monitors, pOpts.Pool)
	args := []string{"snap", "create", "--pool", pOpts.Pool, "--snap", snapName, image,
		"--id", adminID, "-m", pOpts.Monitors, "--key=" + key}

	output, err = execCommand("rbd", args)

@ -530,20 +568,16 @@ func createSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]s
func unprotectSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]string) error {
	var output []byte

	mon, err := getSnapMon(pOpts, credentials)
	image := pOpts.RbdImageName
	snapName := pOpts.RbdSnapName

	key, err := getKey(adminID, credentials)
	if err != nil {
		return err
	}

	image := pOpts.VolName
	snapID := pOpts.SnapID

	key, err := getRBDKey(pOpts.ClusterID, adminID, credentials)
	if err != nil {
		return err
	}
	klog.V(4).Infof("rbd: snap unprotect %s using mon %s, pool %s", image, mon, pOpts.Pool)
	args := []string{"snap", "unprotect", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminID, "-m", mon, "--key=" + key}
	klog.V(4).Infof("rbd: snap unprotect %s using mon %s, pool %s", image, pOpts.Monitors, pOpts.Pool)
	args := []string{"snap", "unprotect", "--pool", pOpts.Pool, "--snap", snapName, image, "--id",
		adminID, "-m", pOpts.Monitors, "--key=" + key}

	output, err = execCommand("rbd", args)

@ -557,20 +591,16 @@ func unprotectSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[strin
func deleteSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]string) error {
	var output []byte

	mon, err := getSnapMon(pOpts, credentials)
	image := pOpts.RbdImageName
	snapName := pOpts.RbdSnapName

	key, err := getKey(adminID, credentials)
	if err != nil {
		return err
	}

	image := pOpts.VolName
	snapID := pOpts.SnapID

	key, err := getRBDKey(pOpts.ClusterID, adminID, credentials)
	if err != nil {
		return err
	}
	klog.V(4).Infof("rbd: snap rm %s using mon %s, pool %s", image, mon, pOpts.Pool)
	args := []string{"snap", "rm", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminID, "-m", mon, "--key=" + key}
	klog.V(4).Infof("rbd: snap rm %s using mon %s, pool %s", image, pOpts.Monitors, pOpts.Pool)
	args := []string{"snap", "rm", "--pool", pOpts.Pool, "--snap", snapName, image, "--id",
		adminID, "-m", pOpts.Monitors, "--key=" + key}

	output, err = execCommand("rbd", args)

@ -578,26 +608,27 @@ func deleteSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]s
		return errors.Wrapf(err, "failed to delete snapshot, command output: %s", string(output))
	}

	if err := unreserveSnap(pOpts, credentials); err != nil {
		klog.Errorf("failed to remove reservation for snapname (%s) with backing snap (%s) on image (%s) (%s)",
			pOpts.RequestName, pOpts.RbdSnapName, pOpts.RbdImageName, err)
	}

	return nil
}

func restoreSnapshot(pVolOpts *rbdVolume, pSnapOpts *rbdSnapshot, adminID string, credentials map[string]string) error {
	var output []byte

	mon, err := getMon(pVolOpts, credentials)
	image := pVolOpts.RbdImageName
	snapName := pSnapOpts.RbdSnapName

	key, err := getKey(adminID, credentials)
	if err != nil {
		return err
	}

	image := pVolOpts.VolName
	snapID := pSnapOpts.SnapID

	key, err := getRBDKey(pVolOpts.ClusterID, adminID, credentials)
	if err != nil {
		return err
	}
	klog.V(4).Infof("rbd: clone %s using mon %s, pool %s", image, mon, pVolOpts.Pool)
	args := []string{"clone", pSnapOpts.Pool + "/" + pSnapOpts.VolName + "@" + snapID, pVolOpts.Pool + "/" + image, "--id", adminID, "-m", mon, "--key=" + key}
	klog.V(4).Infof("rbd: clone %s using mon %s, pool %s", image, pVolOpts.Monitors, pVolOpts.Pool)
	args := []string{"clone", pSnapOpts.Pool + "/" + pSnapOpts.RbdImageName + "@" + snapName,
		pVolOpts.Pool + "/" + image, "--id", adminID, "-m", pVolOpts.Monitors, "--key=" + key}

	output, err = execCommand("rbd", args)

@ -607,3 +638,135 @@ func restoreSnapshot(pVolOpts *rbdVolume, pSnapOpts *rbdSnapshot, adminID string

	return nil
}

// getSnapshotMetadata fetches on-disk metadata about the snapshot and populates the passed in
// rbdSnapshot structure
func getSnapshotMetadata(pSnapOpts *rbdSnapshot, adminID string, credentials map[string]string) error {
	imageName := pSnapOpts.RbdImageName
	snapName := pSnapOpts.RbdSnapName

	key, err := getKey(adminID, credentials)
	if err != nil {
		return err
	}

	snapInfo, err := getSnapInfo(pSnapOpts.Monitors, adminID, key, pSnapOpts.Pool, imageName, snapName)
	if err != nil {
		return err
	}

	pSnapOpts.SizeBytes = snapInfo.Size

	tm, err := time.Parse(time.ANSIC, snapInfo.Timestamp)
	if err != nil {
		return err
	}

	pSnapOpts.CreatedAt, err = ptypes.TimestampProto(tm)
	if err != nil {
		return err
	}

	return nil
}

// imageInfo strongly typed JSON spec for image info
type imageInfo struct {
	ObjectUUID string   `json:"name"`
	Size       int64    `json:"size"`
	Format     int64    `json:"format"`
	Features   []string `json:"features"`
	CreatedAt  string   `json:"create_timestamp"`
}

// getImageInfo queries rbd about the given image and returns its metadata;
// it returns ErrImageNotFound if the provided image is not found
func getImageInfo(monitors, adminID, key, poolName, imageName string) (imageInfo, error) {
	// rbd --format=json info [image-spec | snap-spec]

	var imgInfo imageInfo

	stdout, _, err := util.ExecCommand(
		"rbd",
		"-m", monitors,
		"--id", adminID,
		"--key="+key,
		"-c", util.CephConfigPath,
		"--format="+"json",
		"info", poolName+"/"+imageName)
	if err != nil {
		klog.Errorf("failed getting information for image (%s): (%s)", poolName+"/"+imageName, err)
		if strings.Contains(string(stdout), "rbd: error opening image "+imageName+
			": (2) No such file or directory") {
			return imgInfo, ErrImageNotFound{imageName, err}
		}
		return imgInfo, err
	}

	err = json.Unmarshal(stdout, &imgInfo)
	if err != nil {
		klog.Errorf("failed to parse JSON output of image info (%s): (%s)",
			poolName+"/"+imageName, err)
		return imgInfo, fmt.Errorf("unmarshal failed: %+v. raw buffer response: %s",
			err, string(stdout))
	}

	return imgInfo, nil
}
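
For orientation (editor's illustration, not from this commit), JSON of the shape that `rbd info --format=json` emits maps onto imageInfo as follows; the values are representative placeholders.

	raw := []byte(`{
		"name": "csi-vol-8a684d31",
		"size": 1073741824,
		"format": 2,
		"features": ["layering"],
		"create_timestamp": "Mon Apr  1 10:13:05 2019"
	}`)
	var info imageInfo
	if err := json.Unmarshal(raw, &info); err != nil {
		return err
	}
	// info.ObjectUUID == "csi-vol-8a684d31", info.Size == 1<<30,
	// info.Format == 2, info.Features == []string{"layering"}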

// snapInfo strongly typed JSON spec for snap ls rbd output
type snapInfo struct {
	ID        int64  `json:"id"`
	Name      string `json:"name"`
	Size      int64  `json:"size"`
	Timestamp string `json:"timestamp"`
}

/*
getSnapInfo queries rbd about the snapshots of the given image and returns the metadata of the
named snapshot; it returns ErrImageNotFound if the provided image is not found, and
ErrSnapNotFound if the provided snap is not found in the image's snapshot list
*/
func getSnapInfo(monitors, adminID, key, poolName, imageName, snapName string) (snapInfo, error) {
	// rbd --format=json snap ls [image-spec]

	var (
		snpInfo snapInfo
		snaps   []snapInfo
	)

	stdout, _, err := util.ExecCommand(
		"rbd",
		"-m", monitors,
		"--id", adminID,
		"--key="+key,
		"-c", util.CephConfigPath,
		"--format="+"json",
		"snap", "ls", poolName+"/"+imageName)
	if err != nil {
		klog.Errorf("failed getting snap (%s) information from image (%s): (%s)",
			snapName, poolName+"/"+imageName, err)
		if strings.Contains(string(stdout), "rbd: error opening image "+imageName+
			": (2) No such file or directory") {
			return snpInfo, ErrImageNotFound{imageName, err}
		}
		return snpInfo, err
	}

	err = json.Unmarshal(stdout, &snaps)
	if err != nil {
		klog.Errorf("failed to parse JSON output of image snap list (%s): (%s)",
			poolName+"/"+imageName, err)
		return snpInfo, fmt.Errorf("unmarshal failed: %+v. raw buffer response: %s",
			err, string(stdout))
	}

	for _, snap := range snaps {
		if snap.Name == snapName {
			return snap, nil
		}
	}

	return snpInfo, ErrSnapNotFound{snapName, fmt.Errorf("snap (%s) for image (%s) not found",
		snapName, poolName+"/"+imageName)}
}
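
Similarly (editor's illustration, not from this commit), `rbd snap ls --format=json` returns an array, and the lookup above selects the entry whose name matches; the values below are placeholders.

	raw := []byte(`[
		{"id": 4, "name": "csi-snap-1b2c3d4e", "size": 1073741824,
		 "timestamp": "Mon Apr  1 10:13:05 2019"}
	]`)
	var snaps []snapInfo
	if err := json.Unmarshal(raw, &snaps); err != nil {
		return err
	}
	// snaps[0].Name is compared against the requested snapName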

554 pkg/rbd/voljournal.go Normal file
@ -0,0 +1,554 @@
/*
Copyright 2019 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rbd

import (
	"fmt"
	"strings"

	"github.com/ceph/ceph-csi/pkg/util"

	"github.com/pborman/uuid"
	"github.com/pkg/errors"
	"k8s.io/klog"
)

func validateNonEmptyField(field, fieldName, structName string) error {
	if field == "" {
		return fmt.Errorf("value '%s' in '%s' structure cannot be empty", fieldName, structName)
	}

	return nil
}

func validateRbdSnap(rbdSnap *rbdSnapshot) error {
	if err := validateNonEmptyField(rbdSnap.RequestName, "RequestName", "rbdSnapshot"); err != nil {
		return err
	}

	if err := validateNonEmptyField(rbdSnap.Monitors, "Monitors", "rbdSnapshot"); err != nil {
		return err
	}

	if err := validateNonEmptyField(rbdSnap.AdminID, "AdminID", "rbdSnapshot"); err != nil {
		return err
	}

	if err := validateNonEmptyField(rbdSnap.Pool, "Pool", "rbdSnapshot"); err != nil {
		return err
	}

	if err := validateNonEmptyField(rbdSnap.RbdImageName, "RbdImageName", "rbdSnapshot"); err != nil {
		return err
	}

	if err := validateNonEmptyField(rbdSnap.ClusterID, "ClusterID", "rbdSnapshot"); err != nil {
		return err
	}

	return nil
}

func validateRbdVol(rbdVol *rbdVolume) error {
	if err := validateNonEmptyField(rbdVol.RequestName, "RequestName", "rbdVolume"); err != nil {
		return err
	}

	if err := validateNonEmptyField(rbdVol.Monitors, "Monitors", "rbdVolume"); err != nil {
		return err
	}

	if err := validateNonEmptyField(rbdVol.AdminID, "AdminID", "rbdVolume"); err != nil {
		return err
	}

	if err := validateNonEmptyField(rbdVol.Pool, "Pool", "rbdVolume"); err != nil {
		return err
	}

	if err := validateNonEmptyField(rbdVol.ClusterID, "ClusterID", "rbdVolume"); err != nil {
		return err
	}

	if rbdVol.VolSize == 0 {
		return errors.New("value 'VolSize' in 'rbdVolume' structure cannot be 0")
	}

	return nil
}

/*
checkSnapExists, and its counterpart checkVolExists, serve as checks to determine whether the
passed-in rbdSnapshot or rbdVolume exists on the backend.

**NOTE:** These functions manipulate the rados omaps that hold information regarding
volume names as requested by the CSI drivers. Hence, they need to be invoked only when the
respective CSI driver generated snapshot or volume name based locks are held, as otherwise racy
access to these omaps may end up leaving them in an inconsistent state.

These functions need enough information about the cluster and pool (i.e., Monitors, Pool, and IDs
filled in) to operate. They further require that the RequestName element of the structure has a
valid value, to determine if the said RequestName already exists on the backend.

These functions populate the snapshot or the image name, its attributes, and the CSI
snapshot/volume ID for the same when successful.

These functions also clean up omap reservations that are stale, i.e., when omap entries exist but
the backing images or snapshots are missing, or when one of the omaps exists and the next is
missing. This is safe because the order of omap creation and deletion is the inverse of each
other and is protected by the request name lock; hence any stale omaps are leftovers from
incomplete transactions and can be garbage collected.
*/
func checkSnapExists(rbdSnap *rbdSnapshot, credentials map[string]string) (found bool, err error) {
	if err = validateRbdSnap(rbdSnap); err != nil {
		return false, err
	}

	key, err := getKey(rbdSnap.AdminID, credentials)
	if err != nil {
		return false, err
	}

	// check if request name is already part of the snaps omap
	snapUUID, err := util.GetOMapValue(rbdSnap.Monitors, rbdSnap.AdminID,
		key, rbdSnap.Pool, csiSnapsDirectory, csiSnapNameKeyPrefix+rbdSnap.RequestName)
	if err != nil {
		// the error should specifically be "not found" for the snapshot to be treated as
		// absent; any other error is not conclusive, and we should not proceed
		if _, ok := err.(util.ErrKeyNotFound); ok {
			return false, nil
		}

		return false, err
	}

	rbdSnap.RbdSnapName = rbdSnapNamePrefix + snapUUID

	// TODO: use listomapvals to dump all keys instead of reading them one-by-one
	// check if the snapshot image omap is present
	savedSnapName, err := util.GetOMapValue(rbdSnap.Monitors, rbdSnap.AdminID,
		key, rbdSnap.Pool, rbdSnapOMapPrefix+snapUUID, rbdSnapCSISnapNameKey)
	if err != nil {
		if _, ok := err.(util.ErrKeyNotFound); ok {
			err = unreserveSnap(rbdSnap, credentials)
		}
		return false, err
	}

	// check if snapshot image omap points back to the request name
	if savedSnapName != rbdSnap.RequestName {
		// NOTE: This should never be possible, hence no cleanup, but log error
		// and return, as cleanup may need to occur manually!
		return false, fmt.Errorf("internal state inconsistent, omap snap"+
			" names disagree, request name (%s) snap name (%s) image omap"+
			" snap name (%s)", rbdSnap.RequestName, rbdSnap.RbdSnapName, savedSnapName)
	}

	// check if the snapshot source image omap is present
	savedVolName, err := util.GetOMapValue(rbdSnap.Monitors, rbdSnap.AdminID,
		key, rbdSnap.Pool, rbdSnapOMapPrefix+snapUUID, rbdSnapSourceImageKey)
	if err != nil {
		if _, ok := err.(util.ErrKeyNotFound); ok {
			err = unreserveSnap(rbdSnap, credentials)
		}
		return false, err
	}

	// check if snapshot source image omap points back to the source volume passed in
	if savedVolName != rbdSnap.RbdImageName {
		// NOTE: This can happen if there is a snapname conflict, and we already have a snapshot
		// with the same name pointing to a different RBD image as the source
		err = fmt.Errorf("snapname points to different image, request name (%s)"+
			" image name (%s) image omap"+" volume name (%s)",
			rbdSnap.RequestName, rbdSnap.RbdImageName, savedVolName)
		return false, ErrSnapNameConflict{rbdSnap.RequestName, err}
	}

	// Fetch on-disk image attributes
	err = updateSnapWithImageInfo(rbdSnap, credentials)
	if err != nil {
		if _, ok := err.(ErrSnapNotFound); ok {
			err = unreserveSnap(rbdSnap, credentials)
			return false, err
		}

		return false, err
	}

	// found a snapshot already available, process and return its information
	poolID, err := util.GetPoolID(rbdSnap.Monitors, rbdSnap.AdminID, key, rbdSnap.Pool)
	if err != nil {
		return false, err
	}

	vi := util.CSIIdentifier{
		PoolID:          poolID,
		EncodingVersion: volIDVersion,
		ClusterID:       rbdSnap.ClusterID,
		ObjectUUID:      snapUUID,
	}
	rbdSnap.SnapID, err = vi.ComposeCSIID()
	if err != nil {
		return false, err
	}

	klog.V(4).Infof("Found existing snap (%s) with snap name (%s) for request (%s)",
		rbdSnap.SnapID, rbdSnap.RbdSnapName, rbdSnap.RequestName)

	return true, nil
}
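
To visualize what these checks walk through, here is a schematic of the omap bookkeeping (an editor's illustration using the identifiers from this file; the UUID and request name are placeholders, not literal key names):

	object <csiSnapsDirectory> in the pool:
		csiSnapNameKeyPrefix + RequestName  ->  snap UUID

	object <rbdSnapOMapPrefix + snap UUID> in the pool:
		rbdSnapCSISnapNameKey  ->  RequestName
		rbdSnapSourceImageKey  ->  RbdImageName

The forward pointer (request name to UUID) and the back pointers (UUID to request name and source image) must agree; any half-written pair is treated as a stale reservation and garbage collected.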

/*
See the comment on checkSnapExists to understand how this function behaves.

**NOTE:** These functions manipulate the rados omaps that hold information regarding
volume names as requested by the CSI drivers. Hence, they need to be invoked only when the
respective CSI snapshot or volume name based locks are held, as otherwise racy access to these
omaps may end up leaving the omaps in an inconsistent state.
*/
func checkVolExists(rbdVol *rbdVolume, credentials map[string]string) (found bool, err error) {
	var vi util.CSIIdentifier

	if err = validateRbdVol(rbdVol); err != nil {
		return false, err
	}

	key, err := getKey(rbdVol.AdminID, credentials)
	if err != nil {
		return false, err
	}

	// check if request name is already part of the volumes omap
	imageUUID, err := util.GetOMapValue(rbdVol.Monitors, rbdVol.AdminID,
		key, rbdVol.Pool, csiVolsDirectory, csiVolNameKeyPrefix+rbdVol.RequestName)
	if err != nil {
		// the error should specifically be "not found" for the image to be treated as
		// absent; any other error is not conclusive, and we should not proceed
		if _, ok := err.(util.ErrKeyNotFound); ok {
			return false, nil
		}

		return false, err
	}

	rbdVol.RbdImageName = rbdImgNamePrefix + imageUUID

	// check if the image omap is present
	savedVolName, err := util.GetOMapValue(rbdVol.Monitors, rbdVol.AdminID,
		key, rbdVol.Pool, rbdImageOMapPrefix+imageUUID, rbdImageCSIVolNameKey)
	if err != nil {
		if _, ok := err.(util.ErrKeyNotFound); ok {
			err = unreserveVol(rbdVol, credentials)
		}
		return false, err
	}

	// check if image omap points back to the request name
	if savedVolName != rbdVol.RequestName {
		// NOTE: This should never be possible, hence no cleanup, but log error
		// and return, as cleanup may need to occur manually!
		return false, fmt.Errorf("internal state inconsistent, omap volume"+
			" names disagree, request name (%s) image name (%s) image omap"+
			" volume name (%s)", rbdVol.RequestName, rbdVol.RbdImageName, savedVolName)
	}

	// NOTE: the returned volsize should be the on-disk volsize, not the requested size,
	// so save the latter for size checks before fetching image data
	requestSize := rbdVol.VolSize
	// Fetch on-disk image attributes and compare against request
	err = updateVolWithImageInfo(rbdVol, credentials)
	if err != nil {
		if _, ok := err.(ErrImageNotFound); ok {
			err = unreserveVol(rbdVol, credentials)
			return false, err
		}

		return false, err
	}

	// size checks
	if rbdVol.VolSize < requestSize {
		err = fmt.Errorf("image with the same name (%s) but with different size already exists",
			rbdVol.RbdImageName)
		return false, ErrVolNameConflict{rbdVol.RbdImageName, err}
	}
	// TODO: We should also ensure image features and format are the same

	// found a volume already available, process and return it!
	poolID, err := util.GetPoolID(rbdVol.Monitors, rbdVol.AdminID, key, rbdVol.Pool)
	if err != nil {
		return false, err
	}

	vi = util.CSIIdentifier{
		PoolID:          poolID,
		EncodingVersion: volIDVersion,
		ClusterID:       rbdVol.ClusterID,
		ObjectUUID:      imageUUID,
	}
	rbdVol.VolID, err = vi.ComposeCSIID()
	if err != nil {
		return false, err
	}

	klog.V(4).Infof("Found existing volume (%s) with image name (%s) for request (%s)",
		rbdVol.VolID, rbdVol.RbdImageName, rbdVol.RequestName)

	return true, nil
}

/*
unreserveSnap and unreserveVol remove omaps associated with the snapshot and the image name,
and also remove the corresponding request name key in the snaps or volumes omaps respectively.

This is performed within the request name lock, to ensure that requests with the same name do not
manipulate the omap entries concurrently.
*/
func unreserveSnap(rbdSnap *rbdSnapshot, credentials map[string]string) error {
	key, err := getKey(rbdSnap.AdminID, credentials)
	if err != nil {
		return err
	}

	// delete snap image omap (first, inverse of create order)
	snapUUID := strings.TrimPrefix(rbdSnap.RbdSnapName, rbdSnapNamePrefix)
	err = util.RemoveObject(rbdSnap.Monitors, rbdSnap.AdminID, key, rbdSnap.Pool, rbdSnapOMapPrefix+snapUUID)
	if err != nil {
		if _, ok := err.(util.ErrObjectNotFound); !ok {
			klog.Errorf("failed removing oMap %s (%s)", rbdSnapOMapPrefix+snapUUID, err)
			return err
		}
	}

	// delete the request name omap key (last, inverse of create order)
	err = util.RemoveOMapKey(rbdSnap.Monitors, rbdSnap.AdminID, key, rbdSnap.Pool,
		csiSnapsDirectory, csiSnapNameKeyPrefix+rbdSnap.RequestName)
	if err != nil {
		klog.Errorf("failed removing oMap key %s (%s)", csiSnapNameKeyPrefix+rbdSnap.RequestName, err)
		return err
	}

	return nil
}

func unreserveVol(rbdVol *rbdVolume, credentials map[string]string) error {
	key, err := getKey(rbdVol.AdminID, credentials)
	if err != nil {
		return err
	}

	// delete image omap (first, inverse of create order)
	imageUUID := strings.TrimPrefix(rbdVol.RbdImageName, rbdImgNamePrefix)
	err = util.RemoveObject(rbdVol.Monitors, rbdVol.AdminID, key, rbdVol.Pool, rbdImageOMapPrefix+imageUUID)
	if err != nil {
		if _, ok := err.(util.ErrObjectNotFound); !ok {
			klog.Errorf("failed removing oMap %s (%s)", rbdImageOMapPrefix+imageUUID, err)
			return err
		}
	}

	// delete the request name omap key (last, inverse of create order)
	err = util.RemoveOMapKey(rbdVol.Monitors, rbdVol.AdminID, key, rbdVol.Pool,
		csiVolsDirectory, csiVolNameKeyPrefix+rbdVol.RequestName)
	if err != nil {
		klog.Errorf("failed removing oMap key %s (%s)", csiVolNameKeyPrefix+rbdVol.RequestName, err)
		return err
	}

	return nil
}

// reserveOMapName creates an omap with the passed in oMapNamePrefix and a generated <uuid>.
// It ensures the generated omap name does not already exist; if conflicts are detected, a set
// number of retries with newer uuids are attempted before returning an error
func reserveOMapName(monitors, adminID, key, poolName, oMapNamePrefix string) (string, error) {
	var iterUUID string

	maxAttempts := 5
	attempt := 1
	for attempt <= maxAttempts {
		// generate a uuid for the image name
		iterUUID = uuid.NewUUID().String()

		err := util.CreateObject(monitors, adminID, key, poolName, oMapNamePrefix+iterUUID)
		if err != nil {
			if _, ok := err.(util.ErrObjectExists); ok {
				attempt++
				// try again with a different uuid, for maxAttempts tries
				klog.V(4).Infof("uuid (%s) conflict detected, retrying (attempt %d of %d)",
					iterUUID, attempt, maxAttempts)
				continue
			}

			return "", err
		}

		break
	}

	if attempt > maxAttempts {
		return "", errors.New("uuid conflicts exceed retry threshold")
	}

	return iterUUID, nil
}
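
A reduced sketch (editor's illustration, not this commit's code) of the same retry-on-conflict pattern, with object creation stubbed out so the control flow is easy to follow; createObject and the prefix are stand-ins:

	package main

	import (
		"errors"
		"fmt"

		"github.com/pborman/uuid"
	)

	var errExists = errors.New("object exists")

	// createObject stands in for util.CreateObject; assume it fails with
	// errExists when the name is already taken.
	func createObject(name string) error {
		_ = name
		return nil
	}

	func reserve(prefix string) (string, error) {
		const maxAttempts = 5
		for attempt := 1; attempt <= maxAttempts; attempt++ {
			id := uuid.NewUUID().String()
			err := createObject(prefix + id)
			if err == nil {
				return id, nil // reserved a unique name
			}
			if err != errExists {
				return "", err // hard failure, do not retry
			}
			// name collision: loop and try a fresh uuid
		}
		return "", fmt.Errorf("uuid conflicts exceed retry threshold")
	}

	func main() {
		id, _ := reserve("csi.volume.")
		fmt.Println(id)
	}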

/*
reserveSnap and reserveVol add respective entries to the volumes and snapshots omaps, post
generating a target snapshot or image name for use. Further, these functions create the snapshot or
image name omaps, to store back pointers to the CSI generated request names.

This is performed within the request name lock, to ensure that requests with the same name do not
manipulate the omap entries concurrently.
*/
func reserveSnap(rbdSnap *rbdSnapshot, credentials map[string]string) error {
	var vi util.CSIIdentifier

	key, err := getKey(rbdSnap.AdminID, credentials)
	if err != nil {
		return err
	}

	poolID, err := util.GetPoolID(rbdSnap.Monitors, rbdSnap.AdminID, key,
		rbdSnap.Pool)
	if err != nil {
		return err
	}

	// Create the snapUUID based omap first, to reserve the same and avoid conflicts
	// NOTE: If any service loss occurs post creation of the snap omap, and before
	// setting the omap key (rbdSnapCSISnapNameKey) to point back to the snaps omap, the
	// snap omap key will leak
	snapUUID, err := reserveOMapName(rbdSnap.Monitors, rbdSnap.AdminID, key, rbdSnap.Pool,
		rbdSnapOMapPrefix)
	if err != nil {
		return err
	}

	// Create the request name key in the csi snaps omap and store the uuid based
	// snap name in it
	err = util.SetOMapKeyValue(rbdSnap.Monitors, rbdSnap.AdminID, key,
		rbdSnap.Pool, csiSnapsDirectory, csiSnapNameKeyPrefix+rbdSnap.RequestName, snapUUID)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			klog.Warningf("reservation failed for snapshot: %s", rbdSnap.RequestName)
			errDefer := unreserveSnap(rbdSnap, credentials)
			if errDefer != nil {
				klog.Warningf("failed undoing reservation of snapshot: %s (%v)",
					rbdSnap.RequestName, errDefer)
			}
		}
	}()

	// Create the snap name based omap and store the CSI request name key and source information
	err = util.SetOMapKeyValue(rbdSnap.Monitors, rbdSnap.AdminID, key, rbdSnap.Pool,
		rbdSnapOMapPrefix+snapUUID, rbdSnapCSISnapNameKey, rbdSnap.RequestName)
	if err != nil {
		return err
	}
	err = util.SetOMapKeyValue(rbdSnap.Monitors, rbdSnap.AdminID, key, rbdSnap.Pool,
		rbdSnapOMapPrefix+snapUUID, rbdSnapSourceImageKey, rbdSnap.RbdImageName)
	if err != nil {
		return err
	}

	// generate the snapshot ID to return to the CO system
	vi = util.CSIIdentifier{
		PoolID:          poolID,
		EncodingVersion: volIDVersion,
		ClusterID:       rbdSnap.ClusterID,
		ObjectUUID:      snapUUID,
	}
	rbdSnap.SnapID, err = vi.ComposeCSIID()
	if err != nil {
		return err
	}
	rbdSnap.RbdSnapName = rbdSnapNamePrefix + snapUUID
	klog.V(4).Infof("Generated Snap ID (%s) for source image name (%s) and request name (%s)",
		rbdSnap.SnapID, rbdSnap.RbdImageName, rbdSnap.RequestName)

	return nil
}

func reserveVol(rbdVol *rbdVolume, credentials map[string]string) error {
	var vi util.CSIIdentifier

	key, err := getKey(rbdVol.AdminID, credentials)
	if err != nil {
		return err
	}

	poolID, err := util.GetPoolID(rbdVol.Monitors, rbdVol.AdminID, key,
		rbdVol.Pool)
	if err != nil {
		return err
	}

	// Create the imageUUID based omap first, to reserve the same and avoid conflicts
	// NOTE: If any service loss occurs post creation of the image omap, and before
	// setting the omap key (rbdImageCSIVolNameKey) to point back to the volumes omap,
	// the image omap key will leak
	imageUUID, err := reserveOMapName(rbdVol.Monitors, rbdVol.AdminID, key, rbdVol.Pool, rbdImageOMapPrefix)
	if err != nil {
		return err
	}

	// Create request volName key in csi volumes omap and store the uuid based
	// image name into it
	err = util.SetOMapKeyValue(rbdVol.Monitors, rbdVol.AdminID, key,
		rbdVol.Pool, csiVolsDirectory, csiVolNameKeyPrefix+rbdVol.RequestName, imageUUID)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			klog.Warningf("reservation failed for volume: %s", rbdVol.RequestName)
			errDefer := unreserveVol(rbdVol, credentials)
			if errDefer != nil {
				klog.Warningf("failed undoing reservation of volume: %s (%v)",
					rbdVol.RequestName, errDefer)
			}
		}
	}()

	// Create image name based omap and store CSI request volume name key and data
	err = util.SetOMapKeyValue(rbdVol.Monitors, rbdVol.AdminID, key, rbdVol.Pool,
		rbdImageOMapPrefix+imageUUID, rbdImageCSIVolNameKey, rbdVol.RequestName)
	if err != nil {
		return err
	}

	// generate the volume ID to return to the CO system
	vi = util.CSIIdentifier{
		PoolID:          poolID,
		EncodingVersion: volIDVersion,
		ClusterID:       rbdVol.ClusterID,
		ObjectUUID:      imageUUID,
	}
	rbdVol.VolID, err = vi.ComposeCSIID()
	if err != nil {
		return err
	}
	rbdVol.RbdImageName = rbdImgNamePrefix + imageUUID
	klog.V(4).Infof("Generated Volume ID (%s) and image name (%s) for request name (%s)",
		rbdVol.VolID, rbdVol.RbdImageName, rbdVol.RequestName)

	return nil
}
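
Putting the journal together, a CreateVolume request would flow roughly as below. This is an editor's sketch, not this commit's controller code: the request-name lock is assumed to be held by the caller, and the MiB conversion of the size is an assumption based on createImage's "%dM" formatting.

	func createVolumeFlow(rbdVol *rbdVolume, creds map[string]string) error {
		// assumed: the CSI request-name lock for rbdVol.RequestName is held

		// 1. idempotency: did a prior attempt already reserve this RequestName?
		found, err := checkVolExists(rbdVol, creds)
		if err != nil {
			return err
		}
		if found {
			// rbdVol.VolID and rbdVol.RbdImageName were populated by the check
			return nil
		}

		// 2. reserve a uuid based image name in the rados omaps
		if err = reserveVol(rbdVol, creds); err != nil {
			return err
		}

		// 3. create the backing image; roll back the reservation on failure
		volSzMiB := rbdVol.VolSize >> 20 // createImage formats the size as "<n>M"
		if err = createImage(rbdVol, volSzMiB, rbdVol.AdminID, creds); err != nil {
			if undoErr := unreserveVol(rbdVol, creds); undoErr != nil {
				klog.Errorf("failed undoing reservation of volume: %s (%v)",
					rbdVol.RequestName, undoErr)
			}
			return err
		}

		return nil
	}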