mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-03-21 06:39:28 +00:00
Merge pull request #152 from Madhu-1/gometlinter
Add gometalinter static check
This commit is contained in:
commit
832e0bb591
@ -7,15 +7,17 @@ branches:
|
|||||||
- master
|
- master
|
||||||
- csi-v1.0
|
- csi-v1.0
|
||||||
|
|
||||||
go: 1.10.x
|
go: 1.11.x
|
||||||
|
|
||||||
|
install:
|
||||||
|
- curl -L https://git.io/vp6lP | sh
|
||||||
|
|
||||||
before_script:
|
before_script:
|
||||||
- GO_FILES=$(find . -iname '*.go' -type f | grep -v /vendor/)
|
- GO_FILES=$(find . -iname '*.go' -type f | grep -v /vendor/)
|
||||||
- go get -u golang.org/x/lint/golint #go get github.com/golang/lint/golint
|
|
||||||
|
|
||||||
script:
|
script:
|
||||||
|
- gometalinter --deadline=10m -j 4 --enable=megacheck --enable=misspell --vendor ./...
|
||||||
- test -z $(gofmt -s -l $GO_FILES)
|
- test -z $(gofmt -s -l $GO_FILES)
|
||||||
- go vet -v $(go list ./... | grep -v /vendor/)
|
|
||||||
- make rbdplugin
|
- make rbdplugin
|
||||||
- make cephfsplugin
|
- make cephfsplugin
|
||||||
|
|
||||||
|
@ -27,7 +27,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
flag.Set("logtostderr", "true")
|
if err := flag.Set("logtostderr", "true"); err != nil {
|
||||||
|
glog.Errorf("failed to set logtostderr flag: %v", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -27,7 +27,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
flag.Set("logtostderr", "true")
|
if err := flag.Set("logtostderr", "true"); err != nil {
|
||||||
|
glog.Errorf("failed to set logtostderr flag: %v", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -56,7 +59,7 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
driver := rbd.GetDriver()
|
driver := rbd.NewDriver()
|
||||||
driver.Run(*driverName, *nodeID, *endpoint, *containerized, cp)
|
driver.Run(*driverName, *nodeID, *endpoint, *containerized, cp)
|
||||||
|
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
@ -64,10 +67,11 @@ func main() {
|
|||||||
|
|
||||||
func createPersistentStorage(persistentStoragePath string) error {
|
func createPersistentStorage(persistentStoragePath string) error {
|
||||||
if _, err := os.Stat(persistentStoragePath); os.IsNotExist(err) {
|
if _, err := os.Stat(persistentStoragePath); os.IsNotExist(err) {
|
||||||
if err := os.MkdirAll(persistentStoragePath, os.FileMode(0755)); err != nil {
|
if err = os.MkdirAll(persistentStoragePath, os.FileMode(0755)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,8 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"text/template"
|
"text/template"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
const cephConfig = `[global]
|
const cephConfig = `[global]
|
||||||
@ -37,13 +39,13 @@ const cephKeyring = `[client.{{.UserID}}]
|
|||||||
key = {{.Key}}
|
key = {{.Key}}
|
||||||
`
|
`
|
||||||
|
|
||||||
const cephSecret = `{{.Key}}`
|
const cephSecret = `{{.Key}}` // #nosec
|
||||||
|
|
||||||
const (
|
const (
|
||||||
cephConfigRoot = "/etc/ceph"
|
cephConfigRoot = "/etc/ceph"
|
||||||
cephConfigFileNameFmt = "ceph.share.%s.conf"
|
cephConfigFileNameFmt = "ceph.share.%s.conf"
|
||||||
cephKeyringFileNameFmt = "ceph.share.%s.client.%s.keyring"
|
cephKeyringFileNameFmt = "ceph.share.%s.client.%s.keyring"
|
||||||
cephSecretFileNameFmt = "ceph.share.%s.client.%s.secret"
|
cephSecretFileNameFmt = "ceph.share.%s.client.%s.secret" // #nosec
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -74,6 +76,7 @@ type cephConfigData struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, data interface{}) error {
|
func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, data interface{}) error {
|
||||||
|
// #nosec
|
||||||
if err := os.MkdirAll(cephConfigRoot, 0755); err != nil {
|
if err := os.MkdirAll(cephConfigRoot, 0755); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -86,7 +89,11 @@ func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, dat
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer f.Close()
|
defer func() {
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
glog.Errorf("failed to close file %s with error %s", f.Name(), err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return t.Execute(f, data)
|
return t.Execute(f, data)
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,8 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -111,12 +113,20 @@ func deleteCephUser(adminCr *credentials, volID volumeID) error {
|
|||||||
"auth", "rm", cephEntityClientPrefix + userID,
|
"auth", "rm", cephEntityClientPrefix + userID,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := execCommandAndValidate("ceph", args[:]...); err != nil {
|
var err error
|
||||||
|
if err = execCommandAndValidate("ceph", args[:]...); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
os.Remove(getCephKeyringPath(volID, userID))
|
keyringPath := getCephKeyringPath(volID, userID)
|
||||||
os.Remove(getCephSecretPath(volID, userID))
|
if err = os.Remove(keyringPath); err != nil {
|
||||||
|
glog.Errorf("failed to remove keyring file %s with error %s", keyringPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
secretPath := getCephSecretPath(volID, userID)
|
||||||
|
if err = os.Remove(secretPath); err != nil {
|
||||||
|
glog.Errorf("failed to remove secret file %s with error %s", secretPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,8 @@ import (
|
|||||||
"github.com/ceph/ceph-csi/pkg/util"
|
"github.com/ceph/ceph-csi/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ControllerServer struct of CEPH CSI driver with supported methods of CSI
|
||||||
|
// controller server spec.
|
||||||
type ControllerServer struct {
|
type ControllerServer struct {
|
||||||
*csicommon.DefaultControllerServer
|
*csicommon.DefaultControllerServer
|
||||||
MetadataStore util.CachePersister
|
MetadataStore util.CachePersister
|
||||||
@ -38,6 +40,7 @@ type controllerCacheEntry struct {
|
|||||||
VolumeID volumeID
|
VolumeID volumeID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreateVolume creates the volume in backend and store the volume metadata
|
||||||
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||||
if err := cs.validateCreateVolumeRequest(req); err != nil {
|
if err := cs.validateCreateVolumeRequest(req); err != nil {
|
||||||
glog.Errorf("CreateVolumeRequest validation failed: %v", err)
|
glog.Errorf("CreateVolumeRequest validation failed: %v", err)
|
||||||
@ -102,6 +105,8 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteVolume deletes the volume in backend and removes the volume metadata
|
||||||
|
// from store
|
||||||
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||||
if err := cs.validateDeleteVolumeRequest(); err != nil {
|
if err := cs.validateDeleteVolumeRequest(); err != nil {
|
||||||
glog.Errorf("DeleteVolumeRequest validation failed: %v", err)
|
glog.Errorf("DeleteVolumeRequest validation failed: %v", err)
|
||||||
@ -127,7 +132,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
// mons may have changed since create volume,
|
// mons may have changed since create volume,
|
||||||
// retrieve the latest mons and override old mons
|
// retrieve the latest mons and override old mons
|
||||||
secret := req.GetSecrets()
|
secret := req.GetSecrets()
|
||||||
if mon, err := getMonValFromSecret(secret); err == nil && len(mon) > 0 {
|
mon := ""
|
||||||
|
if mon, err = getMonValFromSecret(secret); err == nil && len(mon) > 0 {
|
||||||
glog.Infof("override old mons [%q] with [%q]", ce.VolOptions.Monitors, mon)
|
glog.Infof("override old mons [%q] with [%q]", ce.VolOptions.Monitors, mon)
|
||||||
ce.VolOptions.Monitors = mon
|
ce.VolOptions.Monitors = mon
|
||||||
}
|
}
|
||||||
@ -159,6 +165,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
return &csi.DeleteVolumeResponse{}, nil
|
return &csi.DeleteVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ValidateVolumeCapabilities checks whether the volume capabilities requested
|
||||||
|
// are supported.
|
||||||
func (cs *ControllerServer) ValidateVolumeCapabilities(
|
func (cs *ControllerServer) ValidateVolumeCapabilities(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
|
req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
|
||||||
|
@ -26,10 +26,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// PluginFolder defines the location of ceph plugin
|
||||||
PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin"
|
PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin"
|
||||||
Version = "1.0.0"
|
// version of ceph driver
|
||||||
|
version = "1.0.0"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Driver contains the default identity,node and controller struct
|
||||||
type Driver struct {
|
type Driver struct {
|
||||||
cd *csicommon.CSIDriver
|
cd *csicommon.CSIDriver
|
||||||
|
|
||||||
@ -39,19 +42,23 @@ type Driver struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// DefaultVolumeMounter for mounting volumes
|
||||||
DefaultVolumeMounter string
|
DefaultVolumeMounter string
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NewDriver returns new ceph driver
|
||||||
func NewDriver() *Driver {
|
func NewDriver() *Driver {
|
||||||
return &Driver{}
|
return &Driver{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewIdentityServer initialize a identity server for ceph CSI driver
|
||||||
func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
|
func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
|
||||||
return &IdentityServer{
|
return &IdentityServer{
|
||||||
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
|
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewControllerServer initialize a controller server for ceph CSI driver
|
||||||
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
|
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
|
||||||
return &ControllerServer{
|
return &ControllerServer{
|
||||||
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
|
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
|
||||||
@ -59,14 +66,17 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewNodeServer initialize a node server for ceph CSI driver.
|
||||||
func NewNodeServer(d *csicommon.CSIDriver) *NodeServer {
|
func NewNodeServer(d *csicommon.CSIDriver) *NodeServer {
|
||||||
return &NodeServer{
|
return &NodeServer{
|
||||||
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
|
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run start a non-blocking grpc controller,node and identityserver for
|
||||||
|
// ceph CSI driver which can serve multiple parallel requests
|
||||||
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cachePersister util.CachePersister) {
|
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cachePersister util.CachePersister) {
|
||||||
glog.Infof("Driver: %v version: %v", driverName, Version)
|
glog.Infof("Driver: %v version: %v", driverName, version)
|
||||||
|
|
||||||
// Configuration
|
// Configuration
|
||||||
|
|
||||||
@ -91,7 +101,7 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP
|
|||||||
|
|
||||||
// Initialize default library driver
|
// Initialize default library driver
|
||||||
|
|
||||||
fs.cd = csicommon.NewCSIDriver(driverName, Version, nodeID)
|
fs.cd = csicommon.NewCSIDriver(driverName, version, nodeID)
|
||||||
if fs.cd == nil {
|
if fs.cd == nil {
|
||||||
glog.Fatalln("Failed to initialize CSI driver")
|
glog.Fatalln("Failed to initialize CSI driver")
|
||||||
}
|
}
|
||||||
|
@ -23,10 +23,13 @@ import (
|
|||||||
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// IdentityServer struct of ceph CSI driver with supported methods of CSI
|
||||||
|
// identity server spec.
|
||||||
type IdentityServer struct {
|
type IdentityServer struct {
|
||||||
*csicommon.DefaultIdentityServer
|
*csicommon.DefaultIdentityServer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetPluginCapabilities returns available capabilities of the ceph driver
|
||||||
func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
|
func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
|
||||||
return &csi.GetPluginCapabilitiesResponse{
|
return &csi.GetPluginCapabilitiesResponse{
|
||||||
Capabilities: []*csi.PluginCapability{
|
Capabilities: []*csi.PluginCapability{
|
||||||
|
@ -29,6 +29,8 @@ import (
|
|||||||
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NodeServer struct of ceph CSI driver with supported methods of CSI
|
||||||
|
// node server spec.
|
||||||
type NodeServer struct {
|
type NodeServer struct {
|
||||||
*csicommon.DefaultNodeServer
|
*csicommon.DefaultNodeServer
|
||||||
}
|
}
|
||||||
@ -77,6 +79,7 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
|
|||||||
return userCr, nil
|
return userCr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeStageVolume mounts the volume to a staging path on the node.
|
||||||
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
|
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
|
||||||
if err := validateNodeStageVolumeRequest(req); err != nil {
|
if err := validateNodeStageVolumeRequest(req); err != nil {
|
||||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||||
@ -125,23 +128,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
|||||||
}
|
}
|
||||||
|
|
||||||
// It's not, mount now
|
// It's not, mount now
|
||||||
|
if err = ns.mount(volOptions, req); err != nil {
|
||||||
cr, err := getCredentialsForVolume(volOptions, volID, req)
|
return nil, err
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err)
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
m, err := newMounter(volOptions)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("failed to create mounter for volume %s: %v", volID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(4).Infof("cephfs: mounting volume %s with %s", volID, m.name())
|
|
||||||
|
|
||||||
if err = m.mount(stagingTargetPath, cr, volOptions, volID); err != nil {
|
|
||||||
glog.Errorf("failed to mount volume %s: %v", volID, err)
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.Infof("cephfs: successfully mounted volume %s to %s", volID, stagingTargetPath)
|
glog.Infof("cephfs: successfully mounted volume %s to %s", volID, stagingTargetPath)
|
||||||
@ -149,6 +137,33 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
|||||||
return &csi.NodeStageVolumeResponse{}, nil
|
return &csi.NodeStageVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error {
|
||||||
|
stagingTargetPath := req.GetStagingTargetPath()
|
||||||
|
volID := volumeID(req.GetVolumeId())
|
||||||
|
|
||||||
|
cr, err := getCredentialsForVolume(volOptions, volID, req)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err)
|
||||||
|
return status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
m, err := newMounter(volOptions)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("failed to create mounter for volume %s: %v", volID, err)
|
||||||
|
return status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof("cephfs: mounting volume %s with %s", volID, m.name())
|
||||||
|
|
||||||
|
if err = m.mount(stagingTargetPath, cr, volOptions, volID); err != nil {
|
||||||
|
glog.Errorf("failed to mount volume %s: %v", volID, err)
|
||||||
|
return status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodePublishVolume mounts the volume mounted to the staging path to the target
|
||||||
|
// path
|
||||||
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
||||||
if err := validateNodePublishVolumeRequest(req); err != nil {
|
if err := validateNodePublishVolumeRequest(req); err != nil {
|
||||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||||
@ -190,44 +205,53 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
return &csi.NodePublishVolumeResponse{}, nil
|
return &csi.NodePublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeUnpublishVolume unmounts the volume from the target path
|
||||||
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
||||||
if err := validateNodeUnpublishVolumeRequest(req); err != nil {
|
var err error
|
||||||
|
if err = validateNodeUnpublishVolumeRequest(req); err != nil {
|
||||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
targetPath := req.GetTargetPath()
|
targetPath := req.GetTargetPath()
|
||||||
|
|
||||||
// Unmount the bind-mount
|
// Unmount the bind-mount
|
||||||
if err := unmountVolume(targetPath); err != nil {
|
if err = unmountVolume(targetPath); err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
os.Remove(targetPath)
|
if err = os.Remove(targetPath); err != nil {
|
||||||
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
glog.Infof("cephfs: successfully unbinded volume %s from %s", req.GetVolumeId(), targetPath)
|
glog.Infof("cephfs: successfully unbinded volume %s from %s", req.GetVolumeId(), targetPath)
|
||||||
|
|
||||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeUnstageVolume unstages the volume from the staging path
|
||||||
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
||||||
if err := validateNodeUnstageVolumeRequest(req); err != nil {
|
var err error
|
||||||
|
if err = validateNodeUnstageVolumeRequest(req); err != nil {
|
||||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
stagingTargetPath := req.GetStagingTargetPath()
|
stagingTargetPath := req.GetStagingTargetPath()
|
||||||
|
|
||||||
// Unmount the volume
|
// Unmount the volume
|
||||||
if err := unmountVolume(stagingTargetPath); err != nil {
|
if err = unmountVolume(stagingTargetPath); err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
os.Remove(stagingTargetPath)
|
if err = os.Remove(stagingTargetPath); err != nil {
|
||||||
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
glog.Infof("cephfs: successfully umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath)
|
glog.Infof("cephfs: successfully umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath)
|
||||||
|
|
||||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeGetCapabilities returns the supported capabilities of the node server
|
||||||
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
|
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
|
||||||
return &csi.NodeGetCapabilitiesResponse{
|
return &csi.NodeGetCapabilitiesResponse{
|
||||||
Capabilities: []*csi.NodeServiceCapability{
|
Capabilities: []*csi.NodeServiceCapability{
|
||||||
@ -241,7 +265,3 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
|
|||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
|
|
||||||
return ns.DefaultNodeServer.NodeGetInfo(ctx, req)
|
|
||||||
}
|
|
||||||
|
@ -40,7 +40,7 @@ func makeVolumeID(volName string) volumeID {
|
|||||||
func execCommand(command string, args ...string) ([]byte, error) {
|
func execCommand(command string, args ...string) ([]byte, error) {
|
||||||
glog.V(4).Infof("cephfs: EXEC %s %s", command, args)
|
glog.V(4).Infof("cephfs: EXEC %s %s", command, args)
|
||||||
|
|
||||||
cmd := exec.Command(command, args...)
|
cmd := exec.Command(command, args...) // #nosec
|
||||||
return cmd.CombinedOutput()
|
return cmd.CombinedOutput()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,6 +20,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -70,8 +72,7 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volID volumeI
|
|||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
unmountVolume(cephRoot)
|
umountAndRemove(cephRoot)
|
||||||
os.Remove(cephRoot)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
volOptions.RootPath = getVolumeRootPathCeph(volID)
|
volOptions.RootPath = getVolumeRootPathCeph(volID)
|
||||||
@ -123,8 +124,7 @@ func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions
|
|||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
unmountVolume(volRoot)
|
umountAndRemove(volRoot)
|
||||||
os.Remove(volRoot)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if err := os.Rename(volRoot, volRootDeleting); err != nil {
|
if err := os.Rename(volRoot, volRootDeleting); err != nil {
|
||||||
@ -137,3 +137,14 @@ func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func umountAndRemove(mountPoint string) {
|
||||||
|
var err error
|
||||||
|
if err = unmountVolume(mountPoint); err != nil {
|
||||||
|
glog.Errorf("failed to unmount %s with error %s", mountPoint, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = os.Remove(mountPoint); err != nil {
|
||||||
|
glog.Errorf("failed to remove %s with error %s", mountPoint, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -36,7 +36,9 @@ var (
|
|||||||
// Load available ceph mounters installed on system into availableMounters
|
// Load available ceph mounters installed on system into availableMounters
|
||||||
// Called from driver.go's Run()
|
// Called from driver.go's Run()
|
||||||
func loadAvailableMounters() error {
|
func loadAvailableMounters() error {
|
||||||
|
// #nosec
|
||||||
fuseMounterProbe := exec.Command("ceph-fuse", "--version")
|
fuseMounterProbe := exec.Command("ceph-fuse", "--version")
|
||||||
|
// #nosec
|
||||||
kernelMounterProbe := exec.Command("mount.ceph")
|
kernelMounterProbe := exec.Command("mount.ceph")
|
||||||
|
|
||||||
if fuseMounterProbe.Run() == nil {
|
if fuseMounterProbe.Run() == nil {
|
||||||
|
@ -96,13 +96,13 @@ func validateMounter(m string) error {
|
|||||||
func newVolumeOptions(volOptions, secret map[string]string) (*volumeOptions, error) {
|
func newVolumeOptions(volOptions, secret map[string]string) (*volumeOptions, error) {
|
||||||
var (
|
var (
|
||||||
opts volumeOptions
|
opts volumeOptions
|
||||||
provisionVolumeBool string
|
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
// extract mon from secret first
|
// extract mon from secret first
|
||||||
if err = extractOption(&opts.MonValueFromSecret, "monValueFromSecret", volOptions); err == nil {
|
if err = extractOption(&opts.MonValueFromSecret, "monValueFromSecret", volOptions); err == nil {
|
||||||
if mon, err := getMonValFromSecret(secret); err == nil && len(mon) > 0 {
|
mon := ""
|
||||||
|
if mon, err = getMonValFromSecret(secret); err == nil && len(mon) > 0 {
|
||||||
opts.Monitors = mon
|
opts.Monitors = mon
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -112,30 +112,44 @@ func newVolumeOptions(volOptions, secret map[string]string) (*volumeOptions, err
|
|||||||
return nil, fmt.Errorf("either monitors or monValueFromSecret should be set")
|
return nil, fmt.Errorf("either monitors or monValueFromSecret should be set")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err = extractOption(&provisionVolumeBool, "provisionVolume", volOptions); err != nil {
|
|
||||||
|
if err = extractNewVolOpt(&opts, volOptions); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.ProvisionVolume, err = strconv.ParseBool(provisionVolumeBool); err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to parse provisionVolume: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.ProvisionVolume {
|
|
||||||
if err = extractOption(&opts.Pool, "pool", volOptions); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if err = extractOption(&opts.RootPath, "rootPath", volOptions); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This field is optional, don't check for its presence
|
|
||||||
extractOption(&opts.Mounter, "mounter", volOptions)
|
|
||||||
|
|
||||||
if err = opts.validate(); err != nil {
|
if err = opts.validate(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &opts, nil
|
return &opts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func extractNewVolOpt(opts *volumeOptions, volOpt map[string]string) error {
|
||||||
|
var (
|
||||||
|
provisionVolumeBool string
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if err = extractOption(&provisionVolumeBool, "provisionVolume", volOpt); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.ProvisionVolume, err = strconv.ParseBool(provisionVolumeBool); err != nil {
|
||||||
|
return fmt.Errorf("Failed to parse provisionVolume: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.ProvisionVolume {
|
||||||
|
if err = extractOption(&opts.Pool, "pool", volOpt); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err = extractOption(&opts.RootPath, "rootPath", volOpt); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This field is optional, don't check for its presence
|
||||||
|
// nolint
|
||||||
|
// (skip errcheck and gosec as this is optional)
|
||||||
|
extractOption(&opts.Mounter, "mounter", volOpt)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -39,6 +39,8 @@ const (
|
|||||||
oneGB = 1073741824
|
oneGB = 1073741824
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ControllerServer struct of rbd CSI driver with supported methods of CSI
|
||||||
|
// controller server spec.
|
||||||
type ControllerServer struct {
|
type ControllerServer struct {
|
||||||
*csicommon.DefaultControllerServer
|
*csicommon.DefaultControllerServer
|
||||||
MetadataStore util.CachePersister
|
MetadataStore util.CachePersister
|
||||||
@ -49,18 +51,23 @@ var (
|
|||||||
rbdSnapshots = map[string]*rbdSnapshot{}
|
rbdSnapshots = map[string]*rbdSnapshot{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// LoadExDataFromMetadataStore loads the rbd volume and snapshot
|
||||||
|
// info from metadata store
|
||||||
func (cs *ControllerServer) LoadExDataFromMetadataStore() error {
|
func (cs *ControllerServer) LoadExDataFromMetadataStore() error {
|
||||||
vol := &rbdVolume{}
|
vol := &rbdVolume{}
|
||||||
|
// nolint
|
||||||
cs.MetadataStore.ForAll("csi-rbd-vol-", vol, func(identifier string) error {
|
cs.MetadataStore.ForAll("csi-rbd-vol-", vol, func(identifier string) error {
|
||||||
rbdVolumes[identifier] = vol
|
rbdVolumes[identifier] = vol
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
snap := &rbdSnapshot{}
|
snap := &rbdSnapshot{}
|
||||||
|
// nolint
|
||||||
cs.MetadataStore.ForAll("csi-rbd-(.*)-snap-", snap, func(identifier string) error {
|
cs.MetadataStore.ForAll("csi-rbd-(.*)-snap-", snap, func(identifier string) error {
|
||||||
rbdSnapshots[identifier] = snap
|
rbdSnapshots[identifier] = snap
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
glog.Infof("Loaded %d volumes and %d snapshots from metadata store", len(rbdVolumes), len(rbdSnapshots))
|
glog.Infof("Loaded %d volumes and %d snapshots from metadata store", len(rbdVolumes), len(rbdSnapshots))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -80,34 +87,7 @@ func (cs *ControllerServer) validateVolumeReq(req *csi.CreateVolumeRequest) erro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
func parseVolCreateRequest(req *csi.CreateVolumeRequest) (*rbdVolume, error) {
|
||||||
|
|
||||||
if err := cs.validateVolumeReq(req); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
volumeNameMutex.LockKey(req.GetName())
|
|
||||||
defer volumeNameMutex.UnlockKey(req.GetName())
|
|
||||||
|
|
||||||
// Need to check for already existing volume name, and if found
|
|
||||||
// check for the requested capacity and already allocated capacity
|
|
||||||
if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
|
|
||||||
// Since err is nil, it means the volume with the same name already exists
|
|
||||||
// need to check if the size of exisiting volume is the same as in new
|
|
||||||
// request
|
|
||||||
if exVol.VolSize >= req.GetCapacityRange().GetRequiredBytes() {
|
|
||||||
// exisiting volume is compatible with new request and should be reused.
|
|
||||||
// TODO (sbezverk) Do I need to make sure that RBD volume still exists?
|
|
||||||
return &csi.CreateVolumeResponse{
|
|
||||||
Volume: &csi.Volume{
|
|
||||||
VolumeId: exVol.VolID,
|
|
||||||
CapacityBytes: exVol.VolSize,
|
|
||||||
VolumeContext: req.GetParameters(),
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
return nil, status.Errorf(codes.AlreadyExists, "Volume with the same name: %s but with different size already exist", req.GetName())
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO (sbezverk) Last check for not exceeding total storage capacity
|
// TODO (sbezverk) Last check for not exceeding total storage capacity
|
||||||
|
|
||||||
rbdVol, err := getRBDVolumeOptions(req.GetParameters())
|
rbdVol, err := getRBDVolumeOptions(req.GetParameters())
|
||||||
@ -130,27 +110,56 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
|||||||
volSizeBytes = req.GetCapacityRange().GetRequiredBytes()
|
volSizeBytes = req.GetCapacityRange().GetRequiredBytes()
|
||||||
}
|
}
|
||||||
rbdVol.VolSize = volSizeBytes
|
rbdVol.VolSize = volSizeBytes
|
||||||
volSizeGB := int(volSizeBytes / 1024 / 1024 / 1024)
|
|
||||||
|
return rbdVol, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateVolume creates the volume in backend and store the volume metadata
|
||||||
|
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||||
|
|
||||||
|
if err := cs.validateVolumeReq(req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
volumeNameMutex.LockKey(req.GetName())
|
||||||
|
defer func() {
|
||||||
|
if err := volumeNameMutex.UnlockKey(req.GetName()); err != nil {
|
||||||
|
glog.Warningf("failed to unlock mutex volume:%s %v", req.GetName(), err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Need to check for already existing volume name, and if found
|
||||||
|
// check for the requested capacity and already allocated capacity
|
||||||
|
if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
|
||||||
|
// Since err is nil, it means the volume with the same name already exists
|
||||||
|
// need to check if the size of existing volume is the same as in new
|
||||||
|
// request
|
||||||
|
if exVol.VolSize >= req.GetCapacityRange().GetRequiredBytes() {
|
||||||
|
// existing volume is compatible with new request and should be reused.
|
||||||
|
// TODO (sbezverk) Do I need to make sure that RBD volume still exists?
|
||||||
|
return &csi.CreateVolumeResponse{
|
||||||
|
Volume: &csi.Volume{
|
||||||
|
VolumeId: exVol.VolID,
|
||||||
|
CapacityBytes: exVol.VolSize,
|
||||||
|
VolumeContext: req.GetParameters(),
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
return nil, status.Errorf(codes.AlreadyExists, "Volume with the same name: %s but with different size already exist", req.GetName())
|
||||||
|
}
|
||||||
|
|
||||||
|
rbdVol, err := parseVolCreateRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
volSizeGB := int(rbdVol.VolSize / 1024 / 1024 / 1024)
|
||||||
|
|
||||||
// Check if there is already RBD image with requested name
|
// Check if there is already RBD image with requested name
|
||||||
found, _, _ := rbdStatus(rbdVol, rbdVol.UserID, req.GetSecrets())
|
err = cs.checkRBDStatus(rbdVol, req, volSizeGB)
|
||||||
if !found {
|
|
||||||
// if VolumeContentSource is not nil, this request is for snapshot
|
|
||||||
if req.VolumeContentSource != nil {
|
|
||||||
if err = cs.checkSnapshot(req, rbdVol); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
err = createRBDImage(rbdVol, volSizeGB, rbdVol.AdminID, req.GetSecrets())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("failed to create volume: %v", err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if createErr := cs.MetadataStore.Create(rbdVol.VolID, rbdVol); createErr != nil {
|
||||||
glog.V(4).Infof("create volume %s", volName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if createErr := cs.MetadataStore.Create(volumeID, rbdVol); createErr != nil {
|
|
||||||
glog.Warningf("failed to store volume metadata with error: %v", err)
|
glog.Warningf("failed to store volume metadata with error: %v", err)
|
||||||
if err = deleteRBDImage(rbdVol, rbdVol.AdminID, req.GetSecrets()); err != nil {
|
if err = deleteRBDImage(rbdVol, rbdVol.AdminID, req.GetSecrets()); err != nil {
|
||||||
glog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err)
|
glog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err)
|
||||||
@ -159,16 +168,38 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
|||||||
return nil, createErr
|
return nil, createErr
|
||||||
}
|
}
|
||||||
|
|
||||||
rbdVolumes[volumeID] = rbdVol
|
rbdVolumes[rbdVol.VolID] = rbdVol
|
||||||
return &csi.CreateVolumeResponse{
|
return &csi.CreateVolumeResponse{
|
||||||
Volume: &csi.Volume{
|
Volume: &csi.Volume{
|
||||||
VolumeId: volumeID,
|
VolumeId: rbdVol.VolID,
|
||||||
CapacityBytes: volSizeBytes,
|
CapacityBytes: rbdVol.VolSize,
|
||||||
VolumeContext: req.GetParameters(),
|
VolumeContext: req.GetParameters(),
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) checkRBDStatus(rbdVol *rbdVolume, req *csi.CreateVolumeRequest, volSizeGB int) error {
|
||||||
|
var err error
|
||||||
|
// Check if there is already RBD image with requested name
|
||||||
|
found, _, _ := rbdStatus(rbdVol, rbdVol.UserID, req.GetSecrets()) // #nosec
|
||||||
|
if !found {
|
||||||
|
// if VolumeContentSource is not nil, this request is for snapshot
|
||||||
|
if req.VolumeContentSource != nil {
|
||||||
|
if err = cs.checkSnapshot(req, rbdVol); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = createRBDImage(rbdVol, volSizeGB, rbdVol.AdminID, req.GetSecrets())
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("failed to create volume: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof("create volume %s", rbdVol.VolName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
func (cs *ControllerServer) checkSnapshot(req *csi.CreateVolumeRequest, rbdVol *rbdVolume) error {
|
func (cs *ControllerServer) checkSnapshot(req *csi.CreateVolumeRequest, rbdVol *rbdVolume) error {
|
||||||
snapshot := req.VolumeContentSource.GetSnapshot()
|
snapshot := req.VolumeContentSource.GetSnapshot()
|
||||||
if snapshot == nil {
|
if snapshot == nil {
|
||||||
@ -193,6 +224,8 @@ func (cs *ControllerServer) checkSnapshot(req *csi.CreateVolumeRequest, rbdVol *
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteVolume deletes the volume in backend and removes the volume metadata
|
||||||
|
// from store
|
||||||
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
||||||
glog.Warningf("invalid delete volume req: %v", req)
|
glog.Warningf("invalid delete volume req: %v", req)
|
||||||
@ -201,7 +234,13 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
// For now the image get unconditionally deleted, but here retention policy can be checked
|
// For now the image get unconditionally deleted, but here retention policy can be checked
|
||||||
volumeID := req.GetVolumeId()
|
volumeID := req.GetVolumeId()
|
||||||
volumeIDMutex.LockKey(volumeID)
|
volumeIDMutex.LockKey(volumeID)
|
||||||
defer volumeIDMutex.UnlockKey(volumeID)
|
|
||||||
|
defer func() {
|
||||||
|
if err := volumeIDMutex.UnlockKey(volumeID); err != nil {
|
||||||
|
glog.Warningf("failed to unlock mutex volume:%s %v", volumeID, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
rbdVol := &rbdVolume{}
|
rbdVol := &rbdVolume{}
|
||||||
if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
|
if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
|
||||||
if os.IsNotExist(errors.Cause(err)) {
|
if os.IsNotExist(errors.Cause(err)) {
|
||||||
@ -227,6 +266,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
return &csi.DeleteVolumeResponse{}, nil
|
return &csi.DeleteVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ValidateVolumeCapabilities checks whether the volume capabilities requested
|
||||||
|
// are supported.
|
||||||
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
|
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
|
||||||
for _, cap := range req.VolumeCapabilities {
|
for _, cap := range req.VolumeCapabilities {
|
||||||
if cap.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
|
if cap.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
|
||||||
@ -240,30 +281,30 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ControllerUnpublishVolume returns success response
|
||||||
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
|
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
|
||||||
return &csi.ControllerUnpublishVolumeResponse{}, nil
|
return &csi.ControllerUnpublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ControllerPublishVolume returns success response
|
||||||
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
|
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
|
||||||
return &csi.ControllerPublishVolumeResponse{}, nil
|
return &csi.ControllerPublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreateSnapshot creates the snapshot in backend and stores metadata
|
||||||
|
// in store
|
||||||
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
|
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
|
||||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
|
|
||||||
glog.Warningf("invalid create snapshot req: %v", req)
|
if err := cs.validateSnapshotReq(req); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check sanity of request Snapshot Name, Source Volume Id
|
|
||||||
if len(req.Name) == 0 {
|
|
||||||
return nil, status.Error(codes.InvalidArgument, "Snapshot Name cannot be empty")
|
|
||||||
}
|
|
||||||
if len(req.SourceVolumeId) == 0 {
|
|
||||||
return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshotNameMutex.LockKey(req.GetName())
|
snapshotNameMutex.LockKey(req.GetName())
|
||||||
defer snapshotNameMutex.UnlockKey(req.GetName())
|
|
||||||
|
defer func() {
|
||||||
|
if err := snapshotNameMutex.UnlockKey(req.GetName()); err != nil {
|
||||||
|
glog.Warningf("failed to unlock mutex snapshot:%s %v", req.GetName(), err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Need to check for already existing snapshot name, and if found
|
// Need to check for already existing snapshot name, and if found
|
||||||
// check for the requested source volume id and already allocated source volume id
|
// check for the requested source volume id and already allocated source volume id
|
||||||
@ -307,54 +348,16 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
|||||||
rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
|
rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
|
||||||
rbdSnap.SizeBytes = rbdVolume.VolSize
|
rbdSnap.SizeBytes = rbdVolume.VolSize
|
||||||
|
|
||||||
err = createSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
|
err = cs.doSnapshot(rbdSnap, req.GetSecrets())
|
||||||
// if we already have the snapshot, return the snapshot
|
// if we already have the snapshot, return the snapshot
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
||||||
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
|
|
||||||
if status.ExitStatus() == int(syscall.EEXIST) {
|
|
||||||
glog.Warningf("Snapshot with the same name: %s, we return this.", req.GetName())
|
|
||||||
} else {
|
|
||||||
glog.Warningf("failed to create snapshot: %v", err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
glog.Warningf("failed to create snapshot: %v", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.Warningf("failed to create snapshot: %v", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("create snapshot %s", snapName)
|
|
||||||
err = protectSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("snapshot is created but failed to protect and delete snapshot: %v", err)
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("Snapshot is created but failed to protect snapshot")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rbdSnap.CreatedAt = ptypes.TimestampNow().GetSeconds()
|
rbdSnap.CreatedAt = ptypes.TimestampNow().GetSeconds()
|
||||||
|
|
||||||
if createErr := cs.MetadataStore.Create(snapshotID, rbdSnap); createErr != nil {
|
if err = cs.storeSnapMetadata(rbdSnap, req.GetSecrets()); err != nil {
|
||||||
|
return nil, err
|
||||||
glog.Warningf("rbd: failed to store snapInfo with error: %v", err)
|
|
||||||
// Unprotect snapshot
|
|
||||||
err = unprotectSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets())
|
|
||||||
if err != nil {
|
|
||||||
return nil, status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
|
|
||||||
}
|
|
||||||
// Deleting snapshot
|
|
||||||
glog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName)
|
|
||||||
if err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, req.GetSecrets()); err != nil {
|
|
||||||
return nil, status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
|
|
||||||
}
|
|
||||||
return nil, createErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rbdSnapshots[snapshotID] = rbdSnap
|
rbdSnapshots[snapshotID] = rbdSnap
|
||||||
@ -371,6 +374,77 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) storeSnapMetadata(rbdSnap *rbdSnapshot, secret map[string]string) error {
|
||||||
|
errCreate := cs.MetadataStore.Create(rbdSnap.SnapID, rbdSnap)
|
||||||
|
if errCreate != nil {
|
||||||
|
glog.Warningf("rbd: failed to store snapInfo with error: %v", errCreate)
|
||||||
|
// Unprotect snapshot
|
||||||
|
err := unprotectSnapshot(rbdSnap, rbdSnap.AdminID, secret)
|
||||||
|
if err != nil {
|
||||||
|
return status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
|
||||||
|
}
|
||||||
|
// Deleting snapshot
|
||||||
|
glog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName)
|
||||||
|
if err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, secret); err != nil {
|
||||||
|
return status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errCreate
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) validateSnapshotReq(req *csi.CreateSnapshotRequest) error {
|
||||||
|
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
|
||||||
|
glog.Warningf("invalid create snapshot req: %v", req)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check sanity of request Snapshot Name, Source Volume Id
|
||||||
|
if len(req.Name) == 0 {
|
||||||
|
return status.Error(codes.InvalidArgument, "Snapshot Name cannot be empty")
|
||||||
|
}
|
||||||
|
if len(req.SourceVolumeId) == 0 {
|
||||||
|
return status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) doSnapshot(rbdSnap *rbdSnapshot, secret map[string]string) error {
|
||||||
|
err := createSnapshot(rbdSnap, rbdSnap.AdminID, secret)
|
||||||
|
// if we already have the snapshot, return the snapshot
|
||||||
|
if err != nil {
|
||||||
|
if exitErr, ok := err.(*exec.ExitError); ok {
|
||||||
|
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
|
||||||
|
if status.ExitStatus() == int(syscall.EEXIST) {
|
||||||
|
glog.Warningf("Snapshot with the same name: %s, we return this.", rbdSnap.SnapName)
|
||||||
|
} else {
|
||||||
|
glog.Warningf("failed to create snapshot: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.Warningf("failed to create snapshot: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.Warningf("failed to create snapshot: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("create snapshot %s", rbdSnap.SnapName)
|
||||||
|
err = protectSnapshot(rbdSnap, rbdSnap.AdminID, secret)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, secret)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("snapshot is created but failed to protect and delete snapshot: %v", err)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("Snapshot is created but failed to protect snapshot")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteSnapshot deletes the snapshot in backend and removes the
|
||||||
|
//snapshot metadata from store
|
||||||
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
|
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
|
||||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
|
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
|
||||||
glog.Warningf("invalid delete snapshot req: %v", req)
|
glog.Warningf("invalid delete snapshot req: %v", req)
|
||||||
@ -382,7 +456,12 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
|||||||
return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
|
return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
|
||||||
}
|
}
|
||||||
snapshotIDMutex.LockKey(snapshotID)
|
snapshotIDMutex.LockKey(snapshotID)
|
||||||
defer snapshotIDMutex.UnlockKey(snapshotID)
|
|
||||||
|
defer func() {
|
||||||
|
if err := snapshotIDMutex.UnlockKey(snapshotID); err != nil {
|
||||||
|
glog.Warningf("failed to unlock mutex snapshot:%s %v", snapshotID, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
rbdSnap := &rbdSnapshot{}
|
rbdSnap := &rbdSnapshot{}
|
||||||
if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil {
|
if err := cs.MetadataStore.Get(snapshotID, rbdSnap); err != nil {
|
||||||
@ -410,6 +489,7 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
|||||||
return &csi.DeleteSnapshotResponse{}, nil
|
return &csi.DeleteSnapshotResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListSnapshots lists the snapshots in the store
|
||||||
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
|
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
|
||||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS); err != nil {
|
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS); err != nil {
|
||||||
glog.Warningf("invalid list snapshot req: %v", req)
|
glog.Warningf("invalid list snapshot req: %v", req)
|
||||||
|
@ -23,10 +23,13 @@ import (
|
|||||||
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// IdentityServer struct of rbd CSI driver with supported methods of CSI
|
||||||
|
// identity server spec.
|
||||||
type IdentityServer struct {
|
type IdentityServer struct {
|
||||||
*csicommon.DefaultIdentityServer
|
*csicommon.DefaultIdentityServer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetPluginCapabilities returns available capabilities of the rbd driver
|
||||||
func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
|
func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
|
||||||
return &csi.GetPluginCapabilitiesResponse{
|
return &csi.GetPluginCapabilitiesResponse{
|
||||||
Capabilities: []*csi.PluginCapability{
|
Capabilities: []*csi.PluginCapability{
|
||||||
|
@ -35,57 +35,48 @@ import (
|
|||||||
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NodeServer struct of ceph rbd driver with supported methods of CSI
|
||||||
|
// node server spec
|
||||||
type NodeServer struct {
|
type NodeServer struct {
|
||||||
*csicommon.DefaultNodeServer
|
*csicommon.DefaultNodeServer
|
||||||
mounter mount.Interface
|
mounter mount.Interface
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO remove both stage and unstage methods
|
||||||
|
//once https://github.com/kubernetes-csi/drivers/pull/145 is merged
|
||||||
|
|
||||||
|
// NodeStageVolume returns unimplemented response
|
||||||
|
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeUnstageVolume returns unimplemented response
|
||||||
|
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodePublishVolume mounts the volume mounted to the device path to the target
|
||||||
|
// path
|
||||||
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
||||||
targetPath := req.GetTargetPath()
|
targetPath := req.GetTargetPath()
|
||||||
targetPathMutex.LockKey(targetPath)
|
targetPathMutex.LockKey(targetPath)
|
||||||
defer targetPathMutex.UnlockKey(targetPath)
|
|
||||||
|
|
||||||
var volName string
|
defer func() {
|
||||||
|
if err := targetPathMutex.UnlockKey(targetPath); err != nil {
|
||||||
|
glog.Warningf("failed to unlock mutex targetpath:%s %v", targetPath, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
volName, err := ns.getVolumeName(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||||
|
|
||||||
if isBlock {
|
|
||||||
// Get volName from targetPath
|
|
||||||
s := strings.Split(targetPath, "/")
|
|
||||||
volName = s[len(s)-1]
|
|
||||||
} else {
|
|
||||||
// Get volName from targetPath
|
|
||||||
if !strings.HasSuffix(targetPath, "/mount") {
|
|
||||||
return nil, fmt.Errorf("rbd: malformed the value of target path: %s", targetPath)
|
|
||||||
}
|
|
||||||
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
|
|
||||||
volName = s[len(s)-1]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if that target path exists properly
|
// Check if that target path exists properly
|
||||||
notMnt, err := ns.mounter.IsNotMountPoint(targetPath)
|
notMnt, err := ns.createTargetPath(targetPath, isBlock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
return nil, err
|
||||||
if isBlock {
|
|
||||||
// create an empty file
|
|
||||||
targetPathFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_RDWR, 0750)
|
|
||||||
if err != nil {
|
|
||||||
glog.V(4).Infof("Failed to create targetPath:%s with error: %v", targetPath, err)
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
if err := targetPathFile.Close(); err != nil {
|
|
||||||
glog.V(4).Infof("Failed to close targetPath:%s with error: %v", targetPath, err)
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Create a directory
|
|
||||||
if err = os.MkdirAll(targetPath, 0750); err != nil {
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
notMnt = true
|
|
||||||
} else {
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !notMnt {
|
if !notMnt {
|
||||||
@ -103,11 +94,41 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
}
|
}
|
||||||
glog.V(4).Infof("rbd image: %s/%s was successfully mapped at %s\n", req.GetVolumeId(), volOptions.Pool, devicePath)
|
glog.V(4).Infof("rbd image: %s/%s was successfully mapped at %s\n", req.GetVolumeId(), volOptions.Pool, devicePath)
|
||||||
|
|
||||||
|
// Publish Path
|
||||||
|
err = ns.mountVolume(req, devicePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &csi.NodePublishVolumeResponse{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ns *NodeServer) getVolumeName(req *csi.NodePublishVolumeRequest) (string, error) {
|
||||||
|
var volName string
|
||||||
|
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||||
|
targetPath := req.GetTargetPath()
|
||||||
|
if isBlock {
|
||||||
|
// Get volName from targetPath
|
||||||
|
s := strings.Split(targetPath, "/")
|
||||||
|
volName = s[len(s)-1]
|
||||||
|
} else {
|
||||||
|
// Get volName from targetPath
|
||||||
|
if !strings.HasSuffix(targetPath, "/mount") {
|
||||||
|
return "", fmt.Errorf("rbd: malformed the value of target path: %s", targetPath)
|
||||||
|
}
|
||||||
|
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
|
||||||
|
volName = s[len(s)-1]
|
||||||
|
}
|
||||||
|
return volName, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ns *NodeServer) mountVolume(req *csi.NodePublishVolumeRequest, devicePath string) error {
|
||||||
// Publish Path
|
// Publish Path
|
||||||
fsType := req.GetVolumeCapability().GetMount().GetFsType()
|
fsType := req.GetVolumeCapability().GetMount().GetFsType()
|
||||||
readOnly := req.GetReadonly()
|
readOnly := req.GetReadonly()
|
||||||
attrib := req.GetVolumeContext()
|
attrib := req.GetVolumeContext()
|
||||||
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
|
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
|
||||||
|
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||||
|
targetPath := req.GetTargetPath()
|
||||||
|
|
||||||
glog.V(4).Infof("target %v\nisBlock %v\nfstype %v\ndevice %v\nreadonly %v\nattributes %v\n mountflags %v\n",
|
glog.V(4).Infof("target %v\nisBlock %v\nfstype %v\ndevice %v\nreadonly %v\nattributes %v\n mountflags %v\n",
|
||||||
targetPath, isBlock, fsType, devicePath, readOnly, attrib, mountFlags)
|
targetPath, isBlock, fsType, devicePath, readOnly, attrib, mountFlags)
|
||||||
@ -116,7 +137,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
if isBlock {
|
if isBlock {
|
||||||
options := []string{"bind"}
|
options := []string{"bind"}
|
||||||
if err := diskMounter.Mount(devicePath, targetPath, fsType, options); err != nil {
|
if err := diskMounter.Mount(devicePath, targetPath, fsType, options); err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
options := []string{}
|
options := []string{}
|
||||||
@ -125,17 +146,54 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := diskMounter.FormatAndMount(devicePath, targetPath, fsType, options); err != nil {
|
if err := diskMounter.FormatAndMount(devicePath, targetPath, fsType, options); err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ns *NodeServer) createTargetPath(targetPath string, isBlock bool) (bool, error) {
|
||||||
|
// Check if that target path exists properly
|
||||||
|
notMnt, err := ns.mounter.IsNotMountPoint(targetPath)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
if isBlock {
|
||||||
|
// create an empty file
|
||||||
|
// #nosec
|
||||||
|
targetPathFile, e := os.OpenFile(targetPath, os.O_CREATE|os.O_RDWR, 0750)
|
||||||
|
if e != nil {
|
||||||
|
glog.V(4).Infof("Failed to create targetPath:%s with error: %v", targetPath, err)
|
||||||
|
return notMnt, status.Error(codes.Internal, e.Error())
|
||||||
|
}
|
||||||
|
if err = targetPathFile.Close(); err != nil {
|
||||||
|
glog.V(4).Infof("Failed to close targetPath:%s with error: %v", targetPath, err)
|
||||||
|
return notMnt, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Create a directory
|
||||||
|
if err = os.MkdirAll(targetPath, 0750); err != nil {
|
||||||
|
return notMnt, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
notMnt = true
|
||||||
|
} else {
|
||||||
|
return false, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return notMnt, err
|
||||||
|
|
||||||
return &csi.NodePublishVolumeResponse{}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeUnpublishVolume unmounts the volume from the target path
|
||||||
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
||||||
targetPath := req.GetTargetPath()
|
targetPath := req.GetTargetPath()
|
||||||
targetPathMutex.LockKey(targetPath)
|
targetPathMutex.LockKey(targetPath)
|
||||||
defer targetPathMutex.UnlockKey(targetPath)
|
|
||||||
|
defer func() {
|
||||||
|
if err := targetPathMutex.UnlockKey(targetPath); err != nil {
|
||||||
|
glog.Warningf("failed to unlock mutex targetpath:%s %v", targetPath, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
notMnt, err := ns.mounter.IsNotMountPoint(targetPath)
|
notMnt, err := ns.mounter.IsNotMountPoint(targetPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -157,12 +215,20 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
|||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = ns.unmount(targetPath, devicePath, cnt); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ns *NodeServer) unmount(targetPath, devicePath string, cnt int) error {
|
||||||
|
var err error
|
||||||
// Bind mounted device needs to be resolved by using resolveBindMountedBlockDevice
|
// Bind mounted device needs to be resolved by using resolveBindMountedBlockDevice
|
||||||
if devicePath == "devtmpfs" {
|
if devicePath == "devtmpfs" {
|
||||||
var err error
|
|
||||||
devicePath, err = resolveBindMountedBlockDevice(targetPath)
|
devicePath, err = resolveBindMountedBlockDevice(targetPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("NodeUnpublishVolume: devicePath: %s, (original)cnt: %d\n", devicePath, cnt)
|
glog.V(4).Infof("NodeUnpublishVolume: devicePath: %s, (original)cnt: %d\n", devicePath, cnt)
|
||||||
// cnt for GetDeviceNameFromMount is broken for bind mouted device,
|
// cnt for GetDeviceNameFromMount is broken for bind mouted device,
|
||||||
@ -178,51 +244,29 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
|||||||
err = ns.mounter.Unmount(targetPath)
|
err = ns.mounter.Unmount(targetPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(3).Infof("failed to unmount targetPath: %s with error: %v", targetPath, err)
|
glog.V(3).Infof("failed to unmount targetPath: %s with error: %v", targetPath, err)
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
cnt--
|
cnt--
|
||||||
if cnt != 0 {
|
if cnt != 0 {
|
||||||
// TODO should this be fixed not to success, so that driver can retry unmounting?
|
// TODO should this be fixed not to success, so that driver can retry unmounting?
|
||||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmapping rbd device
|
// Unmapping rbd device
|
||||||
if err := detachRBDDevice(devicePath); err != nil {
|
if err = detachRBDDevice(devicePath); err != nil {
|
||||||
glog.V(3).Infof("failed to unmap rbd device: %s with error: %v", devicePath, err)
|
glog.V(3).Infof("failed to unmap rbd device: %s with error: %v", devicePath, err)
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove targetPath
|
// Remove targetPath
|
||||||
if err := os.RemoveAll(targetPath); err != nil {
|
if err = os.RemoveAll(targetPath); err != nil {
|
||||||
glog.V(3).Infof("failed to remove targetPath: %s with error: %v", targetPath, err)
|
glog.V(3).Infof("failed to remove targetPath: %s with error: %v", targetPath, err)
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *NodeServer) NodeStageVolume(
|
|
||||||
ctx context.Context,
|
|
||||||
req *csi.NodeStageVolumeRequest) (
|
|
||||||
*csi.NodeStageVolumeResponse, error) {
|
|
||||||
|
|
||||||
return nil, status.Error(codes.Unimplemented, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ns *NodeServer) NodeUnstageVolume(
|
|
||||||
ctx context.Context,
|
|
||||||
req *csi.NodeUnstageVolumeRequest) (
|
|
||||||
*csi.NodeUnstageVolumeResponse, error) {
|
|
||||||
|
|
||||||
return nil, status.Error(codes.Unimplemented, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
|
|
||||||
return ns.DefaultNodeServer.NodeGetInfo(ctx, req)
|
|
||||||
}
|
|
||||||
|
|
||||||
func resolveBindMountedBlockDevice(mountPath string) (string, error) {
|
func resolveBindMountedBlockDevice(mountPath string) (string, error) {
|
||||||
|
// #nosec
|
||||||
cmd := exec.Command("findmnt", "-n", "-o", "SOURCE", "--first-only", "--target", mountPath)
|
cmd := exec.Command("findmnt", "-n", "-o", "SOURCE", "--first-only", "--target", mountPath)
|
||||||
out, err := cmd.CombinedOutput()
|
out, err := cmd.CombinedOutput()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -242,6 +286,7 @@ func parseFindMntResolveSource(out string) (string, error) {
|
|||||||
return match[1], nil
|
return match[1], nil
|
||||||
}
|
}
|
||||||
// Check if out is a block device
|
// Check if out is a block device
|
||||||
|
// nolint
|
||||||
reBlk := regexp.MustCompile("^devtmpfs\\[(/[^/]+(?:/[^/]*)*)\\]$")
|
reBlk := regexp.MustCompile("^devtmpfs\\[(/[^/]+(?:/[^/]*)*)\\]$")
|
||||||
if match := reBlk.FindStringSubmatch(out); match != nil {
|
if match := reBlk.FindStringSubmatch(out); match != nil {
|
||||||
return fmt.Sprintf("/dev%s", match[1]), nil
|
return fmt.Sprintf("/dev%s", match[1]), nil
|
||||||
|
@ -35,6 +35,7 @@ const (
|
|||||||
rbdDefaultUserID = rbdDefaultAdminID
|
rbdDefaultUserID = rbdDefaultAdminID
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Driver contains the default identity,node and controller struct
|
||||||
type Driver struct {
|
type Driver struct {
|
||||||
cd *csicommon.CSIDriver
|
cd *csicommon.CSIDriver
|
||||||
|
|
||||||
@ -47,16 +48,19 @@ var (
|
|||||||
version = "1.0.0"
|
version = "1.0.0"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetDriver() *Driver {
|
// NewDriver returns new rbd driver
|
||||||
|
func NewDriver() *Driver {
|
||||||
return &Driver{}
|
return &Driver{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewIdentityServer initialize a identity server for rbd CSI driver
|
||||||
func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
|
func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
|
||||||
return &IdentityServer{
|
return &IdentityServer{
|
||||||
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
|
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewControllerServer initialize a controller server for rbd CSI driver
|
||||||
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
|
func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersister) *ControllerServer {
|
||||||
return &ControllerServer{
|
return &ControllerServer{
|
||||||
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
|
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
|
||||||
@ -64,6 +68,7 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewNodeServer initialize a node server for rbd CSI driver.
|
||||||
func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, error) {
|
func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, error) {
|
||||||
mounter := mount.New("")
|
mounter := mount.New("")
|
||||||
if containerized {
|
if containerized {
|
||||||
@ -79,6 +84,8 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool) (*NodeServer, err
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run start a non-blocking grpc controller,node and identityserver for
|
||||||
|
// rbd CSI driver which can serve multiple parallel requests
|
||||||
func (r *Driver) Run(driverName, nodeID, endpoint string, containerized bool, cachePersister util.CachePersister) {
|
func (r *Driver) Run(driverName, nodeID, endpoint string, containerized bool, cachePersister util.CachePersister) {
|
||||||
var err error
|
var err error
|
||||||
glog.Infof("Driver: %v version: %v", driverName, version)
|
glog.Infof("Driver: %v version: %v", driverName, version)
|
||||||
@ -105,7 +112,10 @@ func (r *Driver) Run(driverName, nodeID, endpoint string, containerized bool, ca
|
|||||||
}
|
}
|
||||||
|
|
||||||
r.cs = NewControllerServer(r.cd, cachePersister)
|
r.cs = NewControllerServer(r.cd, cachePersister)
|
||||||
r.cs.LoadExDataFromMetadataStore()
|
|
||||||
|
if err = r.cs.LoadExDataFromMetadataStore(); err != nil {
|
||||||
|
glog.Fatalf("failed to load metadata from store, err %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
s := csicommon.NewNonBlockingGRPCServer()
|
s := csicommon.NewNonBlockingGRPCServer()
|
||||||
s.Start(endpoint, r.ids, r.cs, r.ns)
|
s.Start(endpoint, r.ids, r.cs, r.ns)
|
||||||
|
@ -61,6 +61,7 @@ func getRbdDevFromImageAndPool(pool string, image string) (string, bool) {
|
|||||||
name := f.Name()
|
name := f.Name()
|
||||||
// First match pool, then match name.
|
// First match pool, then match name.
|
||||||
poolFile := path.Join(sysPath, name, "pool")
|
poolFile := path.Join(sysPath, name, "pool")
|
||||||
|
// #nosec
|
||||||
poolBytes, err := ioutil.ReadFile(poolFile)
|
poolBytes, err := ioutil.ReadFile(poolFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("error reading %s: %v", poolFile, err)
|
glog.V(4).Infof("error reading %s: %v", poolFile, err)
|
||||||
@ -71,6 +72,7 @@ func getRbdDevFromImageAndPool(pool string, image string) (string, bool) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
imgFile := path.Join(sysPath, name, "name")
|
imgFile := path.Join(sysPath, name, "name")
|
||||||
|
// #nosec
|
||||||
imgBytes, err := ioutil.ReadFile(imgFile)
|
imgBytes, err := ioutil.ReadFile(imgFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("error reading %s: %v", imgFile, err)
|
glog.V(4).Infof("error reading %s: %v", imgFile, err)
|
||||||
@ -135,21 +137,34 @@ func getNbdDevFromImageAndPool(pool string, image string) (string, bool) {
|
|||||||
|
|
||||||
for i := 0; i < maxNbds; i++ {
|
for i := 0; i < maxNbds; i++ {
|
||||||
nbdPath := basePath + strconv.Itoa(i)
|
nbdPath := basePath + strconv.Itoa(i)
|
||||||
|
devicePath, err := getnbdDevicePath(nbdPath, imgPath, i)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return devicePath, true
|
||||||
|
}
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
|
func getnbdDevicePath(nbdPath, imgPath string, count int) (string, error) {
|
||||||
|
|
||||||
_, err := os.Lstat(nbdPath)
|
_, err := os.Lstat(nbdPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("error reading nbd info directory %s: %v", nbdPath, err)
|
glog.V(4).Infof("error reading nbd info directory %s: %v", nbdPath, err)
|
||||||
continue
|
return "", err
|
||||||
}
|
}
|
||||||
|
// #nosec
|
||||||
pidBytes, err := ioutil.ReadFile(path.Join(nbdPath, "pid"))
|
pidBytes, err := ioutil.ReadFile(path.Join(nbdPath, "pid"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(5).Infof("did not find valid pid file in dir %s: %v", nbdPath, err)
|
glog.V(5).Infof("did not find valid pid file in dir %s: %v", nbdPath, err)
|
||||||
continue
|
return "", err
|
||||||
}
|
}
|
||||||
cmdlineFileName := path.Join(hostRootFS, "/proc", strings.TrimSpace(string(pidBytes)), "cmdline")
|
cmdlineFileName := path.Join(hostRootFS, "/proc", strings.TrimSpace(string(pidBytes)), "cmdline")
|
||||||
|
// #nosec
|
||||||
rawCmdline, err := ioutil.ReadFile(cmdlineFileName)
|
rawCmdline, err := ioutil.ReadFile(cmdlineFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("failed to read cmdline file %s: %v", cmdlineFileName, err)
|
glog.V(4).Infof("failed to read cmdline file %s: %v", cmdlineFileName, err)
|
||||||
continue
|
return "", err
|
||||||
}
|
}
|
||||||
cmdlineArgs := strings.FieldsFunc(string(rawCmdline), func(r rune) bool {
|
cmdlineArgs := strings.FieldsFunc(string(rawCmdline), func(r rune) bool {
|
||||||
return r == '\u0000'
|
return r == '\u0000'
|
||||||
@ -159,21 +174,20 @@ func getNbdDevFromImageAndPool(pool string, image string) (string, bool) {
|
|||||||
// rbd-nbd map pool/image ...
|
// rbd-nbd map pool/image ...
|
||||||
if len(cmdlineArgs) < 3 || cmdlineArgs[0] != rbdTonbd || cmdlineArgs[1] != "map" {
|
if len(cmdlineArgs) < 3 || cmdlineArgs[0] != rbdTonbd || cmdlineArgs[1] != "map" {
|
||||||
glog.V(4).Infof("nbd device %s is not used by rbd", nbdPath)
|
glog.V(4).Infof("nbd device %s is not used by rbd", nbdPath)
|
||||||
continue
|
return "", err
|
||||||
|
|
||||||
}
|
}
|
||||||
if cmdlineArgs[2] != imgPath {
|
if cmdlineArgs[2] != imgPath {
|
||||||
glog.V(4).Infof("rbd-nbd device %s did not match expected image path: %s with path found: %s",
|
glog.V(4).Infof("rbd-nbd device %s did not match expected image path: %s with path found: %s",
|
||||||
nbdPath, imgPath, cmdlineArgs[2])
|
nbdPath, imgPath, cmdlineArgs[2])
|
||||||
continue
|
return "", err
|
||||||
}
|
}
|
||||||
devicePath := path.Join("/dev", "nbd"+strconv.Itoa(i))
|
devicePath := path.Join("/dev", "nbd"+strconv.Itoa(count))
|
||||||
if _, err := os.Lstat(devicePath); err != nil {
|
if _, err := os.Lstat(devicePath); err != nil {
|
||||||
glog.Warningf("Stat device %s for imgpath %s failed %v", devicePath, imgPath, err)
|
glog.Warningf("Stat device %s for imgpath %s failed %v", devicePath, imgPath, err)
|
||||||
continue
|
return "", err
|
||||||
}
|
}
|
||||||
return devicePath, true
|
return devicePath, nil
|
||||||
}
|
|
||||||
return "", false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat a path, if it doesn't exist, retry maxRetries times.
|
// Stat a path, if it doesn't exist, retry maxRetries times.
|
||||||
@ -212,28 +226,31 @@ func checkRbdNbdTools() bool {
|
|||||||
|
|
||||||
func attachRBDImage(volOptions *rbdVolume, userID string, credentials map[string]string) (string, error) {
|
func attachRBDImage(volOptions *rbdVolume, userID string, credentials map[string]string) (string, error) {
|
||||||
var err error
|
var err error
|
||||||
var output []byte
|
|
||||||
|
|
||||||
image := volOptions.VolName
|
image := volOptions.VolName
|
||||||
imagePath := fmt.Sprintf("%s/%s", volOptions.Pool, image)
|
imagePath := fmt.Sprintf("%s/%s", volOptions.Pool, image)
|
||||||
|
|
||||||
useNBD := false
|
useNBD := false
|
||||||
cmdName := rbd
|
|
||||||
moduleName := rbd
|
moduleName := rbd
|
||||||
if volOptions.Mounter == rbdTonbd && hasNBD {
|
if volOptions.Mounter == rbdTonbd && hasNBD {
|
||||||
useNBD = true
|
useNBD = true
|
||||||
cmdName = rbdTonbd
|
|
||||||
moduleName = nbd
|
moduleName = nbd
|
||||||
}
|
}
|
||||||
|
|
||||||
devicePath, found := waitForPath(volOptions.Pool, image, 1, useNBD)
|
devicePath, found := waitForPath(volOptions.Pool, image, 1, useNBD)
|
||||||
if !found {
|
if !found {
|
||||||
attachdetachMutex.LockKey(imagePath)
|
attachdetachMutex.LockKey(imagePath)
|
||||||
defer attachdetachMutex.UnlockKey(imagePath)
|
|
||||||
|
defer func() {
|
||||||
|
if err = attachdetachMutex.UnlockKey(imagePath); err != nil {
|
||||||
|
glog.Warningf("failed to unlock mutex imagepath:%s %v", imagePath, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
_, err = execCommand("modprobe", []string{moduleName})
|
_, err = execCommand("modprobe", []string{moduleName})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("rbd: failed to load rbd kernel module:%v", err)
|
glog.Warningf("rbd: failed to load rbd kernel module:%v", err)
|
||||||
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
backoff := wait.Backoff{
|
backoff := wait.Backoff{
|
||||||
@ -241,6 +258,56 @@ func attachRBDImage(volOptions *rbdVolume, userID string, credentials map[string
|
|||||||
Factor: rbdImageWatcherFactor,
|
Factor: rbdImageWatcherFactor,
|
||||||
Steps: rbdImageWatcherSteps,
|
Steps: rbdImageWatcherSteps,
|
||||||
}
|
}
|
||||||
|
err = waitForrbdImage(backoff, volOptions, userID, credentials)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
devicePath, err = createPath(volOptions, userID, credentials)
|
||||||
|
}
|
||||||
|
|
||||||
|
return devicePath, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func createPath(volOpt *rbdVolume, userID string, creds map[string]string) (string, error) {
|
||||||
|
image := volOpt.VolName
|
||||||
|
imagePath := fmt.Sprintf("%s/%s", volOpt.Pool, image)
|
||||||
|
|
||||||
|
mon, err := getMon(volOpt, creds)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(5).Infof("rbd: map mon %s", mon)
|
||||||
|
key, err := getRBDKey(userID, creds)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
useNBD := false
|
||||||
|
cmdName := rbd
|
||||||
|
if volOpt.Mounter == rbdTonbd && hasNBD {
|
||||||
|
useNBD = true
|
||||||
|
cmdName = rbdTonbd
|
||||||
|
}
|
||||||
|
|
||||||
|
output, err := execCommand(cmdName, []string{
|
||||||
|
"map", imagePath, "--id", userID, "-m", mon, "--key=" + key})
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("rbd: map error %v, rbd output: %s", err, string(output))
|
||||||
|
return "", fmt.Errorf("rbd: map failed %v, rbd output: %s", err, string(output))
|
||||||
|
}
|
||||||
|
devicePath, found := waitForPath(volOpt.Pool, image, 10, useNBD)
|
||||||
|
if !found {
|
||||||
|
return "", fmt.Errorf("Could not map image %s, Timeout after 10s", imagePath)
|
||||||
|
}
|
||||||
|
return devicePath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForrbdImage(backoff wait.Backoff, volOptions *rbdVolume, userID string, credentials map[string]string) error {
|
||||||
|
image := volOptions.VolName
|
||||||
|
imagePath := fmt.Sprintf("%s/%s", volOptions.Pool, image)
|
||||||
|
|
||||||
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
|
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
|
||||||
used, rbdOutput, err := rbdStatus(volOptions, userID, credentials)
|
used, rbdOutput, err := rbdStatus(volOptions, userID, credentials)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -250,36 +317,10 @@ func attachRBDImage(volOptions *rbdVolume, userID string, credentials map[string
|
|||||||
})
|
})
|
||||||
// return error if rbd image has not become available for the specified timeout
|
// return error if rbd image has not become available for the specified timeout
|
||||||
if err == wait.ErrWaitTimeout {
|
if err == wait.ErrWaitTimeout {
|
||||||
return "", fmt.Errorf("rbd image %s is still being used", imagePath)
|
return fmt.Errorf("rbd image %s is still being used", imagePath)
|
||||||
}
|
}
|
||||||
// return error if any other errors were encountered during wating for the image to become available
|
// return error if any other errors were encountered during waiting for the image to become available
|
||||||
if err != nil {
|
return err
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
mon, err := getMon(volOptions, credentials)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(5).Infof("rbd: map mon %s", mon)
|
|
||||||
key, err := getRBDKey(userID, credentials)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
output, err = execCommand(cmdName, []string{
|
|
||||||
"map", imagePath, "--id", userID, "-m", mon, "--key=" + key})
|
|
||||||
if err != nil {
|
|
||||||
glog.Warningf("rbd: map error %v, rbd output: %s", err, string(output))
|
|
||||||
return "", fmt.Errorf("rbd: map failed %v, rbd output: %s", err, string(output))
|
|
||||||
}
|
|
||||||
devicePath, found = waitForPath(volOptions.Pool, image, 10, useNBD)
|
|
||||||
if !found {
|
|
||||||
return "", fmt.Errorf("Could not map image %s, Timeout after 10s", imagePath)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return devicePath, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func detachRBDDevice(devicePath string) error {
|
func detachRBDDevice(devicePath string) error {
|
||||||
|
@ -221,6 +221,7 @@ func deleteRBDImage(pOpts *rbdVolume, adminID string, credentials map[string]str
|
|||||||
}
|
}
|
||||||
|
|
||||||
func execCommand(command string, args []string) ([]byte, error) {
|
func execCommand(command string, args []string) ([]byte, error) {
|
||||||
|
// #nosec
|
||||||
cmd := exec.Command(command, args...)
|
cmd := exec.Command(command, args...)
|
||||||
return cmd.CombinedOutput()
|
return cmd.CombinedOutput()
|
||||||
}
|
}
|
||||||
@ -258,6 +259,12 @@ func getRBDVolumeOptions(volOptions map[string]string) (*rbdVolume, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
getCredsFromVol(rbdVol, volOptions)
|
||||||
|
return rbdVol, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCredsFromVol(rbdVol *rbdVolume, volOptions map[string]string) {
|
||||||
|
var ok bool
|
||||||
rbdVol.AdminID, ok = volOptions["adminid"]
|
rbdVol.AdminID, ok = volOptions["adminid"]
|
||||||
if !ok {
|
if !ok {
|
||||||
rbdVol.AdminID = rbdDefaultAdminID
|
rbdVol.AdminID = rbdDefaultAdminID
|
||||||
@ -270,9 +277,7 @@ func getRBDVolumeOptions(volOptions map[string]string) (*rbdVolume, error) {
|
|||||||
if !ok {
|
if !ok {
|
||||||
rbdVol.Mounter = rbdDefaultMounter
|
rbdVol.Mounter = rbdDefaultMounter
|
||||||
}
|
}
|
||||||
return rbdVol, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRBDSnapshotOptions(snapOptions map[string]string) (*rbdSnapshot, error) {
|
func getRBDSnapshotOptions(snapOptions map[string]string) (*rbdSnapshot, error) {
|
||||||
var ok bool
|
var ok bool
|
||||||
rbdSnap := &rbdSnapshot{}
|
rbdSnap := &rbdSnapshot{}
|
||||||
|
@ -23,11 +23,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
//PluginFolder defines location of plugins
|
||||||
PluginFolder = "/var/lib/kubelet/plugins"
|
PluginFolder = "/var/lib/kubelet/plugins"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ForAllFunc stores metadata with identifier
|
||||||
type ForAllFunc func(identifier string) error
|
type ForAllFunc func(identifier string) error
|
||||||
|
|
||||||
|
// CachePersister interface implemented for store
|
||||||
type CachePersister interface {
|
type CachePersister interface {
|
||||||
Create(identifier string, data interface{}) error
|
Create(identifier string, data interface{}) error
|
||||||
Get(identifier string, data interface{}) error
|
Get(identifier string, data interface{}) error
|
||||||
@ -35,6 +38,7 @@ type CachePersister interface {
|
|||||||
Delete(identifier string) error
|
Delete(identifier string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewCachePersister returns CachePersister based on store
|
||||||
func NewCachePersister(metadataStore, driverName string) (CachePersister, error) {
|
func NewCachePersister(metadataStore, driverName string) (CachePersister, error) {
|
||||||
if metadataStore == "k8s_configmap" {
|
if metadataStore == "k8s_configmap" {
|
||||||
glog.Infof("cache-perister: using kubernetes configmap as metadata cache persister")
|
glog.Infof("cache-perister: using kubernetes configmap as metadata cache persister")
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
"k8s.io/client-go/tools/clientcmd"
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// K8sCMCache to store metadata
|
||||||
type K8sCMCache struct {
|
type K8sCMCache struct {
|
||||||
Client *k8s.Clientset
|
Client *k8s.Clientset
|
||||||
Namespace string
|
Namespace string
|
||||||
@ -47,6 +48,8 @@ const (
|
|||||||
csiMetadataLabelAttr = "com.ceph.ceph-csi/metadata"
|
csiMetadataLabelAttr = "com.ceph.ceph-csi/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// GetK8sNamespace returns pod namespace. if pod namespace is empty
|
||||||
|
// it returns default namespace
|
||||||
func GetK8sNamespace() string {
|
func GetK8sNamespace() string {
|
||||||
namespace := os.Getenv("POD_NAMESPACE")
|
namespace := os.Getenv("POD_NAMESPACE")
|
||||||
if namespace == "" {
|
if namespace == "" {
|
||||||
@ -55,6 +58,7 @@ func GetK8sNamespace() string {
|
|||||||
return namespace
|
return namespace
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewK8sClient create kubernetes client
|
||||||
func NewK8sClient() *k8s.Clientset {
|
func NewK8sClient() *k8s.Clientset {
|
||||||
var cfg *rest.Config
|
var cfg *rest.Config
|
||||||
var err error
|
var err error
|
||||||
@ -88,6 +92,7 @@ func (k8scm *K8sCMCache) getMetadataCM(resourceID string) (*v1.ConfigMap, error)
|
|||||||
return cm, nil
|
return cm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//ForAll list the metadata in configmaps and filters outs based on the pattern
|
||||||
func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
|
func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
|
||||||
listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", csiMetadataLabelAttr, cmLabel)}
|
listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", csiMetadataLabelAttr, cmLabel)}
|
||||||
cms, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).List(listOpts)
|
cms, err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).List(listOpts)
|
||||||
@ -114,6 +119,7 @@ func (k8scm *K8sCMCache) ForAll(pattern string, destObj interface{}, f ForAllFun
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create stores the metadata in configmaps with identifier name
|
||||||
func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
|
func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
|
||||||
cm, err := k8scm.getMetadataCM(identifier)
|
cm, err := k8scm.getMetadataCM(identifier)
|
||||||
if cm != nil && err == nil {
|
if cm != nil && err == nil {
|
||||||
@ -149,6 +155,7 @@ func (k8scm *K8sCMCache) Create(identifier string, data interface{}) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get retrieves the metadata in configmaps with identifier name
|
||||||
func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
|
func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
|
||||||
cm, err := k8scm.getMetadataCM(identifier)
|
cm, err := k8scm.getMetadataCM(identifier)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -161,6 +168,7 @@ func (k8scm *K8sCMCache) Get(identifier string, data interface{}) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete deletes the metadata in configmaps with identifier name
|
||||||
func (k8scm *K8sCMCache) Delete(identifier string) error {
|
func (k8scm *K8sCMCache) Delete(identifier string) error {
|
||||||
err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Delete(identifier, nil)
|
err := k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Delete(identifier, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -29,15 +29,20 @@ import (
|
|||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NodeCache to store metadata
|
||||||
type NodeCache struct {
|
type NodeCache struct {
|
||||||
BasePath string
|
BasePath string
|
||||||
}
|
}
|
||||||
|
|
||||||
var cacheDir = "controller"
|
var cacheDir = "controller"
|
||||||
|
|
||||||
|
var errDec = errors.New("file not found")
|
||||||
|
|
||||||
|
// EnsureCacheDirectory creates cache directory if not present
|
||||||
func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
|
func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
|
||||||
fullPath := path.Join(nc.BasePath, cacheDir)
|
fullPath := path.Join(nc.BasePath, cacheDir)
|
||||||
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
|
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
|
||||||
|
// #nosec
|
||||||
if err := os.Mkdir(fullPath, 0755); err != nil {
|
if err := os.Mkdir(fullPath, 0755); err != nil {
|
||||||
return errors.Wrapf(err, "node-cache: failed to create %s folder with error: %v", fullPath, err)
|
return errors.Wrapf(err, "node-cache: failed to create %s folder with error: %v", fullPath, err)
|
||||||
}
|
}
|
||||||
@ -45,6 +50,7 @@ func (nc *NodeCache) EnsureCacheDirectory(cacheDir string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//ForAll list the metadata in Nodecache and filters outs based on the pattern
|
||||||
func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
|
func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) error {
|
||||||
err := nc.EnsureCacheDirectory(cacheDir)
|
err := nc.EnsureCacheDirectory(cacheDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -54,39 +60,62 @@ func (nc *NodeCache) ForAll(pattern string, destObj interface{}, f ForAllFunc) e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "node-cache: failed to read %s folder", nc.BasePath)
|
return errors.Wrapf(err, "node-cache: failed to read %s folder", nc.BasePath)
|
||||||
}
|
}
|
||||||
|
path := path.Join(nc.BasePath, cacheDir)
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
match, err := regexp.MatchString(pattern, file.Name())
|
err = decodeObj(path, pattern, file, destObj)
|
||||||
if err != nil || !match {
|
if err == errDec {
|
||||||
continue
|
continue
|
||||||
}
|
} else if err == nil {
|
||||||
if !strings.HasSuffix(file.Name(), ".json") {
|
if err = f(strings.TrimSuffix(file.Name(), filepath.Ext(file.Name()))); err != nil {
|
||||||
continue
|
|
||||||
}
|
|
||||||
fp, err := os.Open(path.Join(nc.BasePath, cacheDir, file.Name()))
|
|
||||||
if err != nil {
|
|
||||||
glog.Infof("node-cache: open file: %s err %v", file.Name(), err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
decoder := json.NewDecoder(fp)
|
|
||||||
if err = decoder.Decode(destObj); err != nil {
|
|
||||||
fp.Close()
|
|
||||||
return errors.Wrapf(err, "node-cache: couldn't decode file %s", file.Name())
|
|
||||||
}
|
|
||||||
if err := f(strings.TrimSuffix(file.Name(), filepath.Ext(file.Name()))); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
|
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func decodeObj(filepath, pattern string, file os.FileInfo, destObj interface{}) error {
|
||||||
|
match, err := regexp.MatchString(pattern, file.Name())
|
||||||
|
if err != nil || !match {
|
||||||
|
return errDec
|
||||||
|
}
|
||||||
|
if !strings.HasSuffix(file.Name(), ".json") {
|
||||||
|
return errDec
|
||||||
|
}
|
||||||
|
// #nosec
|
||||||
|
fp, err := os.Open(path.Join(filepath, file.Name()))
|
||||||
|
if err != nil {
|
||||||
|
glog.Infof("node-cache: open file: %s err %v", file.Name(), err)
|
||||||
|
return errDec
|
||||||
|
}
|
||||||
|
decoder := json.NewDecoder(fp)
|
||||||
|
if err = decoder.Decode(destObj); err != nil {
|
||||||
|
if err = fp.Close(); err != nil {
|
||||||
|
return errors.Wrapf(err, "failed to close file %s", file.Name())
|
||||||
|
|
||||||
|
}
|
||||||
|
return errors.Wrapf(err, "node-cache: couldn't decode file %s", file.Name())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create creates the metadata file in cache directory with identifier name
|
||||||
func (nc *NodeCache) Create(identifier string, data interface{}) error {
|
func (nc *NodeCache) Create(identifier string, data interface{}) error {
|
||||||
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
|
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
|
||||||
fp, err := os.Create(file)
|
fp, err := os.Create(file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file)
|
return errors.Wrapf(err, "node-cache: failed to create metadata storage file %s\n", file)
|
||||||
}
|
}
|
||||||
defer fp.Close()
|
|
||||||
|
defer func() {
|
||||||
|
if err = fp.Close(); err != nil {
|
||||||
|
glog.Warningf("failed to close file:%s %v", fp.Name(), err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
encoder := json.NewEncoder(fp)
|
encoder := json.NewEncoder(fp)
|
||||||
if err = encoder.Encode(data); err != nil {
|
if err = encoder.Encode(data); err != nil {
|
||||||
return errors.Wrapf(err, "node-cache: failed to encode metadata for file: %s\n", file)
|
return errors.Wrapf(err, "node-cache: failed to encode metadata for file: %s\n", file)
|
||||||
@ -95,13 +124,20 @@ func (nc *NodeCache) Create(identifier string, data interface{}) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get retrieves the metadata from cache directory with identifier name
|
||||||
func (nc *NodeCache) Get(identifier string, data interface{}) error {
|
func (nc *NodeCache) Get(identifier string, data interface{}) error {
|
||||||
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
|
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
|
||||||
|
// #nosec
|
||||||
fp, err := os.Open(file)
|
fp, err := os.Open(file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "node-cache: open error for %s", file)
|
return errors.Wrapf(err, "node-cache: open error for %s", file)
|
||||||
}
|
}
|
||||||
defer fp.Close()
|
|
||||||
|
defer func() {
|
||||||
|
if err = fp.Close(); err != nil {
|
||||||
|
glog.Warningf("failed to close file:%s %v", fp.Name(), err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
decoder := json.NewDecoder(fp)
|
decoder := json.NewDecoder(fp)
|
||||||
if err = decoder.Decode(data); err != nil {
|
if err = decoder.Decode(data); err != nil {
|
||||||
@ -111,6 +147,7 @@ func (nc *NodeCache) Get(identifier string, data interface{}) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete deletes the metadata file from cache directory with identifier name
|
||||||
func (nc *NodeCache) Delete(identifier string) error {
|
func (nc *NodeCache) Delete(identifier string) error {
|
||||||
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
|
file := path.Join(nc.BasePath, cacheDir, identifier+".json")
|
||||||
err := os.Remove(file)
|
err := os.Remove(file)
|
||||||
|
Loading…
Reference in New Issue
Block a user