From c0ab4c03e6c71005ff5d5bc67008ec2372f7d07b Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Mon, 11 Jan 2021 09:02:41 +0100 Subject: [PATCH] cephfs: move NodeGetVolumeStats() to CephFS NodeServer The CephFS NodeServer should handle the CephFS specific requests. This is not something that the NodeServer for RBD should handle. Signed-off-by: Niels de Vos --- internal/cephfs/nodeserver.go | 21 +++++ internal/csi-common/nodeserver-default.go | 96 +---------------------- internal/csi-common/utils.go | 70 +++++++++++++++++ 3 files changed, 93 insertions(+), 94 deletions(-) diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 21b123ce0..6206a87ff 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -325,3 +325,24 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC }, }, nil } + +// NodeGetVolumeStats returns volume stats. +func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { + var err error + targetPath := req.GetVolumePath() + if targetPath == "" { + err = fmt.Errorf("targetpath %v is empty", targetPath) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + stat, err := os.Stat(targetPath) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to get stat for targetpath %q: %v", targetPath, err) + } + + if stat.Mode().IsDir() { + return csicommon.FilesystemNodeGetVolumeStats(ctx, targetPath) + } + + return nil, fmt.Errorf("targetpath %q is not a directory or device", targetPath) +} diff --git a/internal/csi-common/nodeserver-default.go b/internal/csi-common/nodeserver-default.go index 97376da9a..d692111bf 100644 --- a/internal/csi-common/nodeserver-default.go +++ b/internal/csi-common/nodeserver-default.go @@ -17,18 +17,13 @@ limitations under the License. package csicommon import ( - "fmt" - "os" + "context" "github.com/ceph/ceph-csi/internal/util" - "context" - "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/volume" ) // DefaultNodeServer stores driver object. @@ -85,94 +80,7 @@ func (ns *DefaultNodeServer) NodeGetCapabilities(ctx context.Context, req *csi.N // NodeGetVolumeStats returns volume stats. func (ns *DefaultNodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { - var err error - targetPath := req.GetVolumePath() - if targetPath == "" { - err = fmt.Errorf("targetpath %v is empty", targetPath) - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - /* - volID := req.GetVolumeId() - - TODO: Map the volumeID to the targetpath. - - CephFS: - we need secret to connect to the ceph cluster to get the volumeID from volume - Name, however `secret` field/option is not available in NodeGetVolumeStats spec, - Below issue covers this request and once its available, we can do the validation - as per the spec. - - https://github.com/container-storage-interface/spec/issues/371 - - RBD: - Below issue covers this request for RBD and once its available, we can do the validation - as per the spec. - - https://github.com/ceph/ceph-csi/issues/511 - - */ - - isMnt, err := util.IsMountPoint(targetPath) - - if err != nil { - if os.IsNotExist(err) { - return nil, status.Errorf(codes.InvalidArgument, "targetpath %s does not exist", targetPath) - } - return nil, err - } - if !isMnt { - return nil, status.Errorf(codes.InvalidArgument, "targetpath %s is not mounted", targetPath) - } - - cephMetricsProvider := volume.NewMetricsStatFS(targetPath) - volMetrics, volMetErr := cephMetricsProvider.GetMetrics() - if volMetErr != nil { - return nil, status.Error(codes.Internal, volMetErr.Error()) - } - - available, ok := (*(volMetrics.Available)).AsInt64() - if !ok { - klog.Errorf(util.Log(ctx, "failed to fetch available bytes")) - } - capacity, ok := (*(volMetrics.Capacity)).AsInt64() - if !ok { - klog.Errorf(util.Log(ctx, "failed to fetch capacity bytes")) - return nil, status.Error(codes.Unknown, "failed to fetch capacity bytes") - } - used, ok := (*(volMetrics.Used)).AsInt64() - if !ok { - klog.Errorf(util.Log(ctx, "failed to fetch used bytes")) - } - inodes, ok := (*(volMetrics.Inodes)).AsInt64() - if !ok { - klog.Errorf(util.Log(ctx, "failed to fetch available inodes")) - return nil, status.Error(codes.Unknown, "failed to fetch available inodes") - } - inodesFree, ok := (*(volMetrics.InodesFree)).AsInt64() - if !ok { - klog.Errorf(util.Log(ctx, "failed to fetch free inodes")) - } - - inodesUsed, ok := (*(volMetrics.InodesUsed)).AsInt64() - if !ok { - klog.Errorf(util.Log(ctx, "failed to fetch used inodes")) - } - return &csi.NodeGetVolumeStatsResponse{ - Usage: []*csi.VolumeUsage{ - { - Available: available, - Total: capacity, - Used: used, - Unit: csi.VolumeUsage_BYTES, - }, - { - Available: inodesFree, - Total: inodes, - Used: inodesUsed, - Unit: csi.VolumeUsage_INODES, - }, - }, - }, nil + return nil, status.Error(codes.Unimplemented, "") } // ConstructMountOptions returns only unique mount options in slice. diff --git a/internal/csi-common/utils.go b/internal/csi-common/utils.go index b5cbdc415..36b8c3dc2 100644 --- a/internal/csi-common/utils.go +++ b/internal/csi-common/utils.go @@ -19,6 +19,7 @@ package csicommon import ( "context" "fmt" + "os" "runtime/debug" "strings" "sync/atomic" @@ -33,6 +34,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/volume" ) func parseEndpoint(ep string) (string, string, error) { @@ -185,3 +187,71 @@ func panicHandler(ctx context.Context, req interface{}, info *grpc.UnaryServerIn }() return handler(ctx, req) } + +// FilesystemNodeGetVolumeStats can be used for getting the metrics as +// requested by the NodeGetVolumeStats CSI procedure. +// It is shared for FileMode volumes, both the CephFS and RBD NodeServers call +// this. +func FilesystemNodeGetVolumeStats(ctx context.Context, targetPath string) (*csi.NodeGetVolumeStatsResponse, error) { + isMnt, err := util.IsMountPoint(targetPath) + + if err != nil { + if os.IsNotExist(err) { + return nil, status.Errorf(codes.InvalidArgument, "targetpath %s does not exist", targetPath) + } + return nil, err + } + if !isMnt { + return nil, status.Errorf(codes.InvalidArgument, "targetpath %s is not mounted", targetPath) + } + + cephMetricsProvider := volume.NewMetricsStatFS(targetPath) + volMetrics, volMetErr := cephMetricsProvider.GetMetrics() + if volMetErr != nil { + return nil, status.Error(codes.Internal, volMetErr.Error()) + } + + available, ok := (*(volMetrics.Available)).AsInt64() + if !ok { + util.ErrorLog(ctx, "failed to fetch available bytes") + } + capacity, ok := (*(volMetrics.Capacity)).AsInt64() + if !ok { + util.ErrorLog(ctx, "failed to fetch capacity bytes") + return nil, status.Error(codes.Unknown, "failed to fetch capacity bytes") + } + used, ok := (*(volMetrics.Used)).AsInt64() + if !ok { + util.ErrorLog(ctx, "failed to fetch used bytes") + } + inodes, ok := (*(volMetrics.Inodes)).AsInt64() + if !ok { + util.ErrorLog(ctx, "failed to fetch available inodes") + return nil, status.Error(codes.Unknown, "failed to fetch available inodes") + } + inodesFree, ok := (*(volMetrics.InodesFree)).AsInt64() + if !ok { + util.ErrorLog(ctx, "failed to fetch free inodes") + } + + inodesUsed, ok := (*(volMetrics.InodesUsed)).AsInt64() + if !ok { + util.ErrorLog(ctx, "failed to fetch used inodes") + } + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Available: available, + Total: capacity, + Used: used, + Unit: csi.VolumeUsage_BYTES, + }, + { + Available: inodesFree, + Total: inodes, + Used: inodesUsed, + Unit: csi.VolumeUsage_INODES, + }, + }, + }, nil +}