Add method comments

Signed-off-by: Madhu Rajanna <mrajanna@redhat.com>
This commit is contained in:
Madhu Rajanna 2019-01-28 17:17:06 +05:30
parent 6929db10e5
commit 25642fe404
12 changed files with 95 additions and 29 deletions

View File

@ -56,7 +56,7 @@ func main() {
os.Exit(1)
}
driver := rbd.GetDriver()
driver := rbd.NewDriver()
driver.Run(*driverName, *nodeID, *endpoint, *containerized, cp)
os.Exit(0)

View File

@ -28,6 +28,8 @@ import (
"github.com/ceph/ceph-csi/pkg/util"
)
// ControllerServer struct of CEPH CSI driver with supported methods of CSI
// controller server spec.
type ControllerServer struct {
*csicommon.DefaultControllerServer
MetadataStore util.CachePersister
@ -38,6 +40,7 @@ type controllerCacheEntry struct {
VolumeID volumeID
}
// CreateVolume creates the volume in backend and store the volume metadata
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateCreateVolumeRequest(req); err != nil {
glog.Errorf("CreateVolumeRequest validation failed: %v", err)
@ -102,6 +105,8 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}, nil
}
// DeleteVolume deletes the volume in backend and removes the volume metadata
// from store
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := cs.validateDeleteVolumeRequest(); err != nil {
glog.Errorf("DeleteVolumeRequest validation failed: %v", err)
@ -159,6 +164,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return &csi.DeleteVolumeResponse{}, nil
}
// ValidateVolumeCapabilities checks whether the volume capabilities requested
// are supported.
func (cs *ControllerServer) ValidateVolumeCapabilities(
ctx context.Context,
req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {

View File

@ -26,10 +26,13 @@ import (
)
const (
// PluginFolder defines the location of ceph plugin
PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin"
Version = "1.0.0"
// version of ceph driver
version = "1.0.0"
)
// Driver contains the default identity,node and controller struct
type Driver struct {
cd *csicommon.CSIDriver
@ -39,19 +42,23 @@ type Driver struct {
}
var (
// DefaultVolumeMounter for mounting volumes
DefaultVolumeMounter string
)
// NewDriver returns new ceph driver
func NewDriver() *Driver {
return &Driver{}
}
// NewIdentityServer initialize a identity server for ceph CSI driver
func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
return &IdentityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
}
}
// NewControllerServer initialize a controller server for ceph CSI driver
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
return &ControllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
@ -59,14 +66,17 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
}
}
// NewNodeServer initialize a node server for ceph CSI driver.
func NewNodeServer(d *csicommon.CSIDriver) *NodeServer {
return &NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
}
}
// Run start a non-blocking grpc controller,node and identityserver for
// ceph CSI driver which can serve multiple parallel requests
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cachePersister util.CachePersister) {
glog.Infof("Driver: %v version: %v", driverName, Version)
glog.Infof("Driver: %v version: %v", driverName, version)
// Configuration
@ -91,7 +101,7 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP
// Initialize default library driver
fs.cd = csicommon.NewCSIDriver(driverName, Version, nodeID)
fs.cd = csicommon.NewCSIDriver(driverName, version, nodeID)
if fs.cd == nil {
glog.Fatalln("Failed to initialize CSI driver")
}

View File

@ -23,10 +23,13 @@ import (
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
// IdentityServer struct of ceph CSI driver with supported methods of CSI
// identity server spec.
type IdentityServer struct {
*csicommon.DefaultIdentityServer
}
// GetPluginCapabilities returns available capabilities of the ceph driver
func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{

View File

@ -29,6 +29,8 @@ import (
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
// NodeServer struct of ceph CSI driver with supported methods of CSI
// node server spec.
type NodeServer struct {
*csicommon.DefaultNodeServer
}
@ -77,6 +79,7 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
return userCr, nil
}
// NodeStageVolume mounts the volume to a staging path on the node.
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
if err := validateNodeStageVolumeRequest(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
@ -149,6 +152,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return &csi.NodeStageVolumeResponse{}, nil
}
// NodePublishVolume mounts the volume mounted to the staging path to the target
// path
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if err := validateNodePublishVolumeRequest(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
@ -190,6 +195,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume unmounts the volume from the target path
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if err := validateNodeUnpublishVolumeRequest(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
@ -209,6 +215,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return &csi.NodeUnpublishVolumeResponse{}, nil
}
// NodeUnstageVolume unstages the volume from the staging path
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
if err := validateNodeUnstageVolumeRequest(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
@ -228,6 +235,7 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return &csi.NodeUnstageVolumeResponse{}, nil
}
// NodeGetCapabilities returns the supported capabilities of the node server
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
@ -241,7 +249,3 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
},
}, nil
}
func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return ns.DefaultNodeServer.NodeGetInfo(ctx, req)
}

