diff --git a/cephfs/main.go b/cephfs/main.go
index 7d53f5515..3d54b87c4 100644
--- a/cephfs/main.go
+++ b/cephfs/main.go
@@ -33,7 +33,7 @@ func init() {
 var (
 	endpoint        = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
 	driverName      = flag.String("drivername", "csi-cephfsplugin", "name of the driver")
-	nodeId          = flag.String("nodeid", "", "node id")
+	nodeID          = flag.String("nodeid", "", "node id")
 	volumeMounter   = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
 	metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
 )
@@ -57,8 +57,8 @@ func main() {
 		os.Exit(1)
 	}
 
-	driver := cephfs.NewCephFSDriver()
-	driver.Run(*driverName, *nodeId, *endpoint, *volumeMounter, cp)
+	driver := cephfs.NewDriver()
+	driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, cp)
 
 	os.Exit(0)
 }
diff --git a/pkg/cephfs/cephconf.go b/pkg/cephfs/cephconf.go
index 44df828f9..9277e63a4 100644
--- a/pkg/cephfs/cephconf.go
+++ b/pkg/cephfs/cephconf.go
@@ -33,7 +33,7 @@ auth_client_required = cephx
 fuse_set_user_groups = false
 `
 
-const cephKeyring = `[client.{{.UserId}}]
+const cephKeyring = `[client.{{.UserID}}]
 key = {{.Key}}
 `
 
@@ -68,10 +68,6 @@ func init() {
 	cephSecretTempl = template.Must(template.New("secret").Parse(cephSecret))
 }
 
-type cephConfigWriter interface {
-	writeToFile() error
-}
-
 type cephConfigData struct {
 	Monitors string
 	VolumeID volumeID
@@ -100,31 +96,31 @@ func (d *cephConfigData) writeToFile() error {
 }
 
 type cephKeyringData struct {
-	UserId, Key string
+	UserID, Key string
 	VolumeID    volumeID
 }
 
 func (d *cephKeyringData) writeToFile() error {
-	return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeID, d.UserId), 0600, cephKeyringTempl, d)
+	return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeID, d.UserID), 0600, cephKeyringTempl, d)
 }
 
 type cephSecretData struct {
-	UserId, Key string
+	UserID, Key string
 	VolumeID    volumeID
 }
 
 func (d *cephSecretData) writeToFile() error {
-	return writeCephTemplate(fmt.Sprintf(cephSecretFileNameFmt, d.VolumeID, d.UserId), 0600, cephSecretTempl, d)
+	return writeCephTemplate(fmt.Sprintf(cephSecretFileNameFmt, d.VolumeID, d.UserID), 0600, cephSecretTempl, d)
 }
 
-func getCephSecretPath(volId volumeID, userId string) string {
-	return path.Join(cephConfigRoot, fmt.Sprintf(cephSecretFileNameFmt, volId, userId))
+func getCephSecretPath(volID volumeID, userID string) string {
+	return path.Join(cephConfigRoot, fmt.Sprintf(cephSecretFileNameFmt, volID, userID))
 }
 
-func getCephKeyringPath(volId volumeID, userId string) string {
-	return path.Join(cephConfigRoot, fmt.Sprintf(cephKeyringFileNameFmt, volId, userId))
+func getCephKeyringPath(volID volumeID, userID string) string {
+	return path.Join(cephConfigRoot, fmt.Sprintf(cephKeyringFileNameFmt, volID, userID))
 }
 
-func getCephConfPath(volId volumeID) string {
-	return path.Join(cephConfigRoot, fmt.Sprintf(cephConfigFileNameFmt, volId))
+func getCephConfPath(volID volumeID) string {
+	return path.Join(cephConfigRoot, fmt.Sprintf(cephConfigFileNameFmt, volID))
 }
diff --git a/pkg/cephfs/cephuser.go b/pkg/cephfs/cephuser.go
index 57201cc6e..48675e054 100644
--- a/pkg/cephfs/cephuser.go
+++ b/pkg/cephfs/cephuser.go
@@ -47,16 +47,16 @@ func (ent *cephEntity) toCredentials() *credentials {
 	}
 }
 
-func getCephUserName(volId volumeID) string {
-	return cephUserPrefix + string(volId)
+func getCephUserName(volID volumeID) string {
+	return cephUserPrefix + string(volID)
 }
 
-func getCephUser(adminCr *credentials, volId volumeID) (*cephEntity, error) {
-	entityName := cephEntityClientPrefix + getCephUserName(volId)
+func getCephUser(adminCr *credentials, volID volumeID) (*cephEntity, error) {
+	entityName := cephEntityClientPrefix + getCephUserName(volID)
 
 	var ents []cephEntity
 	args := [...]string{
-		"auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id,
+		"auth", "-f", "json", "-c", getCephConfPath(volID), "-n", cephEntityClientPrefix + adminCr.id,
 		"get", entityName,
 	}
 
@@ -69,7 +69,7 @@ func getCephUser(adminCr *credentials, volId volumeID) (*cephEntity, error) {
 	// Contains non-json data: "exported keyring for ENTITY\n\n"
 	offset := bytes.Index(out, []byte("[{"))
 
-	if json.NewDecoder(bytes.NewReader(out[offset:])).Decode(&ents); err != nil {
+	if err = json.NewDecoder(bytes.NewReader(out[offset:])).Decode(&ents); err != nil {
 		return nil, fmt.Errorf("failed to decode json: %v", err)
 	}
 
@@ -80,43 +80,43 @@ func getCephUser(adminCr *credentials, volId volumeID) (*cephEntity, error) {
 	return &ents[0], nil
 }
 
-func createCephUser(volOptions *volumeOptions, adminCr *credentials, volId volumeID) (*cephEntity, error) {
+func createCephUser(volOptions *volumeOptions, adminCr *credentials, volID volumeID) (*cephEntity, error) {
 	caps := cephEntityCaps{
-		Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPathCeph(volId)),
+		Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPathCeph(volID)),
 		Mon: "allow r",
-		Osd: fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volId)),
+		Osd: fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volID)),
 	}
 
 	var ents []cephEntity
 	args := [...]string{
-		"auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id,
-		"get-or-create", cephEntityClientPrefix + getCephUserName(volId),
+		"auth", "-f", "json", "-c", getCephConfPath(volID), "-n", cephEntityClientPrefix + adminCr.id,
+		"get-or-create", cephEntityClientPrefix + getCephUserName(volID),
 		"mds", caps.Mds,
 		"mon", caps.Mon,
 		"osd", caps.Osd,
 	}
 
-	if err := execCommandJson(&ents, "ceph", args[:]...); err != nil {
+	if err := execCommandJSON(&ents, args[:]...); err != nil {
 		return nil, fmt.Errorf("error creating ceph user: %v", err)
 	}
 
 	return &ents[0], nil
 }
 
-func deleteCephUser(adminCr *credentials, volId volumeID) error {
-	userId := getCephUserName(volId)
+func deleteCephUser(adminCr *credentials, volID volumeID) error {
+	userID := getCephUserName(volID)
 
 	args := [...]string{
-		"-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id,
-		"auth", "rm", cephEntityClientPrefix + userId,
+		"-c", getCephConfPath(volID), "-n", cephEntityClientPrefix + adminCr.id,
+		"auth", "rm", cephEntityClientPrefix + userID,
 	}
 
 	if err := execCommandAndValidate("ceph", args[:]...); err != nil {
 		return err
 	}
 
-	os.Remove(getCephKeyringPath(volId, userId))
-	os.Remove(getCephSecretPath(volId, userId))
+	os.Remove(getCephKeyringPath(volID, userID))
+	os.Remove(getCephSecretPath(volID, userID))
 
 	return nil
 }
diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go
index 7d775ada3..0bb027f31 100644
--- a/pkg/cephfs/controllerserver.go
+++ b/pkg/cephfs/controllerserver.go
@@ -28,7 +28,7 @@ import (
 	"github.com/ceph/ceph-csi/pkg/util"
 )
 
-type controllerServer struct {
+type ControllerServer struct {
 	*csicommon.DefaultControllerServer
 	MetadataStore util.CachePersister
 }
@@ -38,7 +38,7 @@ type controllerCacheEntry struct {
 	VolumeID   volumeID
 }
 
-func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
+func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 	if err := cs.validateCreateVolumeRequest(req); err != nil {
 		glog.Errorf("CreateVolumeRequest validation failed: %v", err)
 		return nil, err
@@ -51,10 +51,10 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
-	volId := makeVolumeID(req.GetName())
-	conf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volId}
+	volID := makeVolumeID(req.GetName())
+	conf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volID}
 	if err = conf.writeToFile(); err != nil {
-		glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volId), err)
+		glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volID), err)
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
@@ -67,61 +67,61 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 			return nil, status.Error(codes.InvalidArgument, err.Error())
 		}
 
