diff --git a/pkg/csi-common/server.go b/pkg/csi-common/server.go index 37b31aa3c..53d56c8f6 100644 --- a/pkg/csi-common/server.go +++ b/pkg/csi-common/server.go @@ -93,6 +93,7 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, c opts := []grpc.ServerOption{ grpc_middleware.WithUnaryServerChain( + contextIDInjector, logGRPC, panicHandler, ), diff --git a/pkg/csi-common/utils.go b/pkg/csi-common/utils.go index 28b4fc8c4..78b1d1735 100644 --- a/pkg/csi-common/utils.go +++ b/pkg/csi-common/utils.go @@ -20,6 +20,7 @@ import ( "fmt" "runtime/debug" "strings" + "sync/atomic" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" @@ -105,14 +106,22 @@ func RunControllerandNodePublishServer(endpoint string, d *CSIDriver, cs csi.Con s.Wait() } +var id uint64 + +func contextIDInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + id = atomic.AddUint64(&id, 1) + ctx = context.WithValue(ctx, "ID", id) + return handler(ctx, req) +} + func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - klog.V(3).Infof("GRPC call: %s", info.FullMethod) - klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) + klog.V(3).Infof("ID: %d GRPC call: %s", ctx.Value("ID"), info.FullMethod) + klog.V(5).Infof("ID: %d GRPC request: %s", ctx.Value("ID"), protosanitizer.StripSecrets(req)) resp, err := handler(ctx, req) if err != nil { - klog.Errorf("GRPC error: %v", err) + klog.Errorf("ID: %d GRPC error: %v", ctx.Value("ID"), err) } else { - klog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(resp)) + klog.V(5).Infof("ID: %d GRPC response: %s", ctx.Value("ID"), protosanitizer.StripSecrets(resp)) } return resp, err } diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go index ac426de60..19cd9e78f 100644 --- a/pkg/rbd/nodeserver.go +++ b/pkg/rbd/nodeserver.go @@ -243,7 +243,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis defer targetPathLocker.Unlock(idLk, targetPath) // Check if that target path exists properly - notMnt, err := ns.createTargetMountPath(targetPath, isBlock) + notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock) if err != nil { return nil, err } @@ -253,12 +253,12 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } // Publish Path - err = ns.mountVolume(stagingPath, req) + err = ns.mountVolume(ctx, stagingPath, req) if err != nil { return nil, err } - klog.Infof("rbd: successfully mounted stagingPath %s to targetPath %s", stagingPath, targetPath) + klog.Infof("ID: %d rbd: successfully mounted stagingPath %s to targetPath %s", ctx.Value("ID"), stagingPath, targetPath) return &csi.NodePublishVolumeResponse{}, nil } @@ -318,15 +318,15 @@ func (ns *NodeServer) mountVolumeToStagePath(req *csi.NodeStageVolumeRequest, st return err } -func (ns *NodeServer) mountVolume(stagingPath string, req *csi.NodePublishVolumeRequest) error { +func (ns *NodeServer) mountVolume(ctx context.Context, stagingPath string, req *csi.NodePublishVolumeRequest) error { // Publish Path fsType := req.GetVolumeCapability().GetMount().GetFsType() readOnly := req.GetReadonly() mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() isBlock := req.GetVolumeCapability().GetBlock() != nil targetPath := req.GetTargetPath() - klog.V(4).Infof("target %v\nisBlock %v\nfstype %v\nstagingPath %v\nreadonly %v\nmountflags %v\n", - targetPath, isBlock, fsType, stagingPath, readOnly, mountFlags) + klog.V(4).Infof("ID: %d target %v\nisBlock %v\nfstype %v\nstagingPath %v\nreadonly %v\nmountflags %v\n", + ctx.Value("ID"), targetPath, isBlock, fsType, stagingPath, readOnly, mountFlags) mountFlags = append(mountFlags, "bind") if readOnly { mountFlags = append(mountFlags, "ro") @@ -338,7 +338,7 @@ func (ns *NodeServer) mountVolume(stagingPath string, req *csi.NodePublishVolume return nil } -func (ns *NodeServer) createTargetMountPath(mountPath string, isBlock bool) (bool, error) { +func (ns *NodeServer) createTargetMountPath(ctx context.Context, mountPath string, isBlock bool) (bool, error) { // Check if that mount path exists properly notMnt, err := mount.IsNotMountPoint(ns.mounter, mountPath) if err != nil { @@ -347,11 +347,11 @@ func (ns *NodeServer) createTargetMountPath(mountPath string, isBlock bool) (boo // #nosec pathFile, e := os.OpenFile(mountPath, os.O_CREATE|os.O_RDWR, 0750) if e != nil { - klog.V(4).Infof("Failed to create mountPath:%s with error: %v", mountPath, err) + klog.V(4).Infof("ID: %d Failed to create mountPath:%s with error: %v", ctx.Value("ID"), mountPath, err) return notMnt, status.Error(codes.Internal, e.Error()) } if err = pathFile.Close(); err != nil { - klog.V(4).Infof("Failed to close mountPath:%s with error: %v", mountPath, err) + klog.V(4).Infof("ID: %d Failed to close mountPath:%s with error: %v", ctx.Value("ID"), mountPath, err) return notMnt, status.Error(codes.Internal, err.Error()) } } else {