View File

@ -39,6 +39,8 @@ const (
oneGB = 1073741824
)
// ControllerServer struct of rbd CSI driver with supported methods of CSI
// controller server spec.
type ControllerServer struct {
*csicommon.DefaultControllerServer
MetadataStore util.CachePersister
@ -49,6 +51,8 @@ var (
rbdSnapshots = map[string]*rbdSnapshot{}
)
// LoadExDataFromMetadataStore loads the rbd volume and snapshot
// info from metadata store
func (cs *ControllerServer) LoadExDataFromMetadataStore() error {
vol := &rbdVolume{}
cs.MetadataStore.ForAll("csi-rbd-vol-", vol, func(identifier string) error {
@ -80,6 +84,7 @@ func (cs *ControllerServer) validateVolumeReq(req *csi.CreateVolumeRequest) erro
return nil
}
// CreateVolume creates the volume in backend and store the volume metadata
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateVolumeReq(req); err != nil {
@ -193,6 +198,8 @@ func (cs *ControllerServer) checkSnapshot(req *csi.CreateVolumeRequest, rbdVol *
return nil
}
// DeleteVolume deletes the volume in backend and removes the volume metadata
// from store
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
glog.Warningf("invalid delete volume req: %v", req)
@ -227,6 +234,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return &csi.DeleteVolumeResponse{}, nil
}
// ValidateVolumeCapabilities checks whether the volume capabilities requested
// are supported.
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
for _, cap := range req.VolumeCapabilities {
if cap.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
@ -240,14 +249,18 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
}, nil
}
// ControllerUnpublishVolume returns success response
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
// ControllerPublishVolume returns success response
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return &csi.ControllerPublishVolumeResponse{}, nil
}
// CreateSnapshot creates the snapshot in backend and stores metadata
// in store
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
glog.Warningf("invalid create snapshot req: %v", req)
@ -371,6 +384,8 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
}, nil
}
// DeleteSnapshot deletes the snapshot in backend and removes the
//snapshot metadata from store
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
glog.Warningf("invalid delete snapshot req: %v", req)
@ -410,6 +425,7 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
return &csi.DeleteSnapshotResponse{}, nil
}
// ListSnapshots lists the snapshots in the store
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS); err != nil {
glog.Warningf("invalid list snapshot req: %v", req)

View File

@ -23,10 +23,13 @@ import (
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
// IdentityServer struct of rbd CSI driver with supported methods of CSI
// identity server spec.
type IdentityServer struct {
*csicommon.DefaultIdentityServer
}
// GetPluginCapabilities returns available capabilities of the rbd driver
func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{

View File

@ -35,11 +35,28 @@ import (
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
// NodeServer struct of ceph rbd driver with supported methods of CSI
// node server spec
type NodeServer struct {
*csicommon.DefaultNodeServer
mounter mount.Interface
}
//TODO remove both stage and unstage methods
//once https://github.com/kubernetes-csi/drivers/pull/145 is merged
// NodeStageVolume returns unimplemented response
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// NodeUnstageVolume returns unimplemented response
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// NodePublishVolume mounts the volume mounted to the device path to the target
// path
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
targetPathMutex.LockKey(targetPath)
@ -132,6 +149,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume unmounts the volume from the target path
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
targetPathMutex.LockKey(targetPath)
@ -202,26 +220,6 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (ns *NodeServer) NodeStageVolume(
ctx context.Context,
req *csi.NodeStageVolumeRequest) (
*csi.NodeStageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (ns *NodeServer) NodeUnstageVolume(
ctx context.Context,
req *csi.NodeUnstageVolumeRequest) (
*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return ns.DefaultNodeServer.NodeGetInfo(ctx, req)
}
func resolveBindMountedBlockDevice(mountPath string) (string, error) {
cmd := exec.Command("findmnt", "-n", "-o", "SOURCE", "--first-only", "--target", mountPath)
out, err := cmd.CombinedOutput()

View File

@ -35,6 +35,7 @@ const (
rbdDefaultUserID = rbdDefaultAdminID
)
// Driver contains the default identity,node and controller struct
type Driver struct {
cd *csicommon.CSIDriver
@ -47,16 +48,19 @@ var (
version = "1.0.0"
)
func GetDriver() *Driver {
// NewDriver returns new rbd driver
func NewDriver() *Driver {
return &Driver{}
}
// NewIdentityServer initialize a identity server for rbd CSI driver
func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
return &IdentityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
}
}
// NewControllerServer initialize a controller server for rbd CSI driver
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
return &ControllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
@ -64,6 +68,7 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
}
}
// NewNodeServer initialize a node server for rbd CSI driver.
func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, error) {
mounter := mount.New("")
if containerized {
@ -79,6 +84,8 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, err
}, nil
}
// Run start a non-blocking grpc controller,node and identityserver for
// rbd CSI driver which can serve multiple parallel requests
func (r *Driver) Run(driverName, nodeID, endpoint string, containerized bool, cachePersister util.CachePersister) {
var err error
glog.Infof("Driver: %v version: %v", driverName, version)

View File

@ -23,11 +23,14 @@ import (
)
const (
//PluginFolder defines location of plugins
PluginFolder = "/var/lib/kubelet/plugins"
)
// ForAllFunc stores metdata with identifier
type ForAllFunc func(identifier string) error
// CachePersister interface implemented for store
type CachePersister interface {
Create(identifier string, data interface{}) error
Get(identifier string, data interface{}) error
@ -35,6 +38,7 @@ type CachePersister interface {
Delete(identifier string) error
}
// NewCachePersister returns CachePersister based on store
func NewCachePersister(metadataStore, driverName string) (CachePersister, error) {
if metadataStore == "k8s_configmap" {
glog.Infof("cache-perister: using kubernetes configmap as metadata cache persister")

View File

@ -33,6 +33,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
)
// K8sCMCache to store metadata
type K8sCMCache struct {
Client *k8s.Clientset
Namespace string
@ -47,6 +48,8 @@ const (
csiMetadataLabelAttr = "com.ceph.ceph-csi/metadata"
)
// GetK8sNamespace returns pod namespace. if pod namespace is empty
// it returns default namespace
func GetK8sNamespace() string {
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
@ -55,6 +58,7 @@ func GetK8sNamespace() string {
return namespace
}
// NewK8sClient create kubernetes client
func NewK8sClient() *k8s.Clientset {
var cfg *rest.Config
var err error
@ -88,6 +92,7 @@ func (k8scm *K8sCMCache) getMetadataCM(resourceID string) (*v1.ConfigMap, error)
return cm, nil
}
//ForAll list the metadata in configmaps and filters outs based on the pattern
func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", csiMetadataLabelAttr, cmLabel)}
cms, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).List(listOpts)
@ -114,6 +119,7 @@ func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFun
return nil
}
// Create stores the metadata in configmaps with identifier name
func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
cm, err := k8scm.getMetadataCM(identifier)
if cm != nil && err == nil {
@ -149,6 +155,7 @@ func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
return nil
}
// Get retrieves the metadata in configmaps with identifier name
func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
cm, err := k8scm.getMetadataCM(identifier)
if err != nil {
@ -161,6 +168,7 @@ func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
return nil
}
// Delete deletes the metadata in configmaps with identifier name
func (k8scm *K8sCMCache) Delete(identifier string) error {
err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Delete(identifier, nil)
if err != nil {

View File

@ -29,12 +29,14 @@ import (
"github.com/pkg/errors"
)
// NodeCache to store metadata
type NodeCache struct {
BasePath string
}
var cacheDir = "controller"
// EnsureCacheDirectory creates cache directory if not present
func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
fullPath := path.Join(nc.BasePath, cacheDir)
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
@ -45,6 +47,7 @@ func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
return nil
}
//ForAll list the metadata in Nodecache and filters outs based on the pattern
func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
err := nc.EnsureCacheDirectory(cacheDir)
if err != nil {
@ -80,6 +83,7 @@ func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) e
return nil
}
// Create creates the metadata file in cache directory with identifier name
func (nc *NodeCache) Create(identifier string, data interface{}) error {
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
fp, err := os.Create(file)
@ -95,6 +99,7 @@ func (nc *NodeCache) Create(identifier string, data interface{}) error {
return nil
}
// Get retrieves the metadata from cache directory with identifier name
func (nc *NodeCache) Get(identifier string, data interface{}) error {
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
fp, err := os.Open(file)
@ -111,6 +116,7 @@ func (nc *NodeCache) Get(identifier string, data interface{}) error {
return nil
}
// Delete deletes the metadata file from cache directory with identifier name
func (nc *NodeCache) Delete(identifier string) error {
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
err := os.Remove(file)