-		if err = storeCephCredentials(volId, cr); err != nil {
+		if err = storeCephCredentials(volID, cr); err != nil {
 			glog.Errorf("failed to store admin credentials for '%s': %v", cr.id, err)
 			return nil, status.Error(codes.Internal, err.Error())
 		}
 
-		if err = createVolume(volOptions, cr, volId, req.GetCapacityRange().GetRequiredBytes()); err != nil {
+		if err = createVolume(volOptions, cr, volID, req.GetCapacityRange().GetRequiredBytes()); err != nil {
 			glog.Errorf("failed to create volume %s: %v", req.GetName(), err)
 			return nil, status.Error(codes.Internal, err.Error())
 		}
 
-		if _, err = createCephUser(volOptions, cr, volId); err != nil {
+		if _, err = createCephUser(volOptions, cr, volID); err != nil {
 			glog.Errorf("failed to create ceph user for volume %s: %v", req.GetName(), err)
 			return nil, status.Error(codes.Internal, err.Error())
 		}
 
-		glog.Infof("cephfs: successfully created volume %s", volId)
+		glog.Infof("cephfs: successfully created volume %s", volID)
 	} else {
-		glog.Infof("cephfs: volume %s is provisioned statically", volId)
+		glog.Infof("cephfs: volume %s is provisioned statically", volID)
 	}
 
-	ce := &controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId}
-	if err := cs.MetadataStore.Create(string(volId), ce); err != nil {
-		glog.Errorf("failed to store a cache entry for volume %s: %v", volId, err)
+	ce := &controllerCacheEntry{VolOptions: *volOptions, VolumeID: volID}
+	if err := cs.MetadataStore.Create(string(volID), ce); err != nil {
+		glog.Errorf("failed to store a cache entry for volume %s: %v", volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
 	return &csi.CreateVolumeResponse{
 		Volume: &csi.Volume{
-			VolumeId:      string(volId),
+			VolumeId:      string(volID),
 			CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
 			VolumeContext: req.GetParameters(),
 		},
 	}, nil
 }
 
-func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
-	if err := cs.validateDeleteVolumeRequest(req); err != nil {
+func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
+	if err := cs.validateDeleteVolumeRequest(); err != nil {
 		glog.Errorf("DeleteVolumeRequest validation failed: %v", err)
 		return nil, err
 	}
 
 	var (
-		volId = volumeID(req.GetVolumeId())
+		volID = volumeID(req.GetVolumeId())
 		err   error
 	)
 
 	ce := &controllerCacheEntry{}
-	if err := cs.MetadataStore.Get(string(volId), ce); err != nil {
+	if err = cs.MetadataStore.Get(string(volID), ce); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
 	if !ce.VolOptions.ProvisionVolume {
 		// DeleteVolume() is forbidden for statically provisioned volumes!
-		glog.Warningf("volume %s is provisioned statically, aborting delete", volId)
+		glog.Warningf("volume %s is provisioned statically, aborting delete", volID)
 		return &csi.DeleteVolumeResponse{}, nil
 	}
 
 	// mons may have changed since create volume,
@@ -140,26 +140,26 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
-	if err = purgeVolume(volId, cr, &ce.VolOptions); err != nil {
-		glog.Errorf("failed to delete volume %s: %v", volId, err)
+	if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil {
+		glog.Errorf("failed to delete volume %s: %v", volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	if err = deleteCephUser(cr, volId); err != nil {
-		glog.Errorf("failed to delete ceph user for volume %s: %v", volId, err)
+	if err = deleteCephUser(cr, volID); err != nil {
+		glog.Errorf("failed to delete ceph user for volume %s: %v", volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	if err := cs.MetadataStore.Delete(string(volId)); err != nil {
+	if err = cs.MetadataStore.Delete(string(volID)); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	glog.Infof("cephfs: successfully deleted volume %s", volId)
+	glog.Infof("cephfs: successfully deleted volume %s", volID)
 
 	return &csi.DeleteVolumeResponse{}, nil
 }
 
-func (cs *controllerServer) ValidateVolumeCapabilities(
+func (cs *ControllerServer) ValidateVolumeCapabilities(
 	ctx context.Context,
 	req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
 	// Cephfs doesn't support Block volume
diff --git a/pkg/cephfs/credentials.go b/pkg/cephfs/credentials.go
index 343e9a12d..0cf866ce5 100644
--- a/pkg/cephfs/credentials.go
+++ b/pkg/cephfs/credentials.go
@@ -19,9 +19,9 @@ package cephfs
 import "fmt"
 
 const (
-	credUserId   = "userID"
+	credUserID   = "userID"
 	credUserKey  = "userKey"
-	credAdminId  = "adminID"
+	credAdminID  = "adminID"
 	credAdminKey = "adminKey"
 	credMonitors = "monitors"
 )
@@ -49,11 +49,11 @@ func getCredentials(idField, keyField string, secrets map[string]string) (*crede
 }
 
 func getUserCredentials(secrets map[string]string) (*credentials, error) {
-	return getCredentials(credUserId, credUserKey, secrets)
+	return getCredentials(credUserID, credUserKey, secrets)
 }
 
 func getAdminCredentials(secrets map[string]string) (*credentials, error) {
-	return getCredentials(credAdminId, credAdminKey, secrets)
+	return getCredentials(credAdminID, credAdminKey, secrets)
 }
 
 func getMonValFromSecret(secrets map[string]string) (string, error) {
diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go
index a4e39005e..ed98ec44b 100644
--- a/pkg/cephfs/driver.go
+++ b/pkg/cephfs/driver.go
@@ -30,46 +30,42 @@ const (
 	Version = "1.0.0"
 )
 
-type cephfsDriver struct {
-	driver *csicommon.CSIDriver
+type Driver struct {
+	cd *csicommon.CSIDriver
 
-	is *identityServer
-	ns *nodeServer
-	cs *controllerServer
-
-	caps   []*csi.VolumeCapability_AccessMode
-	cscaps []*csi.ControllerServiceCapability
+	is *IdentityServer
+	ns *NodeServer
+	cs *ControllerServer
 }
 
 var (
-	driver               *cephfsDriver
 	DefaultVolumeMounter string
 )
 
-func NewCephFSDriver() *cephfsDriver {
-	return &cephfsDriver{}
+func NewDriver() *Driver {
+	return &Driver{}
 }
 
-func NewIdentityServer(d *csicommon.CSIDriver) *identityServer {
-	return &identityServer{
+func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
+	return &IdentityServer{
 		DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
 	}
 }
 
-func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *controllerServer {
-	return &controllerServer{
+func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
+	return &ControllerServer{
 		DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
 		MetadataStore:           cachePersister,
 	}
 }
 
-func NewNodeServer(d *csicommon.CSIDriver) *nodeServer {
-	return &nodeServer{
+func NewNodeServer(d *csicommon.CSIDriver) *NodeServer {
+	return &NodeServer{
 		DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
 	}
 }
 
-func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string, cachePersister util.CachePersister) {
+func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cachePersister util.CachePersister) {
 	glog.Infof("Driver: %v version: %v", driverName, Version)
 
 	// Configuration
@@ -95,25 +91,25 @@ func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string,
 
 	// Initialize default library driver
 
-	fs.driver = csicommon.NewCSIDriver(driverName, Version, nodeId)
-	if fs.driver == nil {
+	fs.cd = csicommon.NewCSIDriver(driverName, Version, nodeID)
+	if fs.cd == nil {
 		glog.Fatalln("Failed to initialize CSI driver")
 	}
 
-	fs.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
+	fs.cd.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
 		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
 	})
 
-	fs.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
+	fs.cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
 		csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
 	})
 
 	// Create gRPC servers
 
-	fs.is = NewIdentityServer(fs.driver)
-	fs.ns = NewNodeServer(fs.driver)
+	fs.is = NewIdentityServer(fs.cd)
+	fs.ns = NewNodeServer(fs.cd)
 
-	fs.cs = NewControllerServer(fs.driver, cachePersister)
+	fs.cs = NewControllerServer(fs.cd, cachePersister)
 
 	server := csicommon.NewNonBlockingGRPCServer()
 	server.Start(endpoint, fs.is, fs.cs, fs.ns)
diff --git a/pkg/cephfs/identityserver.go b/pkg/cephfs/identityserver.go
index c506c9299..5f0a60e76 100644
--- a/pkg/cephfs/identityserver.go
+++ b/pkg/cephfs/identityserver.go
@@ -23,11 +23,11 @@ import (
 	"github.com/kubernetes-csi/drivers/pkg/csi-common"
 )
 
-type identityServer struct {
+type IdentityServer struct {
 	*csicommon.DefaultIdentityServer
 }
 
-func (is *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
+func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
 	return &csi.GetPluginCapabilitiesResponse{
 		Capabilities: []*csi.PluginCapability{
 			{
diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go
index e33f38f8b..4b6309169 100644
--- a/pkg/cephfs/nodeserver.go
+++ b/pkg/cephfs/nodeserver.go
@@ -29,14 +29,13 @@ import (
 	"github.com/kubernetes-csi/drivers/pkg/csi-common"
 )
 
-type nodeServer struct {
+type NodeServer struct {
 	*csicommon.DefaultNodeServer
 }
 
-func getCredentialsForVolume(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
+func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
 	var (
 		userCr *credentials
-		err    error
 	)
 
 	secret := req.GetSecrets()
 	if volOptions.ProvisionVolume {
@@ -49,13 +48,13 @@ func getCredentialsForVolume(volOptions *volumeOptions, volId volumeID, req *csi
 			return nil, fmt.Errorf("failed to get admin credentials from node stage secrets: %v", err)
 		}
 
-		if err = storeCephCredentials(volId, adminCr); err != nil {
+		if err = storeCephCredentials(volID, adminCr); err != nil {
 			return nil, fmt.Errorf("failed to store ceph admin credentials: %v", err)
 		}
 
 		// Then get the ceph user
 
-		entity, err := getCephUser(adminCr, volId)
+		entity, err := getCephUser(adminCr, volID)
 		if err != nil {
 			return nil, fmt.Errorf("failed to get ceph user: %v", err)
 		}
@@ -64,20 +63,21 @@ func getCredentialsForVolume(volOptions *volumeOptions, volId volumeID, req *csi
 	} else {
 		// The volume is pre-made, credentials are in node stage secrets
 
-		userCr, err = getUserCredentials(secret)
+		uCr, err := getUserCredentials(req.GetSecrets())
 		if err != nil {
 			return nil, fmt.Errorf("failed to get user credentials from node stage secrets: %v", err)
 		}
+		userCr = uCr
 	}
 
-	if err = storeCephCredentials(volId, userCr); err != nil {
+	if err := storeCephCredentials(volID, userCr); err != nil {
 		return nil, fmt.Errorf("failed to store ceph user credentials: %v", err)
 	}
 
 	return userCr, nil
 }
 
-func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
+func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
 	if err := validateNodeStageVolumeRequest(req); err != nil {
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
@@ -85,28 +85,28 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
 	// Configuration
 
 	stagingTargetPath := req.GetStagingTargetPath()
-	volId := volumeID(req.GetVolumeId())
+	volID := volumeID(req.GetVolumeId())
 
 	secret := req.GetSecrets()
 	volOptions, err := newVolumeOptions(req.GetVolumeContext(), secret)
 	if err != nil {
-		glog.Errorf("error reading volume options for volume %s: %v", volId, err)
+		glog.Errorf("error reading volume options for volume %s: %v", volID, err)
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
 	if volOptions.ProvisionVolume {
 		// Dynamically provisioned volumes don't have their root path set, do it here
-		volOptions.RootPath = getVolumeRootPathCeph(volId)
+		volOptions.RootPath = getVolumeRootPathCeph(volID)
 	}
 
 	if err = createMountPoint(stagingTargetPath); err != nil {
-		glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err)
+		glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	cephConf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volId}
+	cephConf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volID}
 	if err = cephConf.writeToFile(); err != nil {
-		glog.Errorf("failed to write ceph config file to %s for volume %s: %v", getCephConfPath(volId), volId, err)
+		glog.Errorf("failed to write ceph config file to %s for volume %s: %v", getCephConfPath(volID), volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
@@ -120,36 +120,36 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
 	}
 
 	if isMnt {
-		glog.Infof("cephfs: volume %s is already mounted to %s, skipping", volId, stagingTargetPath)
+		glog.Infof("cephfs: volume %s is already mounted to %s, skipping", volID, stagingTargetPath)
 		return &csi.NodeStageVolumeResponse{}, nil
 	}
 
 	// It's not, mount now
 
-	cr, err := getCredentialsForVolume(volOptions, volId, req)
+	cr, err := getCredentialsForVolume(volOptions, volID, req)
 	if err != nil {
-		glog.Errorf("failed to get ceph credentials for volume %s: %v", volId, err)
+		glog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
 	m, err := newMounter(volOptions)
 	if err != nil {
-		glog.Errorf("failed to create mounter for volume %s: %v", volId, err)
+		glog.Errorf("failed to create mounter for volume %s: %v", volID, err)
 	}
 
-	glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name())
+	glog.V(4).Infof("cephfs: mounting volume %s with %s", volID, m.name())
 
-	if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil {
-		glog.Errorf("failed to mount volume %s: %v", volId, err)
+	if err = m.mount(stagingTargetPath, cr, volOptions, volID); err != nil {
+		glog.Errorf("failed to mount volume %s: %v", volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	glog.Infof("cephfs: successfully mounted volume %s to %s", volId, stagingTargetPath)
+	glog.Infof("cephfs: successfully mounted volume %s to %s", volID, stagingTargetPath)
 
 	return &csi.NodeStageVolumeResponse{}, nil
 }
 
-func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 	if err := validateNodePublishVolumeRequest(req); err != nil {
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
@@ -157,7 +157,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	// Configuration
 
 	targetPath := req.GetTargetPath()
-	volId := req.GetVolumeId()
+	volID := req.GetVolumeId()
 
 	if err := createMountPoint(targetPath); err != nil {
 		glog.Errorf("failed to create mount point at %s: %v", targetPath, err)
@@ -174,23 +174,23 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	}
 
 	if isMnt {
-		glog.Infof("cephfs: volume %s is already bind-mounted to %s", volId, targetPath)
+		glog.Infof("cephfs: volume %s is already bind-mounted to %s", volID, targetPath)
 		return &csi.NodePublishVolumeResponse{}, nil
 	}
 
 	// It's not, mount now
 
 	if err = bindMount(req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly()); err != nil {
-		glog.Errorf("failed to bind-mount volume %s: %v", volId, err)
+		glog.Errorf("failed to bind-mount volume %s: %v", volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	glog.Infof("cephfs: successfully bind-mounted volume %s to %s", volId, targetPath)
+	glog.Infof("cephfs: successfully bind-mounted volume %s to %s", volID, targetPath)
 
 	return &csi.NodePublishVolumeResponse{}, nil
 }
 
-func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
 	if err := validateNodeUnpublishVolumeRequest(req); err != nil {
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
@@ -209,7 +209,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
 	return &csi.NodeUnpublishVolumeResponse{}, nil
 }
 
-func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
 	if err := validateNodeUnstageVolumeRequest(req); err != nil {
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
@@ -228,7 +228,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
 	return &csi.NodeUnstageVolumeResponse{}, nil
 }
 
-func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
+func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
 	return &csi.NodeGetCapabilitiesResponse{
 		Capabilities: []*csi.NodeServiceCapability{
 			{
@@ -242,6 +242,6 @@ func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
 	}, nil
 }
 
-func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
+func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
 	return ns.DefaultNodeServer.NodeGetInfo(ctx, req)
 }
diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go
index d5b8a1360..44b93cc13 100644
--- a/pkg/cephfs/util.go
+++ b/pkg/cephfs/util.go
@@ -53,7 +53,8 @@ func execCommandAndValidate(program string, args ...string) error {
 	return nil
 }
 
-func execCommandJson(v interface{}, program string, args ...string) error {
+func execCommandJSON(v interface{}, args ...string) error {
+	program := "ceph"
 	out, err := execCommand(program, args...)
 	if err != nil {
@@ -75,11 +76,11 @@ func isMountPoint(p string) (bool, error) {
 	return !notMnt, nil
 }
 
-func storeCephCredentials(volId volumeID, cr *credentials) error {
+func storeCephCredentials(volID volumeID, cr *credentials) error {
 	keyringData := cephKeyringData{
-		UserId:   cr.id,
+		UserID:   cr.id,
 		Key:      cr.key,
-		VolumeID: volId,
+		VolumeID: volID,
 	}
 
 	if err := keyringData.writeToFile(); err != nil {
@@ -87,23 +88,20 @@ func storeCephCredentials(volId volumeID, cr *credentials) error {
 	}
 
 	secret := cephSecretData{
-		UserId:   cr.id,
+		UserID:   cr.id,
 		Key:      cr.key,
-		VolumeID: volId,
+		VolumeID: volID,
 	}
 
-	if err := secret.writeToFile(); err != nil {
-		return err
-	}
-
-	return nil
+	err := secret.writeToFile()
+	return err
 }
 
 //
 // Controller service request validation
 //
 
-func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
+func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
 		return fmt.Errorf("invalid CreateVolumeRequest: %v", err)
 	}
@@ -126,7 +124,7 @@ func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeReq
 	return nil
 }
 
-func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error {
+func (cs *ControllerServer) validateDeleteVolumeRequest() error {
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
 		return fmt.Errorf("invalid DeleteVolumeRequest: %v", err)
 	}
diff --git a/pkg/cephfs/volume.go b/pkg/cephfs/volume.go
index a92b70ea3..ce6f01a17 100644
--- a/pkg/cephfs/volume.go
+++ b/pkg/cephfs/volume.go
@@ -29,28 +29,28 @@ const (
 	namespacePrefix = "ns-"
 )
 
-func getCephRootPathLocal(volId volumeID) string {
-	return cephRootPrefix + string(volId)
+func getCephRootPathLocal(volID volumeID) string {
+	return cephRootPrefix + string(volID)
 }
 
-func getCephRootVolumePathLocal(volId volumeID) string {
-	return path.Join(getCephRootPathLocal(volId), cephVolumesRoot, string(volId))
+func getCephRootVolumePathLocal(volID volumeID) string {
+	return path.Join(getCephRootPathLocal(volID), cephVolumesRoot, string(volID))
 }
 
-func getVolumeRootPathCeph(volId volumeID) string {
-	return path.Join("/", cephVolumesRoot, string(volId))
+func getVolumeRootPathCeph(volID volumeID) string {
+	return path.Join("/", cephVolumesRoot, string(volID))
 }
 
-func getVolumeNamespace(volId volumeID) string {
-	return namespacePrefix + string(volId)
+func getVolumeNamespace(volID volumeID) string {
+	return namespacePrefix + string(volID)
 }
 
 func setVolumeAttribute(root, attrName, attrValue string) error {
 	return execCommandAndValidate("setfattr", "-n", attrName, "-v", attrValue, root)
 }
 
-func createVolume(volOptions *volumeOptions, adminCr *credentials, volId volumeID, bytesQuota int64) error {
-	cephRoot := getCephRootPathLocal(volId)
+func createVolume(volOptions *volumeOptions, adminCr *credentials, volID volumeID, bytesQuota int64) error {
+	cephRoot := getCephRootPathLocal(volID)
 
 	if err := createMountPoint(cephRoot); err != nil {
 		return err
@@ -65,7 +65,7 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volId volumeI
 		return fmt.Errorf("failed to create mounter: %v", err)
 	}
 
-	if err = m.mount(cephRoot, adminCr, volOptions, volId); err != nil {
+	if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
 		return fmt.Errorf("error mounting ceph root: %v", err)
 	}
 
@@ -74,8 +74,8 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volId volumeI
 		os.Remove(cephRoot)
 	}()
 
-	volOptions.RootPath = getVolumeRootPathCeph(volId)
-	localVolRoot := getCephRootVolumePathLocal(volId)
+	volOptions.RootPath = getVolumeRootPathCeph(volID)
+	localVolRoot := getCephRootVolumePathLocal(volID)
 
 	if err := createMountPoint(localVolRoot); err != nil {
 		return err
@@ -91,17 +91,17 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volId volumeI
 		return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
 	}
 
-	if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volId)); err != nil {
+	if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
 		return err
 	}
 
 	return nil
 }
 
-func purgeVolume(volId volumeID, adminCr *credentials, volOptions *volumeOptions) error {
+func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions) error {
 	var (
-		cephRoot        = getCephRootPathLocal(volId)
-		volRoot         = getCephRootVolumePathLocal(volId)
+		cephRoot        = getCephRootPathLocal(volID)
+		volRoot         = getCephRootVolumePathLocal(volID)
 		volRootDeleting = volRoot + "-deleting"
 	)
 
@@ -118,7 +118,7 @@ func purgeVolume(volId volumeID, adminCr *credentials, volOptions *volumeOptions
 		return fmt.Errorf("failed to create mounter: %v", err)
 	}
 
-	if err = m.mount(cephRoot, adminCr, volOptions, volId); err != nil {
+	if err = m.mount(cephRoot, adminCr, volOptions, volID); err != nil {
 		return fmt.Errorf("error mounting ceph root: %v", err)
 	}
 
@@ -128,11 +128,11 @@ func purgeVolume(volId volumeID, adminCr *credentials, volOptions *volumeOptions
 	}()
 
 	if err := os.Rename(volRoot, volRootDeleting); err != nil {
-		return fmt.Errorf("coudln't mark volume %s for deletion: %v", volId, err)
+		return fmt.Errorf("coudln't mark volume %s for deletion: %v", volID, err)
 	}
 
 	if err := os.RemoveAll(volRootDeleting); err != nil {
-		return fmt.Errorf("failed to delete volume %s: %v", volId, err)
+		return fmt.Errorf("failed to delete volume %s: %v", volID, err)
 	}
 
 	return nil
diff --git a/pkg/cephfs/volumemounter.go b/pkg/cephfs/volumemounter.go
index ead40fabe..58ff48abd 100644
--- a/pkg/cephfs/volumemounter.go
+++ b/pkg/cephfs/volumemounter.go
@@ -55,7 +55,7 @@ func loadAvailableMounters() error {
 }
 
 type volumeMounter interface {
-	mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error
+	mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volID volumeID) error
 	name() string
 }
 
@@ -99,12 +99,12 @@ func newMounter(volOptions *volumeOptions) (volumeMounter, error) {
 
 type fuseMounter struct{}
 
-func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
+func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, volID volumeID) error {
 	args := [...]string{
 		mountPoint,
-		"-c", getCephConfPath(volId),
+		"-c", getCephConfPath(volID),
 		"-n", cephEntityClientPrefix + cr.id,
-		"--keyring", getCephKeyringPath(volId, cr.id),
+		"--keyring", getCephKeyringPath(volID, cr.id),
 		"-r", volOptions.RootPath,
 		"-o", "nonempty",
 	}
@@ -121,19 +121,19 @@ func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, vo
 	return nil
 }
 
-func (m *fuseMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
+func (m *fuseMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volID volumeID) error {
 	if err := createMountPoint(mountPoint); err != nil {
 		return err
 	}
 
-	return mountFuse(mountPoint, cr, volOptions, volId)
+	return mountFuse(mountPoint, cr, volOptions, volID)
 }
 
 func (m *fuseMounter) name() string { return "Ceph FUSE driver" }
 
 type kernelMounter struct{}
 
-func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
+func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions, volID volumeID) error {
 	if err := execCommandAndValidate("modprobe", "ceph"); err != nil {
 		return err
 	}
@@ -143,16 +143,16 @@ func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions,
 		fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
 		mountPoint,
 		"-o",
-		fmt.Sprintf("name=%s,secretfile=%s", cr.id, getCephSecretPath(volId, cr.id)),
+		fmt.Sprintf("name=%s,secretfile=%s", cr.id, getCephSecretPath(volID, cr.id)),
 	)
 }
 
-func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
+func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volID volumeID) error {
 	if err := createMountPoint(mountPoint); err != nil {
 		return err
 	}
 
-	return mountKernel(mountPoint, cr, volOptions, volId)
+	return mountKernel(mountPoint, cr, volOptions, volID)
 }
 
 func (m *kernelMounter) name() string { return "Ceph kernel client" }
diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go
index 9b0636485..c731675fe 100644
--- a/pkg/rbd/controllerserver.go
+++ b/pkg/rbd/controllerserver.go
@@ -39,7 +39,7 @@ const (
 	oneGB = 1073741824
 )
 
-type controllerServer struct {
+type ControllerServer struct {
 	*csicommon.DefaultControllerServer
 	MetadataStore util.CachePersister
 }
@@ -49,7 +49,7 @@ var (
 	rbdSnapshots = map[string]*rbdSnapshot{}
 )
 
-func (cs *controllerServer) LoadExDataFromMetadataStore() error {
+func (cs *ControllerServer) LoadExDataFromMetadataStore() error {
 	vol := &rbdVolume{}
 	cs.MetadataStore.ForAll("csi-rbd-vol-", vol, func(identifier string) error {
 		rbdVolumes[identifier] = vol
@@ -65,19 +65,26 @@ func (cs *controllerServer) LoadExDataFromMetadataStore() error {
 	return nil
 }
 
-func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
+func (cs *ControllerServer) validateVolumeReq(req *csi.CreateVolumeRequest) error {
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
 		glog.V(3).Infof("invalid create volume req: %v", req)
-		return nil, err
+		return err
 	}
 	// Check sanity of request Name, Volume Capabilities
 	if len(req.Name) == 0 {
-		return nil, status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
+		return status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
 	}
 	if req.VolumeCapabilities == nil {
-		return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
+		return status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
 	}
+	return nil
+}
+func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
+
+	if err := cs.validateVolumeReq(req); err != nil {
+		return nil, err
+	}
 	volumeNameMutex.LockKey(req.GetName())
 	defer volumeNameMutex.UnlockKey(req.GetName())
 
@@ -87,13 +94,13 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 		// Since err is nil, it means the volume with the same name already exists
 		// need to check if the size of exisiting volume is the same as in new
 		// request
-		if exVol.VolSize >= int64(req.GetCapacityRange().GetRequiredBytes()) {
+		if exVol.VolSize >= req.GetCapacityRange().GetRequiredBytes() {
 			// exisiting volume is compatible with new request and should be reused.
 			// TODO (sbezverk) Do I need to make sure that RBD volume still exists?
 			return &csi.CreateVolumeResponse{
 				Volume: &csi.Volume{
 					VolumeId:      exVol.VolID,
-					CapacityBytes: int64(exVol.VolSize),
+					CapacityBytes: exVol.VolSize,
 					VolumeContext: req.GetParameters(),
 				},
 			}, nil
@@ -120,49 +127,32 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 	// Volume Size - Default is 1 GiB
 	volSizeBytes := int64(oneGB)
 	if req.GetCapacityRange() != nil {
-		volSizeBytes = int64(req.GetCapacityRange().GetRequiredBytes())
+		volSizeBytes = req.GetCapacityRange().GetRequiredBytes()
 	}
 	rbdVol.VolSize = volSizeBytes
 	volSizeGB := int(volSizeBytes / 1024 / 1024 / 1024)
 
 	// Check if there is already RBD image with requested name
-	found, _, _ := rbdStatus(rbdVol, rbdVol.UserId, req.GetSecrets())
+	found, _, _ := rbdStatus(rbdVol, rbdVol.UserID, req.GetSecrets())
 	if !found {
 		// if VolumeContentSource is not nil, this request is for snapshot
 		if req.VolumeContentSource != nil {
-			snapshot := req.VolumeContentSource.GetSnapshot()
-			if snapshot == nil {
-				return nil, status.Error(codes.InvalidArgument, "Volume Snapshot cannot be empty")
-			}
-
-			snapshotID := snapshot.GetSnapshotId()
-			if len(snapshotID) == 0 {
-				return nil, status.Error(codes.InvalidArgument, "Volume Snapshot ID cannot be empty")
-			}
-
-			rbdSnap := &rbdSnapshot{}
-			if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil {
+			if err = cs.checkSnapshot(req, rbdVol); err != nil {
 				return nil, err
 			}
-
-			err = restoreSnapshot(rbdVol, rbdSnap, rbdVol.AdminId, req.GetSecrets())
-			if err != nil {
-				return nil, err
-			}
-			glog.V(4).Infof("create volume %s from snapshot %s", volName, rbdSnap.SnapName)
 		} else {
-			if err := createRBDImage(rbdVol, volSizeGB, rbdVol.AdminId, req.GetSecrets()); err != nil {
-				if err != nil {
-					glog.Warningf("failed to create volume: %v", err)
-					return nil, err
-				}
+			err = createRBDImage(rbdVol, volSizeGB, rbdVol.AdminID, req.GetSecrets())
+			if err != nil {
+				glog.Warningf("failed to create volume: %v", err)
+				return nil, err
+			}
+			glog.V(4).Infof("create volume %s", volName)
 		}
 	}
 
-	if err := cs.MetadataStore.Create(volumeID, rbdVol); err != nil {
+	if err = cs.MetadataStore.Create(volumeID, rbdVol); err != nil {
 		glog.Warningf("failed to store volume metadata with error: %v", err)
-		if err := deleteRBDImage(rbdVol, rbdVol.AdminId, req.GetSecrets()); err != nil {
+		if err = deleteRBDImage(rbdVol, rbdVol.AdminID, req.GetSecrets()); err != nil {
 			glog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err)
 			return nil, err
 		}
@@ -173,13 +163,37 @@ func (cs *controllerServer) CreateVol
 	return &csi.CreateVolumeResponse{
 		Volume: &csi.Volume{
 			VolumeId:      volumeID,
-			CapacityBytes: int64(volSizeBytes),
+			CapacityBytes: volSizeBytes,
 			VolumeContext: req.GetParameters(),
 		},
 	}, nil
 }
 
-func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
+func (cs *ControllerServer) checkSnapshot(req *csi.CreateVolumeRequest, rbdVol *rbdVolume) error {
+	snapshot := req.VolumeContentSource.GetSnapshot()
+	if snapshot == nil {
+		return status.Error(codes.InvalidArgument, "Volume Snapshot cannot be empty")
+	}
+
+	snapshotID := snapshot.GetSnapshotId()
+	if len(snapshotID) == 0 {
+		return status.Error(codes.InvalidArgument, "Volume Snapshot ID cannot be empty")
+	}
+
+	rbdSnap := &rbdSnapshot{}
+	if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil {
+		return err
+	}
+
+	err := restoreSnapshot(rbdVol, rbdSnap, rbdVol.AdminID, req.GetSecrets())
+	if err != nil {
+		return err
+	}
+	glog.V(4).Infof("create volume %s from snapshot %s", req.GetName(), rbdSnap.SnapName)
+	return nil
+}
+
+func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
 		glog.Warningf("invalid delete volume req: %v", req)
 		return nil, err
@@ -199,7 +213,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 	volName := rbdVol.VolName
 	// Deleting rbd image
 	glog.V(4).Infof("deleting volume %s", volName)
-	if err := deleteRBDImage(rbdVol, rbdVol.AdminId, req.GetSecrets()); err != nil {
+	if err := deleteRBDImage(rbdVol, rbdVol.AdminID, req.GetSecrets()); err != nil {
 		// TODO: can we detect "already deleted" situations here and proceed?
 		glog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, volName, err)
 		return nil, err
@@ -213,7 +227,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 	return &csi.DeleteVolumeResponse{}, nil
 }
 
-func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
+func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
 	for _, cap := range req.VolumeCapabilities {
 		if cap.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
 			return &csi.ValidateVolumeCapabilitiesResponse{Message: ""}, nil
@@ -226,15 +240,15 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
 	}, nil
 }
 
-func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
+func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
 	return &csi.ControllerUnpublishVolumeResponse{}, nil
 }
 
-func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
+func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
 	return &csi.ControllerPublishVolumeResponse{}, nil
 }
 
-func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
+func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
 		glog.Warningf("invalid create snapshot req: %v", req)
 		return nil, err
@@ -293,7 +307,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
 	rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
 	rbdSnap.SizeBytes = rbdVolume.VolSize
 
-	err = createSnapshot(rbdSnap, rbdSnap.AdminId, req.GetSecrets())
+	err = createSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
 	// if we already have the snapshot, return the snapshot
 	if err != nil {
 		if exitErr, ok := err.(*exec.ExitError); ok {
@@ -314,10 +328,10 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
 		}
 	} else {
 		glog.V(4).Infof("create snapshot %s", snapName)
-		err = protectSnapshot(rbdSnap, rbdSnap.AdminId, req.GetSecrets())
+		err = protectSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
 
 		if err != nil {
-			err = deleteSnapshot(rbdSnap, rbdSnap.AdminId, req.GetSecrets())
+			err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
 			if err != nil {
 				return nil, fmt.Errorf("snapshot is created but failed to protect and delete snapshot: %v", err)
 			}
@@ -327,16 +341,16 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
 
 	rbdSnap.CreatedAt = ptypes.TimestampNow().GetSeconds()
 
-	if err := cs.MetadataStore.Create(snapshotID, rbdSnap); err != nil {
+	if err = cs.MetadataStore.Create(snapshotID, rbdSnap); err != nil {
 		glog.Warningf("rbd: failed to store snapInfo with error: %v", err)
 		// Unprotect snapshot
-		err := unprotectSnapshot(rbdSnap, rbdSnap.AdminId, req.GetSecrets())
+		err = unprotectSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
 		if err != nil {
 			return nil, status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
 		}
 		// Deleting snapshot
 		glog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName)
-		if err := deleteSnapshot(rbdSnap, rbdSnap.AdminId, req.GetSecrets()); err != nil {
+		if err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets()); err != nil {
 			return nil, status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
 		}
 		return nil, err
@@ -356,7 +370,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
 	}, nil
 }
 
-func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
+func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
 		glog.Warningf("invalid delete snapshot req: %v", req)
 		return nil, err
@@ -375,14 +389,14 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
 	}
 
 	// Unprotect snapshot
-	err := unprotectSnapshot(rbdSnap, rbdSnap.AdminId, req.GetSecrets())
+	err := unprotectSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
 	if err != nil {
 		return nil, status.Errorf(codes.FailedPrecondition, "failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
 	}
 
 	// Deleting snapshot
 	glog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName)
-	if err := deleteSnapshot(rbdSnap, rbdSnap.AdminId, req.GetSecrets()); err != nil {
+	if err := deleteSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets()); err != nil {
 		return nil, status.Errorf(codes.FailedPrecondition, "failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
 	}
 
@@ -395,13 +409,13 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
 	return &csi.DeleteSnapshotResponse{}, nil
 }
 
-func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
+func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS); err != nil {
 		glog.Warningf("invalid list snapshot req: %v", req)
 		return nil, err
 	}
 
-	sourceVolumeId := req.GetSourceVolumeId()
+	sourceVolumeID := req.GetSourceVolumeId()
 
 	// TODO (sngchlko) list with token
 	// TODO (#94) protect concurrent access to global data structures
@@ -410,8 +424,8 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
 	if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 {
 		if rbdSnap, ok := rbdSnapshots[snapshotID]; ok {
 			// if source volume ID also set, check source volume id on the cache.
-			if len(sourceVolumeId) != 0 && rbdSnap.SourceVolumeID != sourceVolumeId {
-				return nil, status.Errorf(codes.Unknown, "Requested Source Volume ID %s is different from %s", sourceVolumeId, rbdSnap.SourceVolumeID)
+			if len(sourceVolumeID) != 0 && rbdSnap.SourceVolumeID != sourceVolumeID {
+				return nil, status.Errorf(codes.Unknown, "Requested Source Volume ID %s is different from %s", sourceVolumeID, rbdSnap.SourceVolumeID)
 			}
 			return &csi.ListSnapshotsResponse{
 				Entries: []*csi.ListSnapshotsResponse_Entry{
@@ -436,7 +450,7 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
 	entries := []*csi.ListSnapshotsResponse_Entry{}
 	for _, rbdSnap := range rbdSnapshots {
 		// if source volume ID also set, check source volume id on the cache.
-		if len(sourceVolumeId) != 0 && rbdSnap.SourceVolumeID != sourceVolumeId {
+		if len(sourceVolumeID) != 0 && rbdSnap.SourceVolumeID != sourceVolumeID {
 			continue
 		}
 		entries = append(entries, &csi.ListSnapshotsResponse_Entry{
diff --git a/pkg/rbd/identityserver.go b/pkg/rbd/identityserver.go
index 3e977dbcc..759903aa6 100644
--- a/pkg/rbd/identityserver.go
+++ b/pkg/rbd/identityserver.go
@@ -23,11 +23,11 @@ import (
 	"github.com/kubernetes-csi/drivers/pkg/csi-common"
 )
 
-type identityServer struct {
+type IdentityServer struct {
 	*csicommon.DefaultIdentityServer
 }
 
-func (is *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
+func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
 	return &csi.GetPluginCapabilitiesResponse{
 		Capabilities: []*csi.PluginCapability{
 			{
diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go
index 2eeaf0530..e5696f5d5 100644
--- a/pkg/rbd/nodeserver.go
+++ b/pkg/rbd/nodeserver.go
@@ -35,12 +35,12 @@ import (
 	"github.com/kubernetes-csi/drivers/pkg/csi-common"
 )
 
-type nodeServer struct {
+type NodeServer struct {
 	*csicommon.DefaultNodeServer
 	mounter mount.Interface
 }
 
-func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 	targetPath := req.GetTargetPath()
 	targetPathMutex.LockKey(targetPath)
 	defer targetPathMutex.UnlockKey(targetPath)
@@ -97,7 +97,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	}
 	volOptions.VolName = volName
 	// Mapping RBD image
-	devicePath, err := attachRBDImage(volOptions, volOptions.UserId, req.GetSecrets())
+	devicePath, err := attachRBDImage(volOptions, volOptions.UserID, req.GetSecrets())
 	if err != nil {
 		return nil, err
 	}
@@ -132,7 +132,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	return &csi.NodePublishVolumeResponse{}, nil
 }
 
-func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
 	targetPath := req.GetTargetPath()
 	targetPathMutex.LockKey(targetPath)
 	defer targetPathMutex.UnlockKey(targetPath)
@@ -202,7 +202,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
 	return &csi.NodeUnpublishVolumeResponse{}, nil
 }
 
-func (ns *nodeServer) NodeStageVolume(
+func (ns *NodeServer) NodeStageVolume(
 	ctx context.Context,
 	req *csi.NodeStageVolumeRequest) (
 	*csi.NodeStageVolumeResponse, error) {
@@ -210,7 +210,7 @@ func (ns *nodeServer) NodeStageVolume(
 	return nil, status.Error(codes.Unimplemented, "")
 }
 
-func (ns *nodeServer) NodeUnstageVolume(
+func (ns *NodeServer) NodeUnstageVolume(
 	ctx context.Context,
 	req *csi.NodeUnstageVolumeRequest) (
 	*csi.NodeUnstageVolumeResponse, error) {
@@ -218,7 +218,7 @@ func (ns *nodeServer) NodeUnstageVolume(
 	return nil, status.Error(codes.Unimplemented, "")
 }
 
-func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
+func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
 	return ns.DefaultNodeServer.NodeGetInfo(ctx, req)
 }
diff --git a/pkg/rbd/rbd.go b/pkg/rbd/rbd.go
index 014a9802b..0aea74764 100644
--- a/pkg/rbd/rbd.go
+++ b/pkg/rbd/rbd.go
@@ -31,44 +31,40 @@ import (
 
 // PluginFolder defines the location of rbdplugin
 const (
 	PluginFolder      = "/var/lib/kubelet/plugins/csi-rbdplugin"
-	rbdDefaultAdminId = "admin"
-	rbdDefaultUserId  = rbdDefaultAdminId
+	rbdDefaultAdminID = "admin"
+	rbdDefaultUserID  = rbdDefaultAdminID
 )
 
-type rbd struct {
-	driver *csicommon.CSIDriver
+type Driver struct {
+	cd *csicommon.CSIDriver
 
-	ids *identityServer
-	ns  *nodeServer
-	cs  *controllerServer
-
-	cap   []*csi.VolumeCapability_AccessMode
-	cscap []*csi.ControllerServiceCapability
+	ids *IdentityServer
+	ns  *NodeServer
+	cs  *ControllerServer
 }
 
 var (
-	rbdDriver *rbd
-	version   = "1.0.0"
+	version = "1.0.0"
 )
 
-func GetRBDDriver() *rbd {
-	return &rbd{}
+func GetDriver() *Driver {
+	return &Driver{}
 }
 
-func NewIdentityServer(d *csicommon.CSIDriver) *identityServer {
-	return &identityServer{
+func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
+	return &IdentityServer{
 		DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
 	}
 }
 
-func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *controllerServer {
-	return &controllerServer{
+func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
+	return &ControllerServer{
 		DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
 		MetadataStore:           cachePersister,
 	}
}
 
-func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*nodeServer, error) {
+func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, error) {
 	mounter := mount.New("")
 	if containerized {
 		ne, err := nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New())
@@ -77,40 +73,40 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*nodeServer, err
 		}
 		mounter = mount.NewNsenterMounter("", ne)
 	}
-	return &nodeServer{
+	return &NodeServer{
 		DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
 		mounter:           mounter,
 	}, nil
 }
 
-func (rbd *rbd) Run(driverName, nodeID, endpoint string, containerized bool, cachePersister util.CachePersister) {
+func (r *Driver) Run(driverName, nodeID, endpoint string, containerized bool, cachePersister util.CachePersister) {
 	var err error
 	glog.Infof("Driver: %v version: %v", driverName, version)
 
 	// Initialize default library driver
-	rbd.driver = csicommon.NewCSIDriver(driverName, version, nodeID)
-	if rbd.driver == nil {
+	r.cd = csicommon.NewCSIDriver(driverName, version, nodeID)
+	if r.cd == nil {
 		glog.Fatalln("Failed to initialize CSI Driver.")
 	}
-	rbd.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
+	r.cd.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
 		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
 		csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
 		csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
 		csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
 	})
-	rbd.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER})
+	r.cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER})
 
 	// Create GRPC servers
-	rbd.ids = NewIdentityServer(rbd.driver)
-	rbd.ns, err = NewNodeServer(rbd.driver, containerized)
+	r.ids = NewIdentityServer(r.cd)
+	r.ns, err = NewNodeServer(r.cd, containerized)
 	if err != nil {
 		glog.Fatalf("failed to start node server, err %v\n", err)
 	}
-	rbd.cs = NewControllerServer(rbd.driver, cachePersister)
-	rbd.cs.LoadExDataFromMetadataStore()
+	r.cs = NewControllerServer(r.cd, cachePersister)
+	r.cs.LoadExDataFromMetadataStore()
 
 	s := csicommon.NewNonBlockingGRPCServer()
-	s.Start(endpoint, rbd.ids, rbd.cs, rbd.ns)
+	s.Start(endpoint, r.ids, r.cs, r.ns)
 	s.Wait()
 }
diff --git a/pkg/rbd/rbd_attach.go b/pkg/rbd/rbd_attach.go
index e24ca02a7..ee7b39a2c 100644
--- a/pkg/rbd/rbd_attach.go
+++ b/pkg/rbd/rbd_attach.go
@@ -31,6 +31,9 @@ import (
 
 const (
 	envHostRootFS = "HOST_ROOTFS"
+	rbdTonbd      = "rbd-nbd"
+	rbd           = "rbd"
+	nbd           = "nbd"
 )
 
 var (
@@ -46,18 +49,6 @@ func init() {
 	hasNBD = checkRbdNbdTools()
 }
 
-func getDevFromImageAndPool(pool, image string) (string, bool) {
-	device, found := getRbdDevFromImageAndPool(pool, image)
-	if found {
-		return device, true
-	}
-	device, found = getNbdDevFromImageAndPool(pool, image)
-	if found {
-		return device, true
-	}
-	return "", false
-}
-
 // Search /sys/bus for rbd device that matches given pool and image.
 func getRbdDevFromImageAndPool(pool string, image string) (string, bool) {
 	// /sys/bus/rbd/devices/X/name and /sys/bus/rbd/devices/X/pool
@@ -166,7 +157,7 @@ func getNbdDevFromImageAndPool(pool string, image string) (string, bool) {
 		// Check if this process is mapping a rbd device.
 		// Only accepted pattern of cmdline is from execRbdMap:
 		// rbd-nbd map pool/image ...
-		if len(cmdlineArgs) < 3 || cmdlineArgs[0] != "rbd-nbd" || cmdlineArgs[1] != "map" {
+		if len(cmdlineArgs) < 3 || cmdlineArgs[0] != rbdTonbd || cmdlineArgs[1] != "map" {
 			glog.V(4).Infof("nbd device %s is not used by rbd", nbdPath)
 			continue
 		}
@@ -211,7 +202,7 @@ func checkRbdNbdTools() bool {
 		glog.V(3).Infof("rbd-nbd: nbd modprobe failed with error %v", err)
 		return false
 	}
-	if _, err := execCommand("rbd-nbd", []string{"--version"}); err != nil {
+	if _, err := execCommand(rbdTonbd, []string{"--version"}); err != nil {
 		glog.V(3).Infof("rbd-nbd: running rbd-nbd --version failed with error %v", err)
 		return false
 	}
@@ -219,7 +210,7 @@
 	return true
 }
 
-func attachRBDImage(volOptions *rbdVolume, userId string, credentials map[string]string) (string, error) {
+func attachRBDImage(volOptions *rbdVolume, userID string, credentials map[string]string) (string, error) {
 	var err error
 	var output []byte
@@ -227,18 +218,18 @@
 	imagePath := fmt.Sprintf("%s/%s", volOptions.Pool, image)
 
 	useNBD := false
-	cmdName := "rbd"
-	moduleName := "rbd"
-	if volOptions.Mounter == "rbd-nbd" && hasNBD {
+	cmdName := rbd
+	moduleName := rbd
+	if volOptions.Mounter == rbdTonbd && hasNBD {
 		useNBD = true
-		cmdName = "rbd-nbd"
-		moduleName = "nbd"
+		cmdName = rbdTonbd
+		moduleName = nbd
 	}
 
 	devicePath, found := waitForPath(volOptions.Pool, image, 1, useNBD)
 	if !found {
-		attachdetachMutex.LockKey(string(imagePath))
-		defer attachdetachMutex.UnlockKey(string(imagePath))
+		attachdetachMutex.LockKey(imagePath)
+		defer attachdetachMutex.UnlockKey(imagePath)
 
 		_, err = execCommand("modprobe", []string{moduleName})
 		if err != nil {
@@ -251,7 +242,7 @@
 			Steps:    rbdImageWatcherSteps,
 		}
 		err := wait.ExponentialBackoff(backoff, func() (bool, error) {
-			used, rbdOutput, err := rbdStatus(volOptions, userId, credentials)
+			used, rbdOutput, err := rbdStatus(volOptions, userID, credentials)
 			if err != nil {
 				return false, fmt.Errorf("fail to check rbd image status with: (%v), rbd output: (%s)", err, rbdOutput)
 			}
@@ -272,12 +263,12 @@
 		}
 		glog.V(5).Infof("rbd: map mon %s", mon)
 
-		key, err := getRBDKey(userId, credentials)
+		key, err := getRBDKey(userID, credentials)
 		if err != nil {
 			return "", err
 		}
 		output, err = execCommand(cmdName, []string{
-			"map", imagePath, "--id", userId, "-m", mon, "--key=" + key})
+			"map", imagePath, "--id", userID, "-m", mon, "--key=" + key})
 		if err != nil {
 			glog.Warningf("rbd: map error %v, rbd output: %s", err, string(output))
 			return "", fmt.Errorf("rbd: map failed %v, rbd output: %s", err, string(output))
@@ -297,9 +288,9 @@
 
 	glog.V(3).Infof("rbd: unmap device %s", devicePath)
 
-	cmdName := "rbd"
+	cmdName := rbd
 	if strings.HasPrefix(devicePath, "/dev/nbd") {
-		cmdName = "rbd-nbd"
+		cmdName = rbdTonbd
 	}
 
 	output, err = execCommand(cmdName, []string{"unmap", devicePath})
diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go
index 03be2b327..abff869d8 100644
--- a/pkg/rbd/rbd_util.go
+++ b/pkg/rbd/rbd_util.go
@@ -30,11 +30,7 @@ import (
 
 const (
 	imageWatcherStr = "watcher="
-	rbdImageFormat1 = "1"
 	rbdImageFormat2 = "2"
-	imageSizeStr    = "size "
-	sizeDivStr      = " MB in"
-	kubeLockMagic   = "kubelet_lock_magic_"
 	// The following three values are used for 30 seconds timeout
 	// while waiting for RBD Watcher to expire.
 	rbdImageWatcherInitDelay = 1 * time.Second
@@ -52,8 +48,8 @@ type rbdVolume struct {
 	ImageFormat   string `json:"imageFormat"`
 	ImageFeatures string `json:"imageFeatures"`
 	VolSize       int64  `json:"volSize"`
-	AdminId       string `json:"adminId"`
-	UserId        string `json:"userId"`
+	AdminID       string `json:"adminId"`
+	UserID        string `json:"userId"`
 	Mounter       string `json:"mounter"`
 }
@@ -67,8 +63,8 @@ type rbdSnapshot struct {
 	Pool       string `json:"pool"`
 	CreatedAt  int64  `json:"createdAt"`
 	SizeBytes  int64  `json:"sizeBytes"`
-	AdminId    string `json:"adminId"`
-	UserId     string `json:"userId"`
+	AdminID    string `json:"adminId"`
+	UserID     string `json:"userId"`
 }
 
 var (
@@ -115,7 +111,7 @@ func getMon(pOpts *rbdVolume, credentials map[string]string) (string, error) {
 }
 
 // CreateImage creates a new ceph image with provision and volume options.
-func createRBDImage(pOpts *rbdVolume, volSz int, adminId string, credentials map[string]string) error {
+func createRBDImage(pOpts *rbdVolume, volSz int, adminID string, credentials map[string]string) error {
 	var output []byte
 
 	mon, err := getMon(pOpts, credentials)
@@ -126,16 +122,16 @@
 	image := pOpts.VolName
 	volSzGB := fmt.Sprintf("%dG", volSz)
 
-	key, err := getRBDKey(adminId, credentials)
+	key, err := getRBDKey(adminID, credentials)
 	if err != nil {
 		return err
 	}
 	if pOpts.ImageFormat == rbdImageFormat2 {
-		glog.V(4).Infof("rbd: create %s size %s format %s (features: %s) using mon %s, pool %s id %s key %s", image, volSzGB, pOpts.ImageFormat, pOpts.ImageFeatures, mon, pOpts.Pool, adminId, key)
+		glog.V(4).Infof("rbd: create %s size %s format %s (features: %s) using mon %s, pool %s id %s key %s", image, volSzGB, pOpts.ImageFormat, pOpts.ImageFeatures, mon, pOpts.Pool, adminID, key)
 	} else {
-		glog.V(4).Infof("rbd: create %s size %s format %s using mon %s, pool %s id %s key %s", image, volSzGB, pOpts.ImageFormat, mon, pOpts.Pool, adminId, key)
+		glog.V(4).Infof("rbd: create %s size %s format %s using mon %s, pool %s id %s key %s", image, volSzGB, pOpts.ImageFormat, mon, pOpts.Pool, adminID, key)
 	}
-	args := []string{"create", image, "--size", volSzGB, "--pool", pOpts.Pool, "--id", adminId, "-m", mon, "--key=" + key, "--image-format", pOpts.ImageFormat}
+	args := []string{"create", image, "--size", volSzGB, "--pool", pOpts.Pool, "--id", adminID, "-m", mon, "--key=" + key, "--image-format", pOpts.ImageFormat}
 	if pOpts.ImageFormat == rbdImageFormat2 {
 		args = append(args, "--image-feature", pOpts.ImageFeatures)
 	}
@@ -150,14 +146,14 @@
 
 // rbdStatus checks if there is watcher on the image.
 // It returns true if there is a watcher on the image, otherwise returns false.
-func rbdStatus(pOpts *rbdVolume, userId string, credentials map[string]string) (bool, string, error) {
+func rbdStatus(pOpts *rbdVolume, userID string, credentials map[string]string) (bool, string, error) {
 	var output string
 	var cmd []byte
 
 	image := pOpts.VolName
 	// If we don't have admin id/secret (e.g. attaching), fallback to user id/secret.
-	key, err := getRBDKey(userId, credentials)
+	key, err := getRBDKey(userID, credentials)
 	if err != nil {
 		return false, "", err
 	}
@@ -167,8 +163,8 @@ func rbdStatus(pOpts *rbdVolume, userId string, credentials map[string]string) (
 		return false, "", err
 	}
 
-	glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, userId, key)
-	args := []string{"status", image, "--pool", pOpts.Pool, "-m", mon, "--id", userId, "--key=" + key}
+	glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, userID, key)
+	args := []string{"status", image, "--pool", pOpts.Pool, "-m", mon, "--id", userID, "--key=" + key}
 	cmd, err = execCommand("rbd", args)
 	output = string(cmd)
@@ -194,10 +190,10 @@
 }
 
 // DeleteImage deletes a ceph image with provision and volume options.
-func deleteRBDImage(pOpts *rbdVolume, adminId string, credentials map[string]string) error {
+func deleteRBDImage(pOpts *rbdVolume, adminID string, credentials map[string]string) error {
 	var output []byte
 	image := pOpts.VolName
-	found, _, err := rbdStatus(pOpts, adminId, credentials)
+	found, _, err := rbdStatus(pOpts, adminID, credentials)
 	if err != nil {
 		return err
 	}
@@ -205,7 +201,7 @@
 		glog.Info("rbd is still being used ", image)
 		return fmt.Errorf("rbd %s is still being used", image)
 	}
-	key, err := getRBDKey(adminId, credentials)
+	key, err := getRBDKey(adminID, credentials)
 	if err != nil {
 		return err
 	}
@@ -214,8 +210,8 @@
 		return err
 	}
 
-	glog.V(4).Infof("rbd: rm %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminId, key)
-	args := []string{"rm", image, "--pool", pOpts.Pool, "--id", adminId, "-m", mon, "--key=" + key}
+	glog.V(4).Infof("rbd: rm %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminID, key)
+	args := []string{"rm", image, "--pool", pOpts.Pool, "--id", adminID, "-m", mon, "--key=" + key}
 	output, err = execCommand("rbd", args)
 	if err == nil {
 		return nil
@@ -250,8 +246,8 @@ func getRBDVolumeOptions(volOptions map[string]string) (*rbdVolume, error) {
 	if rbdVol.ImageFormat == rbdImageFormat2 {
 		// if no image features are provided, it results in an empty string,
 		// which disables all RBD image format 2 features, as expected
-		imageFeatures, ok := volOptions["imageFeatures"]
-		if ok {
+		imageFeatures, found := volOptions["imageFeatures"]
+		if found {
 			arr := strings.Split(imageFeatures, ",")
 			for _, f := range arr {
 				if !supportedFeatures.Has(f) {
@@ -262,13 +258,13 @@
 		}
 	}
 
-	rbdVol.AdminId, ok = volOptions["adminid"]
+	rbdVol.AdminID, ok = volOptions["adminid"]
 	if !ok {
-		rbdVol.AdminId = rbdDefaultAdminId
+		rbdVol.AdminID = rbdDefaultAdminID
 	}
-	rbdVol.UserId, ok = volOptions["userid"]
+	rbdVol.UserID, ok = volOptions["userid"]
 	if !ok {
-		rbdVol.UserId = rbdDefaultUserId
+		rbdVol.UserID = rbdDefaultUserID
 	}
 	rbdVol.Mounter, ok = volOptions["mounter"]
 	if !ok {
@@ -291,13 +287,13 @@ func getRBDSnapshotOptions(snapOptions map[string]string) (*rbdSnapshot, error)
 			return nil, fmt.Errorf("Either monitors or monValueFromSecret must be set")
 		}
 	}
-	rbdSnap.AdminId, ok = snapOptions["adminid"]
+	rbdSnap.AdminID, ok = snapOptions["adminid"]
 	if !ok {
-		rbdSnap.AdminId = rbdDefaultAdminId
+		rbdSnap.AdminID = rbdDefaultAdminID
 	}
-	rbdSnap.UserId, ok = snapOptions["userid"]
+	rbdSnap.UserID, ok = snapOptions["userid"]
 	if !ok {
-		rbdSnap.UserId = rbdDefaultUserId
+		rbdSnap.UserID = rbdDefaultUserID
 	}
 
 	return rbdSnap, nil
@@ -355,13 +351,13 @@ func getSnapMon(pOpts *rbdSnapshot, credentials map[string]string) (string, erro
 	return mon, nil
 }
 
-func protectSnapshot(pOpts *rbdSnapshot, adminId string, credentials map[string]string) error {
+func protectSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]string) error {
 	var output []byte
 
 	image := pOpts.VolName
 	snapID := pOpts.SnapID
 
-	key, err := getRBDKey(adminId, credentials)
+	key, err := getRBDKey(adminID, credentials)
 	if err != nil {
 		return err
 	}
@@ -370,8 +366,8 @@
 		return err
 	}
 
-	glog.V(4).Infof("rbd: snap protect %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminId, key)
-	args := []string{"snap", "protect", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminId, "-m", mon, "--key=" + key}
+	glog.V(4).Infof("rbd: snap protect %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminID, key)
+	args := []string{"snap", "protect", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminID, "-m", mon, "--key=" + key}
 
 	output, err = execCommand("rbd", args)
@@ -382,7 +378,7 @@
 	return nil
 }
 
-func createSnapshot(pOpts *rbdSnapshot, adminId string, credentials map[string]string) error {
+func createSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]string) error {
 	var output []byte
 
 	mon, err := getSnapMon(pOpts, credentials)
@@ -393,12 +389,12 @@
 	image := pOpts.VolName
 	snapID := pOpts.SnapID
 
-	key, err := getRBDKey(adminId, credentials)
+	key, err := getRBDKey(adminID, credentials)
 	if err != nil {
 		return err
 	}
-	glog.V(4).Infof("rbd: snap create %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminId, key)
-	args := []string{"snap", "create", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminId, "-m", mon, "--key=" + key}
+	glog.V(4).Infof("rbd: snap create %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminID, key)
+	args := []string{"snap", "create", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminID, "-m", mon, "--key=" + key}
 
 	output, err = execCommand("rbd", args)
@@ -409,7 +405,7 @@
 	return nil
 }
 
-func unprotectSnapshot(pOpts *rbdSnapshot, adminId string, credentials map[string]string) error {
+func unprotectSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]string) error {
 	var output []byte
 
 	mon, err := getSnapMon(pOpts, credentials)
@@ -420,12 +416,12 @@
 	image := pOpts.VolName
 	snapID := pOpts.SnapID
 
-	key, err := getRBDKey(adminId, credentials)
+	key, err := getRBDKey(adminID, credentials)
 	if err != nil {
 		return err
 	}
-	glog.V(4).Infof("rbd: snap unprotect %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminId, key)
-	args := []string{"snap", "unprotect", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminId, "-m", mon, "--key=" + key}
+	glog.V(4).Infof("rbd: snap unprotect %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminID, key)
+	args := []string{"snap", "unprotect", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminID, "-m", mon, "--key=" + key}
pOpts.Pool, "--snap", snapID, image, "--id", adminID, "-m", mon, "--key=" + key} output, err = execCommand("rbd", args) @@ -436,7 +432,7 @@ func unprotectSnapshot(pOpts *rbdSnapshot, adminId string, credentials map[strin return nil } -func deleteSnapshot(pOpts *rbdSnapshot, adminId string, credentials map[string]string) error { +func deleteSnapshot(pOpts *rbdSnapshot, adminID string, credentials map[string]string) error { var output []byte mon, err := getSnapMon(pOpts, credentials) @@ -447,12 +443,12 @@ func deleteSnapshot(pOpts *rbdSnapshot, adminId string, credentials map[string]s image := pOpts.VolName snapID := pOpts.SnapID - key, err := getRBDKey(adminId, credentials) + key, err := getRBDKey(adminID, credentials) if err != nil { return err } - glog.V(4).Infof("rbd: snap rm %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminId, key) - args := []string{"snap", "rm", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminId, "-m", mon, "--key=" + key} + glog.V(4).Infof("rbd: snap rm %s using mon %s, pool %s id %s key %s", image, mon, pOpts.Pool, adminID, key) + args := []string{"snap", "rm", "--pool", pOpts.Pool, "--snap", snapID, image, "--id", adminID, "-m", mon, "--key=" + key} output, err = execCommand("rbd", args) @@ -463,7 +459,7 @@ func deleteSnapshot(pOpts *rbdSnapshot, adminId string, credentials map[string]s return nil } -func restoreSnapshot(pVolOpts *rbdVolume, pSnapOpts *rbdSnapshot, adminId string, credentials map[string]string) error { +func restoreSnapshot(pVolOpts *rbdVolume, pSnapOpts *rbdSnapshot, adminID string, credentials map[string]string) error { var output []byte mon, err := getMon(pVolOpts, credentials) @@ -474,12 +470,12 @@ func restoreSnapshot(pVolOpts *rbdVolume, pSnapOpts *rbdSnapshot, adminId string image := pVolOpts.VolName snapID := pSnapOpts.SnapID - key, err := getRBDKey(adminId, credentials) + key, err := getRBDKey(adminID, credentials) if err != nil { return err } - glog.V(4).Infof("rbd: clone %s using mon %s, pool %s id %s key %s", image, mon, pVolOpts.Pool, adminId, key) - args := []string{"clone", pSnapOpts.Pool + "/" + pSnapOpts.VolName + "@" + snapID, pVolOpts.Pool + "/" + image, "--id", adminId, "-m", mon, "--key=" + key} + glog.V(4).Infof("rbd: clone %s using mon %s, pool %s id %s key %s", image, mon, pVolOpts.Pool, adminID, key) + args := []string{"clone", pSnapOpts.Pool + "/" + pSnapOpts.VolName + "@" + snapID, pVolOpts.Pool + "/" + image, "--id", adminID, "-m", mon, "--key=" + key} output, err = execCommand("rbd", args) diff --git a/pkg/util/cachepersister.go b/pkg/util/cachepersister.go index f6ae13f9d..c764cc026 100644 --- a/pkg/util/cachepersister.go +++ b/pkg/util/cachepersister.go @@ -35,7 +35,7 @@ type CachePersister interface { Delete(identifier string) error } -func NewCachePersister(metadataStore string, driverName string) (CachePersister, error) { +func NewCachePersister(metadataStore, driverName string) (CachePersister, error) { if metadataStore == "k8s_configmap" { glog.Infof("cache-perister: using kubernetes configmap as metadata cache persister") k8scm := &K8sCMCache{} diff --git a/pkg/util/k8scmcache.go b/pkg/util/k8scmcache.go index f1e27ba3c..7f5190026 100644 --- a/pkg/util/k8scmcache.go +++ b/pkg/util/k8scmcache.go @@ -104,7 +104,7 @@ func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFun if !match { continue } - if err := json.Unmarshal([]byte(data), destObj); err != nil { + if err = json.Unmarshal([]byte(data), destObj); err != nil { return errors.Wrap(err, "k8s-cm-cache: 
unmarshal error") } if err = f(cm.ObjectMeta.Name); err != nil { @@ -119,33 +119,32 @@ func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error { if cm != nil && err == nil { glog.V(4).Infof("k8s-cm-cache: configmap already exists, skipping configmap creation") return nil - } else { - dataJson, err := json.Marshal(data) - if err != nil { - return errors.Wrap(err, "k8s-cm-cache: marshal error") - } - cm := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: identifier, - Namespace: k8scm.Namespace, - Labels: map[string]string{ - csiMetadataLabelAttr: cmLabel, - }, - }, - Data: map[string]string{}, - } - cm.Data[cmDataKey] = string(dataJson) - - _, err = k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Create(cm) - if err != nil { - if apierrs.IsAlreadyExists(err) { - glog.V(4).Infof("k8s-cm-cache: configmap already exists") - return nil - } - return errors.Wrapf(err, "k8s-cm-cache: couldn't persist %s metadata as configmap", identifier) - } - } + dataJSON, err := json.Marshal(data) + if err != nil { + return errors.Wrap(err, "k8s-cm-cache: marshal error") + } + cm = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: identifier, + Namespace: k8scm.Namespace, + Labels: map[string]string{ + csiMetadataLabelAttr: cmLabel, + }, + }, + Data: map[string]string{}, + } + cm.Data[cmDataKey] = string(dataJSON) + + _, err = k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Create(cm) + if err != nil { + if apierrs.IsAlreadyExists(err) { + glog.V(4).Infof("k8s-cm-cache: configmap already exists") + return nil + } + return errors.Wrapf(err, "k8s-cm-cache: couldn't persist %s metadata as configmap", identifier) + } + glog.V(4).Infof("k8s-cm-cache: configmap %s successfully created\n", identifier) return nil } diff --git a/rbd/main.go b/rbd/main.go index 2031245e0..86346ed12 100644 --- a/rbd/main.go +++ b/rbd/main.go @@ -56,7 +56,7 @@ func main() { os.Exit(1) } - driver := rbd.GetRBDDriver() + driver := rbd.GetDriver() driver.Run(*driverName, *nodeID, *endpoint, *containerized, cp) os.Exit(0)