internal: reformat long lines in internal/rbd package to 120 chars

We have many declarations and invocations..etc with long lines which are
very difficult to follow while doing code reading. This address the issues
in 'internal/rbd/*server.go' and 'internal/rbd/driver.go' files to restrict
the line length to 120 chars.

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal 2021-06-25 17:09:42 +05:30 committed by mergify[bot]
parent 3dc8c5b516
commit e829308249
4 changed files with 193 additions and 50 deletions

View File

@ -53,7 +53,8 @@ type ControllerServer struct {
} }
func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error { func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { if err := cs.Driver.ValidateControllerServiceRequest(
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
util.ErrorLog(ctx, "invalid create volume req: %v", protosanitizer.StripSecrets(req)) util.ErrorLog(ctx, "invalid create volume req: %v", protosanitizer.StripSecrets(req))
return err return err
} }
@ -91,7 +92,9 @@ func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.Crea
return nil return nil
} }
func (cs *ControllerServer) parseVolCreateRequest(ctx context.Context, req *csi.CreateVolumeRequest) (*rbdVolume, error) { func (cs *ControllerServer) parseVolCreateRequest(
ctx context.Context,
req *csi.CreateVolumeRequest) (*rbdVolume, error) {
// TODO (sbezverk) Last check for not exceeding total storage capacity // TODO (sbezverk) Last check for not exceeding total storage capacity
isMultiNode := false isMultiNode := false
@ -108,7 +111,9 @@ func (cs *ControllerServer) parseVolCreateRequest(ctx context.Context, req *csi.
// We want to fail early if the user is trying to create a RWX on a non-block type device // We want to fail early if the user is trying to create a RWX on a non-block type device
if isMultiNode && !isBlock { if isMultiNode && !isBlock {
return nil, status.Error(codes.InvalidArgument, "multi node access modes are only supported on rbd `block` type volumes") return nil, status.Error(
codes.InvalidArgument,
"multi node access modes are only supported on rbd `block` type volumes")
} }
if imageFeatures, ok := req.GetParameters()["imageFeatures"]; checkImageFeatures(imageFeatures, ok, true) { if imageFeatures, ok := req.GetParameters()["imageFeatures"]; checkImageFeatures(imageFeatures, ok, true) {
@ -203,12 +208,20 @@ func validateRequestedVolumeSize(rbdVol, parentVol *rbdVolume, rbdSnap *rbdSnaps
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
if rbdVol.VolSize != vol.VolSize { if rbdVol.VolSize != vol.VolSize {
return status.Errorf(codes.InvalidArgument, "size mismatches, requested volume size %d and source snapshot size %d", rbdVol.VolSize, vol.VolSize) return status.Errorf(
codes.InvalidArgument,
"size mismatches, requested volume size %d and source snapshot size %d",
rbdVol.VolSize,
vol.VolSize)
} }
} }
if parentVol != nil { if parentVol != nil {
if rbdVol.VolSize != parentVol.VolSize { if rbdVol.VolSize != parentVol.VolSize {
return status.Errorf(codes.InvalidArgument, "size mismatches, requested volume size %d and source volume size %d", rbdVol.VolSize, parentVol.VolSize) return status.Errorf(
codes.InvalidArgument,
"size mismatches, requested volume size %d and source volume size %d",
rbdVol.VolSize,
parentVol.VolSize)
} }
} }
return nil return nil
@ -246,7 +259,9 @@ func checkValidCreateVolumeRequest(rbdVol, parentVol *rbdVolume, rbdSnap *rbdSna
} }
// CreateVolume creates the volume in backend. // CreateVolume creates the volume in backend.
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { func (cs *ControllerServer) CreateVolume(
ctx context.Context,
req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateVolumeReq(ctx, req); err != nil { if err := cs.validateVolumeReq(ctx, req); err != nil {
return nil, err return nil, err
} }
@ -374,7 +389,11 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
if isThickProvisionRequest(req.GetParameters()) { if isThickProvisionRequest(req.GetParameters()) {
thick, err := rbdVol.isThickProvisioned() thick, err := rbdVol.isThickProvisioned()
if err != nil { if err != nil {
return nil, status.Errorf(codes.Aborted, "failed to verify thick-provisioned volume %q: %s", rbdVol, err) return nil, status.Errorf(
codes.Aborted,
"failed to verify thick-provisioned volume %q: %s",
rbdVol,
err)
} else if !thick { } else if !thick {
err = deleteImage(ctx, rbdVol, cr) err = deleteImage(ctx, rbdVol, cr)
if err != nil { if err != nil {
@ -384,7 +403,9 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
if err != nil { if err != nil {
return nil, status.Errorf(codes.Aborted, "failed to remove volume %q from journal: %s", rbdVol, err) return nil, status.Errorf(codes.Aborted, "failed to remove volume %q from journal: %s", rbdVol, err)
} }
return nil, status.Errorf(codes.Aborted, "restoring thick-provisioned volume %q has been interrupted, please retry", rbdVol) return nil, status.Errorf(
codes.Aborted,
"restoring thick-provisioned volume %q has been interrupted, please retry", rbdVol)
} }
} }
// restore from snapshot imploes rbdSnap != nil // restore from snapshot imploes rbdSnap != nil
@ -409,7 +430,11 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
if isThickProvisionRequest(req.GetParameters()) { if isThickProvisionRequest(req.GetParameters()) {
thick, err := rbdVol.isThickProvisioned() thick, err := rbdVol.isThickProvisioned()
if err != nil { if err != nil {
return nil, status.Errorf(codes.Internal, "failed to verify thick-provisioned volume %q: %s", rbdVol, err) return nil, status.Errorf(
codes.Internal,
"failed to verify thick-provisioned volume %q: %s",
rbdVol,
err)
} else if !thick { } else if !thick {
err = cleanUpSnapshot(ctx, parentVol, rbdSnap, rbdVol, cr) err = cleanUpSnapshot(ctx, parentVol, rbdSnap, rbdVol, cr)
if err != nil { if err != nil {
@ -419,7 +444,9 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
if err != nil { if err != nil {
return nil, status.Errorf(codes.Internal, "failed to remove volume %q from journal: %s", rbdVol, err) return nil, status.Errorf(codes.Internal, "failed to remove volume %q from journal: %s", rbdVol, err)
} }
return nil, status.Errorf(codes.Internal, "cloning thick-provisioned volume %q has been interrupted, please retry", rbdVol) return nil, status.Errorf(
codes.Internal,
"cloning thick-provisioned volume %q has been interrupted, please retry", rbdVol)
} }
} }
} }
@ -448,7 +475,12 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
} }
if len(snaps) > int(maxSnapshotsOnImage) { if len(snaps) > int(maxSnapshotsOnImage) {
util.DebugLog(ctx, "snapshots count %d on image: %s reached configured hard limit %d", len(snaps), rbdVol, maxSnapshotsOnImage) util.DebugLog(
ctx,
"snapshots count %d on image: %s reached configured hard limit %d",
len(snaps),
rbdVol,
maxSnapshotsOnImage)
err = flattenClonedRbdImages( err = flattenClonedRbdImages(
ctx, ctx,
snaps, snaps,
@ -463,7 +495,12 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
} }
if len(snaps) > int(minSnapshotsOnImageToStartFlatten) { if len(snaps) > int(minSnapshotsOnImageToStartFlatten) {
util.DebugLog(ctx, "snapshots count %d on image: %s reached configured soft limit %d", len(snaps), rbdVol, minSnapshotsOnImageToStartFlatten) util.DebugLog(
ctx,
"snapshots count %d on image: %s reached configured soft limit %d",
len(snaps),
rbdVol,
minSnapshotsOnImageToStartFlatten)
// If we start flattening all the snapshots at one shot the volume // If we start flattening all the snapshots at one shot the volume
// creation time will be affected,so we will flatten only the extra // creation time will be affected,so we will flatten only the extra
// snapshots. // snapshots.
@ -505,7 +542,12 @@ func checkFlatten(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials)
return nil return nil
} }
func (cs *ControllerServer) createVolumeFromSnapshot(ctx context.Context, cr *util.Credentials, secrets map[string]string, rbdVol *rbdVolume, snapshotID string) error { func (cs *ControllerServer) createVolumeFromSnapshot(
ctx context.Context,
cr *util.Credentials,
secrets map[string]string,
rbdVol *rbdVolume,
snapshotID string) error {
rbdSnap := &rbdSnapshot{} rbdSnap := &rbdSnapshot{}
if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired { if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired {
util.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, snapshotID) util.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, snapshotID)
@ -550,7 +592,12 @@ func (cs *ControllerServer) createVolumeFromSnapshot(ctx context.Context, cr *ut
return nil return nil
} }
func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Credentials, secrets map[string]string, rbdVol, parentVol *rbdVolume, rbdSnap *rbdSnapshot) error { func (cs *ControllerServer) createBackingImage(
ctx context.Context,
cr *util.Credentials,
secrets map[string]string,
rbdVol, parentVol *rbdVolume,
rbdSnap *rbdSnapshot) error {
var err error var err error
var j = &journal.Connection{} var j = &journal.Connection{}
@ -614,7 +661,10 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre
return nil return nil
} }
func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *util.Credentials) (*rbdVolume, *rbdSnapshot, error) { func checkContentSource(
ctx context.Context,
req *csi.CreateVolumeRequest,
cr *util.Credentials) (*rbdVolume, *rbdSnapshot, error) {
if req.VolumeContentSource == nil { if req.VolumeContentSource == nil {
return nil, nil, nil return nil, nil, nil
} }
@ -663,8 +713,11 @@ func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *u
// DeleteVolume deletes the volume in backend and removes the volume metadata // DeleteVolume deletes the volume in backend and removes the volume metadata
// from store // from store
// TODO: make this function less complex. // TODO: make this function less complex.
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { func (cs *ControllerServer) DeleteVolume(
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { ctx context.Context,
req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := cs.Driver.ValidateControllerServiceRequest(
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
util.ErrorLog(ctx, "invalid delete volume req: %v", protosanitizer.StripSecrets(req)) util.ErrorLog(ctx, "invalid delete volume req: %v", protosanitizer.StripSecrets(req))
return nil, err return nil, err
} }
@ -781,7 +834,9 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
// ValidateVolumeCapabilities checks whether the volume capabilities requested // ValidateVolumeCapabilities checks whether the volume capabilities requested
// are supported. // are supported.
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { func (cs *ControllerServer) ValidateVolumeCapabilities(
ctx context.Context,
req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
if req.GetVolumeId() == "" { if req.GetVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
} }
@ -803,7 +858,9 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
} }
// CreateSnapshot creates the snapshot in backend and stores metadata in store. // CreateSnapshot creates the snapshot in backend and stores metadata in store.
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { func (cs *ControllerServer) CreateSnapshot(
ctx context.Context,
req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
if err := cs.validateSnapshotReq(ctx, req); err != nil { if err := cs.validateSnapshotReq(ctx, req); err != nil {
return nil, err return nil, err
} }
@ -833,7 +890,10 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
// Check if source volume was created with required image features for snaps // Check if source volume was created with required image features for snaps
if !rbdVol.hasSnapshotFeature() { if !rbdVol.hasSnapshotFeature() {
return nil, status.Errorf(codes.InvalidArgument, "volume(%s) has not snapshot feature(layering)", req.GetSourceVolumeId()) return nil, status.Errorf(
codes.InvalidArgument,
"volume(%s) has not snapshot feature(layering)",
req.GetSourceVolumeId())
} }
rbdSnap, err := genSnapFromOptions(ctx, rbdVol, req.GetParameters()) rbdSnap, err := genSnapFromOptions(ctx, rbdVol, req.GetParameters())
@ -910,7 +970,11 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
// cloneFromSnapshot is a helper for CreateSnapshot that continues creating an // cloneFromSnapshot is a helper for CreateSnapshot that continues creating an
// RBD image from an RBD snapshot if the process was interrupted at one point. // RBD image from an RBD snapshot if the process was interrupted at one point.
func cloneFromSnapshot(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) (*csi.CreateSnapshotResponse, error) { func cloneFromSnapshot(
ctx context.Context,
rbdVol *rbdVolume,
rbdSnap *rbdSnapshot,
cr *util.Credentials) (*csi.CreateSnapshotResponse, error) {
vol := generateVolFromSnap(rbdSnap) vol := generateVolFromSnap(rbdSnap)
err := vol.Connect(cr) err := vol.Connect(cr)
if err != nil { if err != nil {
@ -981,7 +1045,8 @@ func cloneFromSnapshot(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnaps
} }
func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.CreateSnapshotRequest) error { func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.CreateSnapshotRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil { if err := cs.Driver.ValidateControllerServiceRequest(
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
util.ErrorLog(ctx, "invalid create snapshot req: %v", protosanitizer.StripSecrets(req)) util.ErrorLog(ctx, "invalid create snapshot req: %v", protosanitizer.StripSecrets(req))
return err return err
} }
@ -1004,7 +1069,11 @@ func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.Cr
return nil return nil
} }
func (cs *ControllerServer) doSnapshotClone(ctx context.Context, parentVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) (bool, *rbdVolume, error) { func (cs *ControllerServer) doSnapshotClone(
ctx context.Context,
parentVol *rbdVolume,
rbdSnap *rbdSnapshot,
cr *util.Credentials) (bool, *rbdVolume, error) {
// generate cloned volume details from snapshot // generate cloned volume details from snapshot
cloneRbd := generateVolFromSnap(rbdSnap) cloneRbd := generateVolFromSnap(rbdSnap)
defer cloneRbd.Destroy() defer cloneRbd.Destroy()
@ -1104,8 +1173,11 @@ func (cs *ControllerServer) doSnapshotClone(ctx context.Context, parentVol *rbdV
// DeleteSnapshot deletes the snapshot in backend and removes the // DeleteSnapshot deletes the snapshot in backend and removes the
// snapshot metadata from store. // snapshot metadata from store.
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { func (cs *ControllerServer) DeleteSnapshot(
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil { ctx context.Context,
req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
if err := cs.Driver.ValidateControllerServiceRequest(
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
util.ErrorLog(ctx, "invalid delete snapshot req: %v", protosanitizer.StripSecrets(req)) util.ErrorLog(ctx, "invalid delete snapshot req: %v", protosanitizer.StripSecrets(req))
return nil, err return nil, err
} }
@ -1199,7 +1271,9 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
} }
// ControllerExpandVolume expand RBD Volumes on demand based on resizer request. // ControllerExpandVolume expand RBD Volumes on demand based on resizer request.
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { func (cs *ControllerServer) ControllerExpandVolume(
ctx context.Context,
req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil { if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
util.ErrorLog(ctx, "invalid expand volume req: %v", protosanitizer.StripSecrets(req)) util.ErrorLog(ctx, "invalid expand volume req: %v", protosanitizer.StripSecrets(req))
return nil, err return nil, err

View File

@ -50,10 +50,12 @@ var (
// VolumeName to backing RBD images // VolumeName to backing RBD images
volJournal *journal.Config volJournal *journal.Config
snapJournal *journal.Config snapJournal *journal.Config
// rbdHardMaxCloneDepth is the hard limit for maximum number of nested volume clones that are taken before a flatten occurs // rbdHardMaxCloneDepth is the hard limit for maximum number of nested volume clones that are taken before a flatten
// occurs
rbdHardMaxCloneDepth uint rbdHardMaxCloneDepth uint
// rbdSoftMaxCloneDepth is the soft limit for maximum number of nested volume clones that are taken before a flatten occurs // rbdSoftMaxCloneDepth is the soft limit for maximum number of nested volume clones that are taken before a flatten
// occurs
rbdSoftMaxCloneDepth uint rbdSoftMaxCloneDepth uint
maxSnapshotsOnImage uint maxSnapshotsOnImage uint
minSnapshotsOnImageToStartFlatten uint minSnapshotsOnImageToStartFlatten uint
@ -134,7 +136,8 @@ func (r *Driver) Run(conf *util.Config) {
csi.ControllerServiceCapability_RPC_CLONE_VOLUME, csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
}) })
// We only support the multi-writer option when using block, but it's a supported capability for the plugin in general // We only support the multi-writer option when using block, but it's a supported capability for the plugin in
// general
// In addition, we want to add the remaining modes like MULTI_NODE_READER_ONLY, // In addition, we want to add the remaining modes like MULTI_NODE_READER_ONLY,
// MULTI_NODE_SINGLE_WRITER etc, but need to do some verification of RO modes first // MULTI_NODE_SINGLE_WRITER etc, but need to do some verification of RO modes first
// will work those as follow up features // will work those as follow up features

View File

@ -31,7 +31,9 @@ type IdentityServer struct {
} }
// GetPluginCapabilities returns available capabilities of the rbd driver. // GetPluginCapabilities returns available capabilities of the rbd driver.
func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { func (is *IdentityServer) GetPluginCapabilities(
ctx context.Context,
req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{ return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{ Capabilities: []*csi.PluginCapability{
{ {

View File

@ -111,7 +111,9 @@ var (
// - Create the staging file/directory under staging path // - Create the staging file/directory under staging path
// - Stage the device (mount the device mapped for image) // - Stage the device (mount the device mapped for image)
// TODO: make this function less complex. // TODO: make this function less complex.
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { func (ns *NodeServer) NodeStageVolume(
ctx context.Context,
req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
if err := util.ValidateNodeStageVolumeRequest(req); err != nil { if err := util.ValidateNodeStageVolumeRequest(req); err != nil {
return nil, err return nil, err
} }
@ -121,8 +123,16 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
// MULTI_NODE_MULTI_WRITER is supported by default for Block access type volumes // MULTI_NODE_MULTI_WRITER is supported by default for Block access type volumes
if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER { if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
if !isBlock { if !isBlock {
util.WarningLog(ctx, "MULTI_NODE_MULTI_WRITER currently only supported with volumes of access type `block`, invalid AccessMode for volume: %v", req.GetVolumeId()) util.WarningLog(
return nil, status.Error(codes.InvalidArgument, "rbd: RWX access mode request is only valid for volumes with access type `block`") ctx,
"MULTI_NODE_MULTI_WRITER currently only supported with volumes of access type `block`,"+
"invalid AccessMode for volume: %v",
req.GetVolumeId(),
)
return nil, status.Error(
codes.InvalidArgument,
"rbd: RWX access mode request is only valid for volumes with access type `block`",
)
} }
disableInUseChecks = true disableInUseChecks = true
@ -231,12 +241,20 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
util.DebugLog(ctx, "rbd: successfully mounted volume %s to stagingTargetPath %s", req.GetVolumeId(), stagingTargetPath) util.DebugLog(
ctx,
"rbd: successfully mounted volume %s to stagingTargetPath %s",
req.GetVolumeId(),
stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil return &csi.NodeStageVolumeResponse{}, nil
} }
func (ns *NodeServer) stageTransaction(ctx context.Context, req *csi.NodeStageVolumeRequest, volOptions *rbdVolume, staticVol bool) (stageTransaction, error) { func (ns *NodeServer) stageTransaction(
ctx context.Context,
req *csi.NodeStageVolumeRequest,
volOptions *rbdVolume,
staticVol bool) (stageTransaction, error) {
transaction := stageTransaction{} transaction := stageTransaction{}
var err error var err error
@ -330,7 +348,11 @@ func (ns *NodeServer) stageTransaction(ctx context.Context, req *csi.NodeStageVo
return transaction, err return transaction, err
} }
func (ns *NodeServer) undoStagingTransaction(ctx context.Context, req *csi.NodeStageVolumeRequest, transaction stageTransaction, volOptions *rbdVolume) { func (ns *NodeServer) undoStagingTransaction(
ctx context.Context,
req *csi.NodeStageVolumeRequest,
transaction stageTransaction,
volOptions *rbdVolume) {
var err error var err error
stagingTargetPath := getStagingTargetPath(req) stagingTargetPath := getStagingTargetPath(req)
@ -347,7 +369,8 @@ func (ns *NodeServer) undoStagingTransaction(ctx context.Context, req *csi.NodeS
err = os.Remove(stagingTargetPath) err = os.Remove(stagingTargetPath)
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed to remove stagingtargetPath: %s with error: %v", stagingTargetPath, err) util.ErrorLog(ctx, "failed to remove stagingtargetPath: %s with error: %v", stagingTargetPath, err)
// continue on failure to unmap the image, as leaving stale images causes more issues than a stale file/directory // continue on failure to unmap the image, as leaving stale images causes more issues than a stale
// file/directory
} }
} }
@ -357,8 +380,14 @@ func (ns *NodeServer) undoStagingTransaction(ctx context.Context, req *csi.NodeS
if transaction.devicePath != "" { if transaction.devicePath != "" {
err = detachRBDDevice(ctx, transaction.devicePath, volID, volOptions.UnmapOptions, transaction.isEncrypted) err = detachRBDDevice(ctx, transaction.devicePath, volID, volOptions.UnmapOptions, transaction.isEncrypted)
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed to unmap rbd device: %s for volume %s with error: %v", transaction.devicePath, volID, err) util.ErrorLog(
// continue on failure to delete the stash file, as kubernetes will fail to delete the staging path otherwise ctx,
"failed to unmap rbd device: %s for volume %s with error: %v",
transaction.devicePath,
volID,
err)
// continue on failure to delete the stash file, as kubernetes will fail to delete the staging path
// otherwise
} }
} }
@ -398,7 +427,9 @@ func (ns *NodeServer) createStageMountPoint(ctx context.Context, mountPath strin
// NodePublishVolume mounts the volume mounted to the device path to the target // NodePublishVolume mounts the volume mounted to the device path to the target
// path. // path.
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { func (ns *NodeServer) NodePublishVolume(
ctx context.Context,
req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
err := util.ValidateNodePublishVolumeRequest(req) err := util.ValidateNodePublishVolumeRequest(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -435,7 +466,11 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return &csi.NodePublishVolumeResponse{}, nil return &csi.NodePublishVolumeResponse{}, nil
} }
func (ns *NodeServer) mountVolumeToStagePath(ctx context.Context, req *csi.NodeStageVolumeRequest, staticVol bool, stagingPath, devicePath string) (bool, error) { func (ns *NodeServer) mountVolumeToStagePath(
ctx context.Context,
req *csi.NodeStageVolumeRequest,
staticVol bool,
stagingPath, devicePath string) (bool, error) {
readOnly := false readOnly := false
fsType := req.GetVolumeCapability().GetMount().GetFsType() fsType := req.GetVolumeCapability().GetMount().GetFsType()
diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()} diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()}
@ -568,7 +603,9 @@ func (ns *NodeServer) createTargetMountPath(ctx context.Context, mountPath strin
} }
// NodeUnpublishVolume unmounts the volume from the target path. // NodeUnpublishVolume unmounts the volume from the target path.
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { func (ns *NodeServer) NodeUnpublishVolume(
ctx context.Context,
req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
err := util.ValidateNodeUnpublishVolumeRequest(req) err := util.ValidateNodeUnpublishVolumeRequest(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -626,7 +663,9 @@ func getStagingTargetPath(req interface{}) string {
} }
// NodeUnstageVolume unstages the volume from the staging path. // NodeUnstageVolume unstages the volume from the staging path.
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { func (ns *NodeServer) NodeUnstageVolume(
ctx context.Context,
req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
var err error var err error
if err = util.ValidateNodeUnstageVolumeRequest(req); err != nil { if err = util.ValidateNodeUnstageVolumeRequest(req); err != nil {
return nil, err return nil, err
@ -693,8 +732,19 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
// Unmapping rbd device // Unmapping rbd device
imageSpec := imgInfo.String() imageSpec := imgInfo.String()
if err = detachRBDImageOrDeviceSpec(ctx, imageSpec, true, imgInfo.NbdAccess, imgInfo.Encrypted, req.GetVolumeId(), imgInfo.UnmapOptions); err != nil { if err = detachRBDImageOrDeviceSpec(
util.ErrorLog(ctx, "error unmapping volume (%s) from staging path (%s): (%v)", req.GetVolumeId(), stagingTargetPath, err) ctx, imageSpec,
true,
imgInfo.NbdAccess,
imgInfo.Encrypted,
req.GetVolumeId(),
imgInfo.UnmapOptions); err != nil {
util.ErrorLog(
ctx,
"error unmapping volume (%s) from staging path (%s): (%v)",
req.GetVolumeId(),
stagingTargetPath,
err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
@ -709,7 +759,9 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
} }
// NodeExpandVolume resizes rbd volumes. // NodeExpandVolume resizes rbd volumes.
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { func (ns *NodeServer) NodeExpandVolume(
ctx context.Context,
req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
volumeID := req.GetVolumeId() volumeID := req.GetVolumeId()
if volumeID == "" { if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID must be provided") return nil, status.Error(codes.InvalidArgument, "volume ID must be provided")
@ -755,7 +807,12 @@ func getDevicePath(ctx context.Context, volumePath string) (string, error) {
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed to find image metadata: %v", err) util.ErrorLog(ctx, "failed to find image metadata: %v", err)
} }
device, found := findDeviceMappingImage(ctx, imgInfo.Pool, imgInfo.RadosNamespace, imgInfo.ImageName, imgInfo.NbdAccess) device, found := findDeviceMappingImage(
ctx,
imgInfo.Pool,
imgInfo.RadosNamespace,
imgInfo.ImageName,
imgInfo.NbdAccess)
if found { if found {
return device, nil return device, nil
} }
@ -763,7 +820,9 @@ func getDevicePath(ctx context.Context, volumePath string) (string, error) {
} }
// NodeGetCapabilities returns the supported capabilities of the node server. // NodeGetCapabilities returns the supported capabilities of the node server.
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { func (ns *NodeServer) NodeGetCapabilities(
ctx context.Context,
req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{ return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{ Capabilities: []*csi.NodeServiceCapability{
{ {
@ -791,7 +850,10 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
}, nil }, nil
} }
func (ns *NodeServer) processEncryptedDevice(ctx context.Context, volOptions *rbdVolume, devicePath string) (string, error) { func (ns *NodeServer) processEncryptedDevice(
ctx context.Context,
volOptions *rbdVolume,
devicePath string) (string, error) {
imageSpec := volOptions.String() imageSpec := volOptions.String()
encrypted, err := volOptions.checkRbdImageEncrypted(ctx) encrypted, err := volOptions.checkRbdImageEncrypted(ctx)
if err != nil { if err != nil {
@ -881,7 +943,9 @@ func (ns *NodeServer) xfsSupportsReflink() bool {
} }
// NodeGetVolumeStats returns volume stats. // NodeGetVolumeStats returns volume stats.
func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { func (ns *NodeServer) NodeGetVolumeStats(
ctx context.Context,
req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
var err error var err error
targetPath := req.GetVolumePath() targetPath := req.GetVolumePath()
if targetPath == "" { if targetPath == "" {