From 0a22e3a18642153fe5f021e66c51759f0ef69946 Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Tue, 29 Apr 2025 11:32:43 +0200 Subject: [PATCH] cleanup: address golangci 'funcorder' linter problems The new 'funcorder' linter expects all public functions to be placed before private functions of a struct. Many private functions needed moving further down into their files. Some files had many issues reported. To reduce the churn in those files, they have been annotated with a `//nolint:funcorder` comment. Signed-off-by: Niels de Vos --- internal/cephfs/controllerserver.go | 1 + internal/cephfs/core/quiesce.go | 32 +- internal/cephfs/mounter/kernel.go | 30 +- internal/cephfs/nodeserver.go | 1 + internal/cephfs/store/volumeoptions.go | 1 + .../persistentvolume/persistentvolume.go | 194 ++++++------ .../volumegroupreplicationcontent.go | 46 +-- internal/csi-addons/networkfence/fencing.go | 286 +++++++++--------- internal/csi-addons/server/server.go | 18 +- internal/health-checker/manager.go | 80 ++--- internal/journal/volumegroupjournal.go | 10 +- internal/kms/aws_metadata.go | 94 +++--- internal/kms/aws_sts_metadata.go | 86 +++--- internal/kms/azure_vault.go | 88 +++--- internal/kms/keyprotect.go | 96 +++--- internal/kms/kmip.go | 8 +- internal/kms/secretskms.go | 94 +++--- internal/kms/vault.go | 26 +- internal/kms/vault_tokens.go | 96 +++--- internal/nfs/controller/volume.go | 40 +-- internal/rbd/controllerserver.go | 1 + internal/rbd/group/util.go | 1 + internal/rbd/manager.go | 214 ++++++------- internal/rbd/nodeserver.go | 1 + internal/rbd/rbd_util.go | 1 + internal/util/conn_pool.go | 82 ++--- internal/util/credentials.go | 84 ++--- internal/util/idlocker.go | 106 +++---- .../util/reftracker/radoswrapper/fakerados.go | 18 +- 29 files changed, 921 insertions(+), 914 deletions(-) diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index 1ab1ff4fc..d89783d97 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups package cephfs import ( diff --git a/internal/cephfs/core/quiesce.go b/internal/cephfs/core/quiesce.go index d867e3ef9..74e233d2f 100644 --- a/internal/cephfs/core/quiesce.go +++ b/internal/cephfs/core/quiesce.go @@ -130,22 +130,6 @@ func (fq *fsQuiesce) GetVolumes() []Volume { return fq.volumes } -// getMembers returns the list of names in the format -// group/subvolume that are to be quiesced. This is the format that the -// ceph fs quiesce expects. -// Example: ["group1/subvolume1", "group1/subvolume2", "group2/subvolume1"]. -func (fq *fsQuiesce) getMembers() []string { - volName := []string{} - for svg, sb := range fq.subVolumeGroupMapping { - for _, s := range sb { - name := svg + "/" + s - volName = append(volName, name) - } - } - - return volName -} - func (fq *fsQuiesce) FSQuiesce( ctx context.Context, reserveName string, @@ -248,3 +232,19 @@ func (fq *fsQuiesce) ReleaseFSQuiesce(ctx context.Context, return nil, err } + +// getMembers returns the list of names in the format +// group/subvolume that are to be quiesced. This is the format that the +// ceph fs quiesce expects. +// Example: ["group1/subvolume1", "group1/subvolume2", "group2/subvolume1"]. 
+func (fq *fsQuiesce) getMembers() []string { + volName := []string{} + for svg, sb := range fq.subVolumeGroupMapping { + for _, s := range sb { + name := svg + "/" + s + volName = append(volName, name) + } + } + + return volName +} diff --git a/internal/cephfs/mounter/kernel.go b/internal/cephfs/mounter/kernel.go index 67f2530a0..260fdc570 100644 --- a/internal/cephfs/mounter/kernel.go +++ b/internal/cephfs/mounter/kernel.go @@ -58,6 +58,21 @@ func NewKernelMounter() KernelMounter { } } +func (m *kernelMounter) Mount( + ctx context.Context, + mountPoint string, + cr *util.Credentials, + volOptions *store.VolumeOptions, +) error { + if err := util.CreateMountPoint(mountPoint); err != nil { + return err + } + + return m.mountKernel(ctx, mountPoint, cr, volOptions) +} + +func (m *kernelMounter) Name() string { return "Ceph kernel client" } + func (m *kernelMounter) mountKernel( ctx context.Context, mountPoint string, @@ -103,21 +118,6 @@ func (m *kernelMounter) mountKernel( return err } -func (m *kernelMounter) Mount( - ctx context.Context, - mountPoint string, - cr *util.Credentials, - volOptions *store.VolumeOptions, -) error { - if err := util.CreateMountPoint(mountPoint); err != nil { - return err - } - - return m.mountKernel(ctx, mountPoint, cr, volOptions) -} - -func (m *kernelMounter) Name() string { return "Ceph kernel client" } - // filesystemSupported checks if the passed name of the filesystem is included // in /proc/filesystems. func filesystemSupported(fs string) bool { diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 9f1fdf368..2e427f52c 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups package cephfs import ( diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go index b2bdbf0d4..93e659796 100644 --- a/internal/cephfs/store/volumeoptions.go +++ b/internal/cephfs/store/volumeoptions.go @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +//nolint:funcorder // reordering causes a lot of churn in this file package store import ( diff --git a/internal/controller/persistentvolume/persistentvolume.go b/internal/controller/persistentvolume/persistentvolume.go index 58e8db596..6e7bf7c0e 100644 --- a/internal/controller/persistentvolume/persistentvolume.go +++ b/internal/controller/persistentvolume/persistentvolume.go @@ -60,6 +60,103 @@ func (r *ReconcilePersistentVolume) Add(mgr manager.Manager, config ctrl.Config) return add(mgr, newPVReconciler(mgr, config)) } +// Reconcile reconciles the PersistentVolume object and creates a new omap entries +// for the volume. 
+func (r *ReconcilePersistentVolume) Reconcile(ctx context.Context, + request reconcile.Request, +) (reconcile.Result, error) { + pv := &corev1.PersistentVolume{} + err := r.client.Get(ctx, request.NamespacedName, pv) + if err != nil { + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + + return reconcile.Result{}, err + } + // Check if the object is under deletion + if !pv.GetDeletionTimestamp().IsZero() { + return reconcile.Result{}, nil + } + + err = r.reconcilePV(ctx, pv) + if err != nil { + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// reconcilePV will extract the image details from the pv spec and regenerates +// the omap data. +func (r *ReconcilePersistentVolume) reconcilePV(ctx context.Context, obj runtime.Object) error { + pv, ok := obj.(*corev1.PersistentVolume) + if !ok { + return nil + } + if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.config.DriverName { + return nil + } + // PV is not attached to any PVC + if pv.Spec.ClaimRef == nil { + return nil + } + + pvcNamespace := pv.Spec.ClaimRef.Namespace + requestName := pv.Name + volumeHandler := pv.Spec.CSI.VolumeHandle + secretName := "" + secretNamespace := "" + // check static volume + static := checkStaticVolume(pv) + // if the volume is static, dont generate OMAP data + if static { + return nil + } + if pv.Spec.CSI.ControllerExpandSecretRef != nil { + secretName = pv.Spec.CSI.ControllerExpandSecretRef.Name + secretNamespace = pv.Spec.CSI.ControllerExpandSecretRef.Namespace + } else if pv.Spec.CSI.NodeStageSecretRef != nil { + secretName = pv.Spec.CSI.NodeStageSecretRef.Name + secretNamespace = pv.Spec.CSI.NodeStageSecretRef.Namespace + } + + // Take lock to process only one volumeHandle at a time. + if ok := r.Locks.TryAcquire(pv.Spec.CSI.VolumeHandle); !ok { + return fmt.Errorf(util.VolumeOperationAlreadyExistsFmt, pv.Spec.CSI.VolumeHandle) + } + defer r.Locks.Release(pv.Spec.CSI.VolumeHandle) + + cr, err := r.getCredentials(ctx, secretName, secretNamespace) + if err != nil { + log.ErrorLogMsg("failed to get credentials from secret %s", err) + + return err + } + defer cr.DeleteCredentials() + + rbdVolID, err := rbd.RegenerateJournal( + pv.Spec.CSI.VolumeAttributes, + pv.Spec.ClaimRef.Name, + volumeHandler, + requestName, + pvcNamespace, + r.config.ClusterName, + r.config.InstanceID, + r.config.SetMetadata, + cr) + if err != nil { + log.ErrorLogMsg("failed to regenerate journal %s", err) + + return err + } + if rbdVolID != volumeHandler { + log.DebugLog(ctx, "volumeHandler changed from %s to %s", volumeHandler, rbdVolID) + } + + return nil +} + // newReconciler returns a ReconcilePersistentVolume. func newPVReconciler(mgr manager.Manager, config ctrl.Config) reconcile.Reconciler { r := &ReconcilePersistentVolume{ @@ -133,100 +230,3 @@ func (r *ReconcilePersistentVolume) getCredentials( func checkStaticVolume(pv *corev1.PersistentVolume) bool { return pv.Spec.CSI.VolumeAttributes["staticVolume"] == "true" } - -// reconcilePV will extract the image details from the pv spec and regenerates -// the omap data. 
-func (r *ReconcilePersistentVolume) reconcilePV(ctx context.Context, obj runtime.Object) error { - pv, ok := obj.(*corev1.PersistentVolume) - if !ok { - return nil - } - if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.config.DriverName { - return nil - } - // PV is not attached to any PVC - if pv.Spec.ClaimRef == nil { - return nil - } - - pvcNamespace := pv.Spec.ClaimRef.Namespace - requestName := pv.Name - volumeHandler := pv.Spec.CSI.VolumeHandle - secretName := "" - secretNamespace := "" - // check static volume - static := checkStaticVolume(pv) - // if the volume is static, dont generate OMAP data - if static { - return nil - } - if pv.Spec.CSI.ControllerExpandSecretRef != nil { - secretName = pv.Spec.CSI.ControllerExpandSecretRef.Name - secretNamespace = pv.Spec.CSI.ControllerExpandSecretRef.Namespace - } else if pv.Spec.CSI.NodeStageSecretRef != nil { - secretName = pv.Spec.CSI.NodeStageSecretRef.Name - secretNamespace = pv.Spec.CSI.NodeStageSecretRef.Namespace - } - - // Take lock to process only one volumeHandle at a time. - if ok := r.Locks.TryAcquire(pv.Spec.CSI.VolumeHandle); !ok { - return fmt.Errorf(util.VolumeOperationAlreadyExistsFmt, pv.Spec.CSI.VolumeHandle) - } - defer r.Locks.Release(pv.Spec.CSI.VolumeHandle) - - cr, err := r.getCredentials(ctx, secretName, secretNamespace) - if err != nil { - log.ErrorLogMsg("failed to get credentials from secret %s", err) - - return err - } - defer cr.DeleteCredentials() - - rbdVolID, err := rbd.RegenerateJournal( - pv.Spec.CSI.VolumeAttributes, - pv.Spec.ClaimRef.Name, - volumeHandler, - requestName, - pvcNamespace, - r.config.ClusterName, - r.config.InstanceID, - r.config.SetMetadata, - cr) - if err != nil { - log.ErrorLogMsg("failed to regenerate journal %s", err) - - return err - } - if rbdVolID != volumeHandler { - log.DebugLog(ctx, "volumeHandler changed from %s to %s", volumeHandler, rbdVolID) - } - - return nil -} - -// Reconcile reconciles the PersistentVolume object and creates a new omap entries -// for the volume. -func (r *ReconcilePersistentVolume) Reconcile(ctx context.Context, - request reconcile.Request, -) (reconcile.Result, error) { - pv := &corev1.PersistentVolume{} - err := r.client.Get(ctx, request.NamespacedName, pv) - if err != nil { - if apierrors.IsNotFound(err) { - return reconcile.Result{}, nil - } - - return reconcile.Result{}, err - } - // Check if the object is under deletion - if !pv.GetDeletionTimestamp().IsZero() { - return reconcile.Result{}, nil - } - - err = r.reconcilePV(ctx, pv) - if err != nil { - return reconcile.Result{}, err - } - - return reconcile.Result{}, nil -} diff --git a/internal/controller/volumegroup/volumegroupreplicationcontent.go b/internal/controller/volumegroup/volumegroupreplicationcontent.go index ebe3f67fd..4c4766ab5 100644 --- a/internal/controller/volumegroup/volumegroupreplicationcontent.go +++ b/internal/controller/volumegroup/volumegroupreplicationcontent.go @@ -150,6 +150,29 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return nil } +// Reconcile reconciles the VolumeGroupReplicationContent object and creates a new omap entries +// for the volume group. 
+func (r *ReconcileVGRContent) Reconcile(ctx context.Context, + request reconcile.Request, +) (reconcile.Result, error) { + vgrc := &replicationv1alpha1.VolumeGroupReplicationContent{} + err := r.client.Get(ctx, request.NamespacedName, vgrc) + if err != nil { + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + + return reconcile.Result{}, err + } + + // Proceed with reconciliation only if the object is not marked for deletion. + if vgrc.GetDeletionTimestamp().IsZero() { + err = r.reconcileVGRContent(ctx, vgrc) + } + + return reconcile.Result{}, err +} + func (r *ReconcileVGRContent) getSecrets( ctx context.Context, name, @@ -222,26 +245,3 @@ func (r *ReconcileVGRContent) reconcileVGRContent(ctx context.Context, obj runti return nil } - -// Reconcile reconciles the VolumeGroupReplicationContent object and creates a new omap entries -// for the volume group. -func (r *ReconcileVGRContent) Reconcile(ctx context.Context, - request reconcile.Request, -) (reconcile.Result, error) { - vgrc := &replicationv1alpha1.VolumeGroupReplicationContent{} - err := r.client.Get(ctx, request.NamespacedName, vgrc) - if err != nil { - if apierrors.IsNotFound(err) { - return reconcile.Result{}, nil - } - - return reconcile.Result{}, err - } - - // Proceed with reconciliation only if the object is not marked for deletion. - if vgrc.GetDeletionTimestamp().IsZero() { - err = r.reconcileVGRContent(ctx, vgrc) - } - - return reconcile.Result{}, err -} diff --git a/internal/csi-addons/networkfence/fencing.go b/internal/csi-addons/networkfence/fencing.go index efa0de0fc..1009607b8 100644 --- a/internal/csi-addons/networkfence/fencing.go +++ b/internal/csi-addons/networkfence/fencing.go @@ -89,30 +89,123 @@ func NewNetworkFence( return nwFence, nil } -// addCephBlocklist adds an IP to ceph osd blocklist. -func (nf *NetworkFence) addCephBlocklist(ctx context.Context, ip string, useRange bool) error { - arg := []string{ - "--id", nf.cr.ID, - "--keyfile=" + nf.cr.KeyFile, - "-m", nf.Monitors, - } - // TODO: add blocklist till infinity. - // Currently, ceph does not provide the functionality to blocklist IPs - // for infinite time. As a workaround, add a blocklist for 5 YEARS to - // represent infinity from ceph-csi side. - // At any point in this time, the IPs can be unblocked by an UnfenceClusterReq. - // This needs to be updated once ceph provides functionality for the same. - cmd := []string{"osd", "blocklist"} - if useRange { - cmd = append(cmd, "range") - } - cmd = append(cmd, "add", ip, blocklistTime) - cmd = append(cmd, arg...) - _, stdErr, err := util.ExecCommand(ctx, "ceph", cmd...) +// AddClientEviction blocks access for all the IPs in the CIDR block +// using client eviction, it also blocks the entire CIDR. 
+func (nf *NetworkFence) AddClientEviction(ctx context.Context) error { + evictedIPs := make(map[string]bool) + // fetch active clients + activeClients, err := nf.listActiveClients(ctx) if err != nil { - return fmt.Errorf("failed to blocklist IP %q: %w stderr: %q", ip, err, stdErr) + return err + } + // iterate through CIDR blocks and check if any active client matches + for _, cidr := range nf.Cidr { + for _, client := range activeClients { + var clientIP string + clientIP, err = client.fetchIP() + if err != nil { + return fmt.Errorf("error fetching client IP: %w", err) + } + // check if the clientIP is in the CIDR block + if isIPInCIDR(ctx, clientIP, cidr) { + var clientID int + clientID, err = client.fetchID() + if err != nil { + return fmt.Errorf("error fetching client ID: %w", err) + } + // evict the client + err = nf.evictCephFSClient(ctx, clientID) + if err != nil { + return fmt.Errorf("error evicting client %d: %w", clientID, err) + } + log.DebugLog(ctx, "client %d has been evicted\n", clientID) + // add the CIDR to the list of blocklisted IPs + evictedIPs[clientIP] = true + } + } + } + + // add the range based blocklist for CIDR + err = nf.AddNetworkFence(ctx) + if err != nil { + return err + } + + return nil +} + +// RemoveNetworkFence unblocks access for all the IPs in the IP range mentioned via the CIDR block +// using a network fence. +// Unfencing one of the protocols(CephFS or RBD) suggests the node is expected to be recovered, so +// both CephFS and RBD are expected to work again too. +// example: +// Create RBD NetworkFence CR for one IP 10.10.10.10 +// Created CephFS NetworkFence CR for IP range but above IP comes in the Range +// Delete the CephFS Network Fence CR to unblocklist the IP +// So now the IP (10.10.10.10) is (un)blocklisted and can be used by both protocols. +func (nf *NetworkFence) RemoveNetworkFence(ctx context.Context) error { + hasBlocklistRangeSupport := true + // for each CIDR block, convert it into a range of IPs so as to undo blocklisting operation. + for _, cidr := range nf.Cidr { + // try range blocklist cmd, if invalid fallback to + // iterating through IP range. + if hasBlocklistRangeSupport { + err := nf.removeCephBlocklist(ctx, cidr, "", true) + if err == nil { + continue + } + if !strings.Contains(err.Error(), invalidCommandStr) { + return fmt.Errorf("failed to remove blocklist range %q: %w", cidr, err) + } + hasBlocklistRangeSupport = false + } + // fetch the list of IPs from a CIDR block + hosts, err := getIPRange(cidr) + if err != nil { + return fmt.Errorf("failed to convert CIDR block %s to corresponding IP range", cidr) + } + // remove ceph blocklist for each IP in the range mentioned by the CIDR + for _, host := range hosts { + // 0 is used as nonce here to tell ceph + // to remove the blocklist entry matching: :0/0 + // it is same as telling ceph to remove just the IP + // without specifying any port or nonce with it. 
+ err := nf.removeCephBlocklist(ctx, host, "0", false) + if err != nil { + return err + } + } + } + + return nil +} + +func (nf *NetworkFence) RemoveClientEviction(ctx context.Context) error { + // Remove the CIDR block first + err := nf.RemoveNetworkFence(ctx) + if err != nil { + return err + } + + // Get the ceph blocklist + blocklist, err := nf.getCephBlocklist(ctx) + if err != nil { + return err + } + + // For each CIDR block, remove the IPs in the blocklist + // that fall under the CIDR with nonce + for _, cidr := range nf.Cidr { + hosts := nf.parseBlocklistForCIDR(ctx, blocklist, cidr) + log.DebugLog(ctx, "parsed blocklist for CIDR %s: %+v", cidr, hosts) + + for _, host := range hosts { + err := nf.removeCephBlocklist(ctx, host.IP, host.Nonce, false) + if err != nil { + return err + } + } } - log.DebugLog(ctx, "blocklisted IP %q successfully", ip) return nil } @@ -153,6 +246,34 @@ func (nf *NetworkFence) AddNetworkFence(ctx context.Context) error { return nil } +// addCephBlocklist adds an IP to ceph osd blocklist. +func (nf *NetworkFence) addCephBlocklist(ctx context.Context, ip string, useRange bool) error { + arg := []string{ + "--id", nf.cr.ID, + "--keyfile=" + nf.cr.KeyFile, + "-m", nf.Monitors, + } + // TODO: add blocklist till infinity. + // Currently, ceph does not provide the functionality to blocklist IPs + // for infinite time. As a workaround, add a blocklist for 5 YEARS to + // represent infinity from ceph-csi side. + // At any point in this time, the IPs can be unblocked by an UnfenceClusterReq. + // This needs to be updated once ceph provides functionality for the same. + cmd := []string{"osd", "blocklist"} + if useRange { + cmd = append(cmd, "range") + } + cmd = append(cmd, "add", ip, blocklistTime) + cmd = append(cmd, arg...) + _, stdErr, err := util.ExecCommand(ctx, "ceph", cmd...) + if err != nil { + return fmt.Errorf("failed to blocklist IP %q: %w stderr: %q", ip, err, stdErr) + } + log.DebugLog(ctx, "blocklisted IP %q successfully", ip) + + return nil +} + func (nf *NetworkFence) listActiveClients(ctx context.Context) ([]activeClient, error) { arg := []string{ "--id", nf.cr.ID, @@ -238,51 +359,6 @@ func (ac *activeClient) fetchID() (int, error) { return 0, fmt.Errorf("failed to extract client ID, incorrect format: %s", clientInfo) } -// AddClientEviction blocks access for all the IPs in the CIDR block -// using client eviction, it also blocks the entire CIDR. 
-func (nf *NetworkFence) AddClientEviction(ctx context.Context) error { - evictedIPs := make(map[string]bool) - // fetch active clients - activeClients, err := nf.listActiveClients(ctx) - if err != nil { - return err - } - // iterate through CIDR blocks and check if any active client matches - for _, cidr := range nf.Cidr { - for _, client := range activeClients { - var clientIP string - clientIP, err = client.fetchIP() - if err != nil { - return fmt.Errorf("error fetching client IP: %w", err) - } - // check if the clientIP is in the CIDR block - if isIPInCIDR(ctx, clientIP, cidr) { - var clientID int - clientID, err = client.fetchID() - if err != nil { - return fmt.Errorf("error fetching client ID: %w", err) - } - // evict the client - err = nf.evictCephFSClient(ctx, clientID) - if err != nil { - return fmt.Errorf("error evicting client %d: %w", clientID, err) - } - log.DebugLog(ctx, "client %d has been evicted\n", clientID) - // add the CIDR to the list of blocklisted IPs - evictedIPs[clientIP] = true - } - } - } - - // add the range based blocklist for CIDR - err = nf.AddNetworkFence(ctx) - if err != nil { - return err - } - - return nil -} - // getIPRange returns a list of IPs from the IP range // corresponding to a CIDR block. func getIPRange(cidr string) ([]string, error) { @@ -357,82 +433,6 @@ func (nf *NetworkFence) removeCephBlocklist(ctx context.Context, ip, nonce strin return nil } -// RemoveNetworkFence unblocks access for all the IPs in the IP range mentioned via the CIDR block -// using a network fence. -// Unfencing one of the protocols(CephFS or RBD) suggests the node is expected to be recovered, so -// both CephFS and RBD are expected to work again too. -// example: -// Create RBD NetworkFence CR for one IP 10.10.10.10 -// Created CephFS NetworkFence CR for IP range but above IP comes in the Range -// Delete the CephFS Network Fence CR to unblocklist the IP -// So now the IP (10.10.10.10) is (un)blocklisted and can be used by both protocols. -func (nf *NetworkFence) RemoveNetworkFence(ctx context.Context) error { - hasBlocklistRangeSupport := true - // for each CIDR block, convert it into a range of IPs so as to undo blocklisting operation. - for _, cidr := range nf.Cidr { - // try range blocklist cmd, if invalid fallback to - // iterating through IP range. - if hasBlocklistRangeSupport { - err := nf.removeCephBlocklist(ctx, cidr, "", true) - if err == nil { - continue - } - if !strings.Contains(err.Error(), invalidCommandStr) { - return fmt.Errorf("failed to remove blocklist range %q: %w", cidr, err) - } - hasBlocklistRangeSupport = false - } - // fetch the list of IPs from a CIDR block - hosts, err := getIPRange(cidr) - if err != nil { - return fmt.Errorf("failed to convert CIDR block %s to corresponding IP range", cidr) - } - // remove ceph blocklist for each IP in the range mentioned by the CIDR - for _, host := range hosts { - // 0 is used as nonce here to tell ceph - // to remove the blocklist entry matching: :0/0 - // it is same as telling ceph to remove just the IP - // without specifying any port or nonce with it. 
- err := nf.removeCephBlocklist(ctx, host, "0", false) - if err != nil { - return err - } - } - } - - return nil -} - -func (nf *NetworkFence) RemoveClientEviction(ctx context.Context) error { - // Remove the CIDR block first - err := nf.RemoveNetworkFence(ctx) - if err != nil { - return err - } - - // Get the ceph blocklist - blocklist, err := nf.getCephBlocklist(ctx) - if err != nil { - return err - } - - // For each CIDR block, remove the IPs in the blocklist - // that fall under the CIDR with nonce - for _, cidr := range nf.Cidr { - hosts := nf.parseBlocklistForCIDR(ctx, blocklist, cidr) - log.DebugLog(ctx, "parsed blocklist for CIDR %s: %+v", cidr, hosts) - - for _, host := range hosts { - err := nf.removeCephBlocklist(ctx, host.IP, host.Nonce, false) - if err != nil { - return err - } - } - } - - return nil -} - // getCephBlocklist fetches the ceph blocklist and returns it as a string. func (nf *NetworkFence) getCephBlocklist(ctx context.Context) (string, error) { arg := []string{ diff --git a/internal/csi-addons/server/server.go b/internal/csi-addons/server/server.go index c41e27100..fdf3abc9b 100644 --- a/internal/csi-addons/server/server.go +++ b/internal/csi-addons/server/server.go @@ -108,6 +108,15 @@ func (cas *CSIAddonsServer) Start(middlewareConfig csicommon.MiddlewareServerOpt return nil } +// Stop can be used to stop the internal gRPC server. +func (cas *CSIAddonsServer) Stop() { + if cas.server == nil { + return + } + + cas.server.GracefulStop() +} + // serve starts the actual process of listening for requests on the gRPC // server. This is a blocking call, so it should get executed in a go-routine. func (cas *CSIAddonsServer) serve(listener net.Listener) { @@ -121,12 +130,3 @@ func (cas *CSIAddonsServer) serve(listener net.Listener) { log.DefaultLog("the CSI-Addons server at %q has been stopped", listener.Addr()) } - -// Stop can be used to stop the internal gRPC server. 
-func (cas *CSIAddonsServer) Stop() { - if cas.server == nil { - return - } - - cas.server.GracefulStop() -} diff --git a/internal/health-checker/manager.go b/internal/health-checker/manager.go index 3d6b71fce..e59f12ff3 100644 --- a/internal/health-checker/manager.go +++ b/internal/health-checker/manager.go @@ -99,6 +99,46 @@ func (hcm *healthCheckManager) StartChecker(volumeID, path string, ct CheckerTyp return hcm.createChecker(volumeID, path, ct, false) } +func (hcm *healthCheckManager) StopSharedChecker(volumeID string) { + hcm.StopChecker(volumeID, "") +} + +func (hcm *healthCheckManager) StopChecker(volumeID, path string) { + old, ok := hcm.checkers.LoadAndDelete(fallbackKey(volumeID, path)) + if !ok { + // nothing was loaded, nothing to do + return + } + + // 'old' was loaded, cast it to ConditionChecker + cc, ok := old.(ConditionChecker) + if !ok { + // failed to cast, should not be possible + return + } + cc.stop() +} + +func (hcm *healthCheckManager) IsHealthy(volumeID, path string) (bool, error) { + // load the 'old' ConditionChecker if it exists + old, ok := hcm.checkers.Load(volumeID) + if !ok { + // try fallback which include an optional (unique) path (usually publishTargetPath) + old, ok = hcm.checkers.Load(fallbackKey(volumeID, path)) + if !ok { + return true, fmt.Errorf("no ConditionChecker for volume-id: %s", volumeID) + } + } + + // 'old' was loaded, cast it to ConditionChecker + cc, ok := old.(ConditionChecker) + if !ok { + return true, fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID) + } + + return cc.isHealthy() +} + // createChecker decides based on the CheckerType what checker to start for // the volume. func (hcm *healthCheckManager) createChecker(volumeID, path string, ct CheckerType, shared bool) error { @@ -158,46 +198,6 @@ func (hcm *healthCheckManager) startChecker(cc ConditionChecker, volumeID, path return nil } -func (hcm *healthCheckManager) StopSharedChecker(volumeID string) { - hcm.StopChecker(volumeID, "") -} - -func (hcm *healthCheckManager) StopChecker(volumeID, path string) { - old, ok := hcm.checkers.LoadAndDelete(fallbackKey(volumeID, path)) - if !ok { - // nothing was loaded, nothing to do - return - } - - // 'old' was loaded, cast it to ConditionChecker - cc, ok := old.(ConditionChecker) - if !ok { - // failed to cast, should not be possible - return - } - cc.stop() -} - -func (hcm *healthCheckManager) IsHealthy(volumeID, path string) (bool, error) { - // load the 'old' ConditionChecker if it exists - old, ok := hcm.checkers.Load(volumeID) - if !ok { - // try fallback which include an optional (unique) path (usually publishTargetPath) - old, ok = hcm.checkers.Load(fallbackKey(volumeID, path)) - if !ok { - return true, fmt.Errorf("no ConditionChecker for volume-id: %s", volumeID) - } - } - - // 'old' was loaded, cast it to ConditionChecker - cc, ok := old.(ConditionChecker) - if !ok { - return true, fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID) - } - - return cc.isHealthy() -} - // fallbackKey returns the key for a checker in the map. If the path is empty, // it is assumed that the key'd checked is shared. 
func fallbackKey(volumeID, path string) string { diff --git a/internal/journal/volumegroupjournal.go b/internal/journal/volumegroupjournal.go index 39e92384a..38a7c679b 100644 --- a/internal/journal/volumegroupjournal.go +++ b/internal/journal/volumegroupjournal.go @@ -109,11 +109,6 @@ func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournalConfig { } } -// SetNamespace sets the namespace for the journal. -func (vgc *VolumeGroupJournalConfig) SetNamespace(ns string) { - vgc.Config.namespace = ns -} - // NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for // volume groups using a predetermined namespace value. func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournalConfig { @@ -123,6 +118,11 @@ func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournal return j } +// SetNamespace sets the namespace for the journal. +func (vgc *VolumeGroupJournalConfig) SetNamespace(ns string) { + vgc.Config.namespace = ns +} + // Connect establishes a new connection to a ceph cluster for journal metadata. func (vgc *VolumeGroupJournalConfig) Connect( monitors, diff --git a/internal/kms/aws_metadata.go b/internal/kms/aws_metadata.go index a8a67bb5a..14c6778e4 100644 --- a/internal/kms/aws_metadata.go +++ b/internal/kms/aws_metadata.go @@ -124,35 +124,6 @@ func initAWSMetadataKMS(args ProviderInitArgs) (EncryptionKMS, error) { return kms, nil } -func (kms *awsMetadataKMS) getSecrets() (map[string]interface{}, error) { - c, err := k8s.NewK8sClient() - if err != nil { - return nil, fmt.Errorf("failed to connect to Kubernetes to "+ - "get Secret %s/%s: %w", kms.namespace, kms.secretName, err) - } - - secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(), - kms.secretName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get Secret %s/%s: %w", - kms.namespace, kms.secretName, err) - } - - config := make(map[string]interface{}) - - for k, v := range secret.Data { - switch k { - case awsSecretAccessKey, awsAccessKey, awsSessionToken, awsCMK: - config[k] = string(v) - default: - return nil, fmt.Errorf("unsupported option for KMS "+ - "provider %q: %s", kmsTypeAWSMetadata, k) - } - } - - return config, nil -} - func (kms *awsMetadataKMS) Destroy() { // Nothing to do. } @@ -164,24 +135,6 @@ func (kms *awsMetadataKMS) RequiresDEKStore() DEKStoreType { return DEKStoreMetadata } -func (kms *awsMetadataKMS) getService() (*awsKMS.KMS, error) { - creds := awsCreds.NewStaticCredentials(kms.accessKey, - kms.secretAccessKey, kms.sessionToken) - - sess, err := awsSession.NewSessionWithOptions(awsSession.Options{ - SharedConfigState: awsSession.SharedConfigDisable, - Config: aws.Config{ - Credentials: creds, - Region: aws.String(kms.region), - }, - }) - if err != nil { - return nil, fmt.Errorf("failed to create AWS session: %w", err) - } - - return awsKMS.New(sess), nil -} - // EncryptDEK uses the Amazon KMS and the configured CMK to encrypt the DEK. 
func (kms *awsMetadataKMS) EncryptDEK(ctx context.Context, volumeID, plainDEK string) (string, error) { svc, err := kms.getService() @@ -230,3 +183,50 @@ func (kms *awsMetadataKMS) DecryptDEK(ctx context.Context, volumeID, encryptedDE func (kms *awsMetadataKMS) GetSecret(ctx context.Context, volumeID string) (string, error) { return "", ErrGetSecretUnsupported } + +func (kms *awsMetadataKMS) getSecrets() (map[string]interface{}, error) { + c, err := k8s.NewK8sClient() + if err != nil { + return nil, fmt.Errorf("failed to connect to Kubernetes to "+ + "get Secret %s/%s: %w", kms.namespace, kms.secretName, err) + } + + secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(), + kms.secretName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get Secret %s/%s: %w", + kms.namespace, kms.secretName, err) + } + + config := make(map[string]interface{}) + + for k, v := range secret.Data { + switch k { + case awsSecretAccessKey, awsAccessKey, awsSessionToken, awsCMK: + config[k] = string(v) + default: + return nil, fmt.Errorf("unsupported option for KMS "+ + "provider %q: %s", kmsTypeAWSMetadata, k) + } + } + + return config, nil +} + +func (kms *awsMetadataKMS) getService() (*awsKMS.KMS, error) { + creds := awsCreds.NewStaticCredentials(kms.accessKey, + kms.secretAccessKey, kms.sessionToken) + + sess, err := awsSession.NewSessionWithOptions(awsSession.Options{ + SharedConfigState: awsSession.SharedConfigDisable, + Config: aws.Config{ + Credentials: creds, + Region: aws.String(kms.region), + }, + }) + if err != nil { + return nil, fmt.Errorf("failed to create AWS session: %w", err) + } + + return awsKMS.New(sess), nil +} diff --git a/internal/kms/aws_sts_metadata.go b/internal/kms/aws_sts_metadata.go index 7dd70ed59..587ae4992 100644 --- a/internal/kms/aws_sts_metadata.go +++ b/internal/kms/aws_sts_metadata.go @@ -115,6 +115,49 @@ func initAWSSTSMetadataKMS(args ProviderInitArgs) (EncryptionKMS, error) { return kms, nil } +// EncryptDEK uses the Amazon KMS and the configured CMK to encrypt the DEK. +func (as *awsSTSMetadataKMS) EncryptDEK(ctx context.Context, _, plainDEK string) (string, error) { + svc, err := as.getServiceWithSTS() + if err != nil { + return "", fmt.Errorf("failed to get KMS service: %w", err) + } + + result, err := svc.Encrypt(&awsKMS.EncryptInput{ + KeyId: aws.String(as.cmk), + Plaintext: []byte(plainDEK), + }) + if err != nil { + return "", fmt.Errorf("failed to encrypt DEK: %w", err) + } + + // base64 encode the encrypted DEK, so that storing it should not have + // issues + return base64.StdEncoding.EncodeToString(result.CiphertextBlob), nil +} + +// DecryptDEK uses the Amazon KMS and the configured CMK to decrypt the DEK. +func (as *awsSTSMetadataKMS) DecryptDEK(ctx context.Context, _, encryptedDEK string) (string, error) { + svc, err := as.getServiceWithSTS() + if err != nil { + return "", fmt.Errorf("failed to get KMS service: %w", err) + } + + ciphertextBlob, err := base64.StdEncoding.DecodeString(encryptedDEK) + if err != nil { + return "", fmt.Errorf("failed to decode base64 cipher: %w", + err) + } + + result, err := svc.Decrypt(&awsKMS.DecryptInput{ + CiphertextBlob: ciphertextBlob, + }) + if err != nil { + return "", fmt.Errorf("failed to decrypt DEK: %w", err) + } + + return string(result.Plaintext), nil +} + // getSecrets returns required STS configuration options from the Kubernetes Secret. 
func (as *awsSTSMetadataKMS) getSecrets() (map[string]string, error) { c, err := k8s.NewK8sClient() @@ -191,46 +234,3 @@ func (as *awsSTSMetadataKMS) getServiceWithSTS() (*awsKMS.KMS, error) { return awsKMS.New(sess), nil } - -// EncryptDEK uses the Amazon KMS and the configured CMK to encrypt the DEK. -func (as *awsSTSMetadataKMS) EncryptDEK(ctx context.Context, _, plainDEK string) (string, error) { - svc, err := as.getServiceWithSTS() - if err != nil { - return "", fmt.Errorf("failed to get KMS service: %w", err) - } - - result, err := svc.Encrypt(&awsKMS.EncryptInput{ - KeyId: aws.String(as.cmk), - Plaintext: []byte(plainDEK), - }) - if err != nil { - return "", fmt.Errorf("failed to encrypt DEK: %w", err) - } - - // base64 encode the encrypted DEK, so that storing it should not have - // issues - return base64.StdEncoding.EncodeToString(result.CiphertextBlob), nil -} - -// DecryptDEK uses the Amazon KMS and the configured CMK to decrypt the DEK. -func (as *awsSTSMetadataKMS) DecryptDEK(ctx context.Context, _, encryptedDEK string) (string, error) { - svc, err := as.getServiceWithSTS() - if err != nil { - return "", fmt.Errorf("failed to get KMS service: %w", err) - } - - ciphertextBlob, err := base64.StdEncoding.DecodeString(encryptedDEK) - if err != nil { - return "", fmt.Errorf("failed to decode base64 cipher: %w", - err) - } - - result, err := svc.Decrypt(&awsKMS.DecryptInput{ - CiphertextBlob: ciphertextBlob, - }) - if err != nil { - return "", fmt.Errorf("failed to decrypt DEK: %w", err) - } - - return string(result.Plaintext), nil -} diff --git a/internal/kms/azure_vault.go b/internal/kms/azure_vault.go index 5f3dc5ca9..036c868cf 100644 --- a/internal/kms/azure_vault.go +++ b/internal/kms/azure_vault.go @@ -117,50 +117,6 @@ func (kms *azureKMS) Destroy() { // Nothing to do. } -func (kms *azureKMS) getService() (*azsecrets.Client, error) { - certs, key, err := azidentity.ParseCertificates([]byte(kms.clientCertificate), []byte{}) - if err != nil { - return nil, fmt.Errorf("failed to parse Azure client certificate: %w", err) - } - creds, err := azidentity.NewClientCertificateCredential(kms.tenantID, kms.clientID, certs, key, nil) - if err != nil { - return nil, fmt.Errorf("failed to create Azure credentials: %w", err) - } - - azClient, err := azsecrets.NewClient(kms.vaultURL, creds, nil) - if err != nil { - return nil, fmt.Errorf("failed to create Azure client: %w", err) - } - - return azClient, nil -} - -func (kms *azureKMS) getSecrets() (map[string]interface{}, error) { - c, err := k8s.NewK8sClient() - if err != nil { - return nil, fmt.Errorf("failed to connect to kubernetes to "+ - "get secret %s/%s: %w", kms.namespace, kms.secretName, err) - } - - secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(), - kms.secretName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get secret %s/%s: %w", kms.namespace, kms.secretName, err) - } - - config := make(map[string]interface{}) - for k, v := range secret.Data { - switch k { - case azureClientCertificate: - config[k] = string(v) - default: - return nil, fmt.Errorf("unsupported option for KMS provider %q: %s", kmsTypeAzure, k) - } - } - - return config, nil -} - // FetchDEK returns passphrase from Azure key vault. 
func (kms *azureKMS) FetchDEK(ctx context.Context, key string) (string, error) { svc, err := kms.getService() @@ -208,3 +164,47 @@ func (kms *azureKMS) RemoveDEK(ctx context.Context, key string) error { return nil } + +func (kms *azureKMS) getService() (*azsecrets.Client, error) { + certs, key, err := azidentity.ParseCertificates([]byte(kms.clientCertificate), []byte{}) + if err != nil { + return nil, fmt.Errorf("failed to parse Azure client certificate: %w", err) + } + creds, err := azidentity.NewClientCertificateCredential(kms.tenantID, kms.clientID, certs, key, nil) + if err != nil { + return nil, fmt.Errorf("failed to create Azure credentials: %w", err) + } + + azClient, err := azsecrets.NewClient(kms.vaultURL, creds, nil) + if err != nil { + return nil, fmt.Errorf("failed to create Azure client: %w", err) + } + + return azClient, nil +} + +func (kms *azureKMS) getSecrets() (map[string]interface{}, error) { + c, err := k8s.NewK8sClient() + if err != nil { + return nil, fmt.Errorf("failed to connect to kubernetes to "+ + "get secret %s/%s: %w", kms.namespace, kms.secretName, err) + } + + secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(), + kms.secretName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get secret %s/%s: %w", kms.namespace, kms.secretName, err) + } + + config := make(map[string]interface{}) + for k, v := range secret.Data { + switch k { + case azureClientCertificate: + config[k] = string(v) + default: + return nil, fmt.Errorf("unsupported option for KMS provider %q: %s", kmsTypeAzure, k) + } + } + + return config, nil +} diff --git a/internal/kms/keyprotect.go b/internal/kms/keyprotect.go index fdb70a99b..11d6b5192 100644 --- a/internal/kms/keyprotect.go +++ b/internal/kms/keyprotect.go @@ -147,35 +147,6 @@ func initKeyProtectKMS(args ProviderInitArgs) (EncryptionKMS, error) { return kms, nil } -func (kms *keyProtectKMS) getSecrets() (map[string]interface{}, error) { - c, err := k8s.NewK8sClient() - if err != nil { - return nil, fmt.Errorf("failed to connect to Kubernetes to "+ - "get Secret %s/%s: %w", kms.namespace, kms.secretName, err) - } - - secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(), - kms.secretName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get Secret %s/%s: %w", - kms.namespace, kms.secretName, err) - } - - config := make(map[string]interface{}) - - for k, v := range secret.Data { - switch k { - case keyProtectServiceAPIKey, KeyProtectCustomerRootKey, keyProtectSessionToken, keyProtectCRK: - config[k] = string(v) - default: - return nil, fmt.Errorf("unsupported option for KMS "+ - "provider %q: %s", kmsTypeKeyProtectMetadata, k) - } - } - - return config, nil -} - func (kms *keyProtectKMS) Destroy() { // Nothing to do. } @@ -184,25 +155,6 @@ func (kms *keyProtectKMS) RequiresDEKStore() DEKStoreType { return DEKStoreMetadata } -func (kms *keyProtectKMS) getService() error { - // Use your Service API Key and your KeyProtect Service Instance ID to create a ClientConfig - cc := kp.ClientConfig{ - BaseURL: kms.baseURL, - TokenURL: kms.tokenURL, - APIKey: kms.serviceAPIKey, - InstanceID: kms.serviceInstanceID, - } - - // Build a new client from the config - client, err := kp.New(cc, kp.DefaultTransport()) - if err != nil { - return fmt.Errorf("failed to create keyprotect client: %w", err) - } - kms.client = client - - return nil -} - // EncryptDEK uses the KeyProtect KMS and the configured CRK to encrypt the DEK. 
func (kms *keyProtectKMS) EncryptDEK(ctx context.Context, volumeID, plainDEK string) (string, error) { if err := kms.getService(); err != nil { @@ -246,3 +198,51 @@ func (kms *keyProtectKMS) DecryptDEK(ctx context.Context, volumeID, encryptedDEK func (kms *keyProtectKMS) GetSecret(ctx context.Context, volumeID string) (string, error) { return "", ErrGetSecretUnsupported } + +func (kms *keyProtectKMS) getSecrets() (map[string]interface{}, error) { + c, err := k8s.NewK8sClient() + if err != nil { + return nil, fmt.Errorf("failed to connect to Kubernetes to "+ + "get Secret %s/%s: %w", kms.namespace, kms.secretName, err) + } + + secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(), + kms.secretName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get Secret %s/%s: %w", + kms.namespace, kms.secretName, err) + } + + config := make(map[string]interface{}) + + for k, v := range secret.Data { + switch k { + case keyProtectServiceAPIKey, KeyProtectCustomerRootKey, keyProtectSessionToken, keyProtectCRK: + config[k] = string(v) + default: + return nil, fmt.Errorf("unsupported option for KMS "+ + "provider %q: %s", kmsTypeKeyProtectMetadata, k) + } + } + + return config, nil +} + +func (kms *keyProtectKMS) getService() error { + // Use your Service API Key and your KeyProtect Service Instance ID to create a ClientConfig + cc := kp.ClientConfig{ + BaseURL: kms.baseURL, + TokenURL: kms.tokenURL, + APIKey: kms.serviceAPIKey, + InstanceID: kms.serviceInstanceID, + } + + // Build a new client from the config + client, err := kp.New(cc, kp.DefaultTransport()) + if err != nil { + return fmt.Errorf("failed to create keyprotect client: %w", err) + } + kms.client = client + + return nil +} diff --git a/internal/kms/kmip.go b/internal/kms/kmip.go index 7296abd5b..3e25bbd59 100644 --- a/internal/kms/kmip.go +++ b/internal/kms/kmip.go @@ -293,6 +293,10 @@ func (kms *kmipKMS) RequiresDEKStore() DEKStoreType { return DEKStoreMetadata } +func (kms *kmipKMS) GetSecret(ctx context.Context, volumeID string) (string, error) { + return "", ErrGetSecretUnsupported +} + // getSecrets returns required options from the Kubernetes Secret. func (kms *kmipKMS) getSecrets() (map[string]string, error) { c, err := k8s.NewK8sClient() @@ -500,10 +504,6 @@ func (kms *kmipKMS) verifyResponse( return &batchItem, nil } -func (kms *kmipKMS) GetSecret(ctx context.Context, volumeID string) (string, error) { - return "", ErrGetSecretUnsupported -} - // TODO: use the following structs from https://github.com/gemalto/kmip-go // when https://github.com/ThalesGroup/kmip-go/issues/21 is resolved. // refer: https://docs.oasis-open.org/kmip/spec/v1.4/kmip-spec-v1.4.html. diff --git a/internal/kms/secretskms.go b/internal/kms/secretskms.go index 6514906e2..591b6bd59 100644 --- a/internal/kms/secretskms.go +++ b/internal/kms/secretskms.go @@ -116,53 +116,6 @@ func initSecretsMetadataKMS(args ProviderInitArgs) (EncryptionKMS, error) { return smKMS, nil } -// fetchEncryptionPassphrase fetches encryptionPassphrase from user provided secret. 
-func (kms secretsMetadataKMS) fetchEncryptionPassphrase( - config map[string]interface{}, - defaultNamespace string, -) (string, error) { - var ( - secretName string - secretNamespace string - ) - - err := setConfigString(&secretName, config, metadataSecretNameKey) - if err != nil { - return "", err - } - - err = setConfigString(&secretNamespace, config, metadataSecretNamespaceKey) - if err != nil { - if !errors.Is(err, errConfigOptionMissing) { - return "", err - } - // if 'secretNamespace' option is not specified, defaults to namespace in - // which PVC was created - secretNamespace = defaultNamespace - } - - c, err := k8s.NewK8sClient() - if err != nil { - return "", fmt.Errorf("can not get Secret %s/%s, failed to "+ - "connect to Kubernetes: %w", secretNamespace, secretName, err) - } - - secret, err := c.CoreV1().Secrets(secretNamespace).Get(context.TODO(), - secretName, metav1.GetOptions{}) - if err != nil { - return "", fmt.Errorf("failed to get Secret %s/%s: %w", - secretNamespace, secretName, err) - } - - passphraseValue, ok := secret.Data[encryptionPassphraseKey] - if !ok { - return "", fmt.Errorf("missing %q in Secret %s/%s", - encryptionPassphraseKey, secretNamespace, secretName) - } - - return string(passphraseValue), nil -} - // Destroy frees all used resources. func (kms secretsMetadataKMS) Destroy() { // nothing to do @@ -287,6 +240,53 @@ func (kms secretsMetadataKMS) GetSecret(ctx context.Context, volumeID string) (s return kms.FetchDEK(ctx, volumeID) } +// fetchEncryptionPassphrase fetches encryptionPassphrase from user provided secret. +func (kms secretsMetadataKMS) fetchEncryptionPassphrase( + config map[string]interface{}, + defaultNamespace string, +) (string, error) { + var ( + secretName string + secretNamespace string + ) + + err := setConfigString(&secretName, config, metadataSecretNameKey) + if err != nil { + return "", err + } + + err = setConfigString(&secretNamespace, config, metadataSecretNamespaceKey) + if err != nil { + if !errors.Is(err, errConfigOptionMissing) { + return "", err + } + // if 'secretNamespace' option is not specified, defaults to namespace in + // which PVC was created + secretNamespace = defaultNamespace + } + + c, err := k8s.NewK8sClient() + if err != nil { + return "", fmt.Errorf("can not get Secret %s/%s, failed to "+ + "connect to Kubernetes: %w", secretNamespace, secretName, err) + } + + secret, err := c.CoreV1().Secrets(secretNamespace).Get(context.TODO(), + secretName, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("failed to get Secret %s/%s: %w", + secretNamespace, secretName, err) + } + + passphraseValue, ok := secret.Data[encryptionPassphraseKey] + if !ok { + return "", fmt.Errorf("missing %q in Secret %s/%s", + encryptionPassphraseKey, secretNamespace, secretName) + } + + return string(passphraseValue), nil +} + // generateCipher returns a AEAD cipher based on a passphrase and salt // (volumeID). The cipher can then be used to encrypt/decrypt the DEK. func generateCipher(passphrase, salt string) (cipher.AEAD, error) { diff --git a/internal/kms/vault.go b/internal/kms/vault.go index efc34fa5b..de2d06abd 100644 --- a/internal/kms/vault.go +++ b/internal/kms/vault.go @@ -122,6 +122,19 @@ func setConfigString(option *string, config map[string]interface{}, key string) return nil } +// Destroy frees allocated resources. For a vaultConnection that means removing +// the created temporary files. 
+func (vc *vaultConnection) Destroy() { + if vc.vaultConfig != nil { + tmpFile, ok := vc.vaultConfig[api.EnvVaultCACert] + if ok { + // ignore error on failure to remove tmpfile (gosec complains) + //nolint:forcetypeassert,errcheck // ignore error on failure to remove tmpfile + _ = os.Remove(tmpFile.(string)) + } + } +} + // initConnection sets VAULT_* environment variables in the vc.vaultConfig map, // these settings will be used when connecting to the Vault service with // vc.connectVault(). @@ -298,19 +311,6 @@ func (vc *vaultConnection) connectVault() error { return nil } -// Destroy frees allocated resources. For a vaultConnection that means removing -// the created temporary files. -func (vc *vaultConnection) Destroy() { - if vc.vaultConfig != nil { - tmpFile, ok := vc.vaultConfig[api.EnvVaultCACert] - if ok { - // ignore error on failure to remove tmpfile (gosec complains) - //nolint:forcetypeassert,errcheck // ignore error on failure to remove tmpfile - _ = os.Remove(tmpFile.(string)) - } - } -} - // getDeleteKeyContext creates a new KeyContext that has an optional value set // to destroy the contents of secrets. This is configurable with the // `vaultDestroyKeys` configuration parameter. diff --git a/internal/kms/vault_tokens.go b/internal/kms/vault_tokens.go index c0fcc431e..159cb1033 100644 --- a/internal/kms/vault_tokens.go +++ b/internal/kms/vault_tokens.go @@ -281,6 +281,54 @@ func initVaultTokensKMS(args ProviderInitArgs) (EncryptionKMS, error) { return kms, nil } +// FetchDEK returns passphrase from Vault. The passphrase is stored in a +// data.data.passphrase structure. +func (vtc *vaultTenantConnection) FetchDEK(ctx context.Context, key string) (string, error) { + // Since the second return variable loss.Version is not used, there it is ignored. + s, _, err := vtc.secrets.GetSecret(key, vtc.keyContext) + if err != nil { + return "", err + } + + data, ok := s["data"].(map[string]interface{}) + if !ok { + return "", fmt.Errorf("failed parsing data for get passphrase request for %s", key) + } + passphrase, ok := data["passphrase"].(string) + if !ok { + return "", fmt.Errorf("failed parsing passphrase for get passphrase request for %s", key) + } + + return passphrase, nil +} + +// StoreDEK saves new passphrase in Vault. +func (vtc *vaultTenantConnection) StoreDEK(ctx context.Context, key, value string) error { + data := map[string]interface{}{ + "data": map[string]string{ + "passphrase": value, + }, + } + + // Since the first return variable loss.Version is not used, there it is ignored. + _, err := vtc.secrets.PutSecret(key, data, vtc.keyContext) + if err != nil { + return fmt.Errorf("saving passphrase at %s request to vault failed: %w", key, err) + } + + return nil +} + +// RemoveDEK deletes passphrase from Vault. +func (vtc *vaultTenantConnection) RemoveDEK(ctx context.Context, key string) error { + err := vtc.secrets.DeleteSecret(key, vtc.getDeleteKeyContext()) + if err != nil { + return fmt.Errorf("delete passphrase at %s request to vault failed: %w", key, err) + } + + return nil +} + func (kms *vaultTokensKMS) configureTenant(config map[string]interface{}, tenant string) error { kms.Tenant = tenant tenantConfig, found := fetchTenantConfig(config, tenant) @@ -461,54 +509,6 @@ func (vtc *vaultTenantConnection) getK8sClient() (*kubernetes.Clientset, error) return vtc.client, nil } -// FetchDEK returns passphrase from Vault. The passphrase is stored in a -// data.data.passphrase structure. 
-func (vtc *vaultTenantConnection) FetchDEK(ctx context.Context, key string) (string, error) { - // Since the second return variable loss.Version is not used, there it is ignored. - s, _, err := vtc.secrets.GetSecret(key, vtc.keyContext) - if err != nil { - return "", err - } - - data, ok := s["data"].(map[string]interface{}) - if !ok { - return "", fmt.Errorf("failed parsing data for get passphrase request for %s", key) - } - passphrase, ok := data["passphrase"].(string) - if !ok { - return "", fmt.Errorf("failed parsing passphrase for get passphrase request for %s", key) - } - - return passphrase, nil -} - -// StoreDEK saves new passphrase in Vault. -func (vtc *vaultTenantConnection) StoreDEK(ctx context.Context, key, value string) error { - data := map[string]interface{}{ - "data": map[string]string{ - "passphrase": value, - }, - } - - // Since the first return variable loss.Version is not used, there it is ignored. - _, err := vtc.secrets.PutSecret(key, data, vtc.keyContext) - if err != nil { - return fmt.Errorf("saving passphrase at %s request to vault failed: %w", key, err) - } - - return nil -} - -// RemoveDEK deletes passphrase from Vault. -func (vtc *vaultTenantConnection) RemoveDEK(ctx context.Context, key string) error { - err := vtc.secrets.DeleteSecret(key, vtc.getDeleteKeyContext()) - if err != nil { - return fmt.Errorf("delete passphrase at %s request to vault failed: %w", key, err) - } - - return nil -} - func (kms *vaultTokensKMS) getToken() (string, error) { c, err := kms.getK8sClient() if err != nil { diff --git a/internal/nfs/controller/volume.go b/internal/nfs/controller/volume.go index f8ca47d2b..39d06ea40 100644 --- a/internal/nfs/controller/volume.go +++ b/internal/nfs/controller/volume.go @@ -187,26 +187,6 @@ func (nv *NFSVolume) CreateExport(backend *csi.Volume) error { return nil } -// createExportCommand returns the "ceph nfs export create ..." command -// arguments (without "ceph"). The order of the parameters matches old Ceph -// releases, new Ceph releases added --option formats, which can be added when -// passing the parameters to this function. -func (nv *NFSVolume) createExportCommand(nfsCluster, fs, export, path string) []string { - return []string{ - "--id", nv.cr.ID, - "--keyfile=" + nv.cr.KeyFile, - "-m", nv.mons, - "nfs", - "export", - "create", - "cephfs", - fs, - nfsCluster, - export, - path, - } -} - // DeleteExport removes the NFS-export from the Ceph managed NFS-server. func (nv *NFSVolume) DeleteExport() error { if !nv.connected { @@ -250,6 +230,26 @@ func (nv *NFSVolume) DeleteExport() error { return nil } +// createExportCommand returns the "ceph nfs export create ..." command +// arguments (without "ceph"). The order of the parameters matches old Ceph +// releases, new Ceph releases added --option formats, which can be added when +// passing the parameters to this function. +func (nv *NFSVolume) createExportCommand(nfsCluster, fs, export, path string) []string { + return []string{ + "--id", nv.cr.ID, + "--keyfile=" + nv.cr.KeyFile, + "-m", nv.mons, + "nfs", + "export", + "create", + "cephfs", + fs, + nfsCluster, + export, + path, + } +} + // deleteExportCommand returns the "ceph nfs export delete ..." command // arguments (without "ceph"). Old releases of Ceph expect "delete" as cmd, // newer releases use "rm". 
diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index cb784df85..3264f0673 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups package rbd import ( diff --git a/internal/rbd/group/util.go b/internal/rbd/group/util.go index b16319c5f..3af9986be 100644 --- a/internal/rbd/group/util.go +++ b/internal/rbd/group/util.go @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +//nolint:funcorder // reordering causes a lot of churn in this file package group import ( diff --git a/internal/rbd/manager.go b/internal/rbd/manager.go index 966564507..306f001ba 100644 --- a/internal/rbd/manager.go +++ b/internal/rbd/manager.go @@ -67,113 +67,6 @@ func (mgr *rbdManager) Destroy(ctx context.Context) { } } -// getCredentials sets up credentials and connects to the journal. -func (mgr *rbdManager) getCredentials() (*util.Credentials, error) { - if mgr.creds != nil { - return mgr.creds, nil - } - - creds, err := util.NewUserCredentials(mgr.secrets) - if err != nil { - return nil, fmt.Errorf("failed to get credentials: %w", err) - } - - mgr.creds = creds - - return creds, nil -} - -// getVolumeGroupNamePrefix returns the prefix for the volume group if set, or -// an empty string if none is configured. -func (mgr *rbdManager) getVolumeGroupNamePrefix() string { - return mgr.parameters["volumeGroupNamePrefix"] -} - -func (mgr *rbdManager) getVolumeGroupJournal(clusterID string) (journal.VolumeGroupJournal, error) { - if mgr.vgJournal != nil { - return mgr.vgJournal, nil - } - - creds, err := mgr.getCredentials() - if err != nil { - return nil, err - } - - monitors, err := util.Mons(util.CsiConfigFile, clusterID) - if err != nil { - return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err) - } - - ns, err := util.GetRBDRadosNamespace(util.CsiConfigFile, clusterID) - if err != nil { - return nil, fmt.Errorf("failed to find the RADOS namespace for cluster %q: %w", clusterID, err) - } - - vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.driverInstance, ns) - - vgJournal, err := vgJournalConfig.Connect(monitors, ns, creds) - if err != nil { - return nil, fmt.Errorf("failed to connect to journal: %w", err) - } - - mgr.vgJournal = vgJournal - - return vgJournal, nil -} - -// getGroupUUID checks if a UUID in the volume group journal is already -// reserved. If none is reserved, a new reservation is made. Upon exit of -// getGroupUUID, the function returns: -// 1. the UUID that was reserved -// 2. an undo() function that reverts the reservation (if that succeeded), should be called in a defer -// 3. an error or nil. 
-func (mgr *rbdManager) getGroupUUID( - ctx context.Context, - clusterID, journalPool, name string, -) (string, func(), error) { - nothingToUndo := func() { - // the reservation was not done, no need to undo the reservation - } - - prefix := mgr.getVolumeGroupNamePrefix() - - vgJournal, err := mgr.getVolumeGroupJournal(clusterID) - if err != nil { - return "", nothingToUndo, err - } - - vgsData, err := vgJournal.CheckReservation(ctx, journalPool, name, prefix) - if err != nil { - return "", nothingToUndo, fmt.Errorf("failed to check reservation for group %q: %w", name, err) - } - - var uuid string - if vgsData != nil && vgsData.GroupUUID != "" { - uuid = vgsData.GroupUUID - } else { - log.DebugLog(ctx, "the journal does not contain a reservation for group %q yet", name) - - uuid, _ /*vgsName*/, err = vgJournal.ReserveName(ctx, journalPool, name, uuid, prefix) - if err != nil { - return "", nothingToUndo, fmt.Errorf("failed to reserve a UUID for group %q: %w", name, err) - } - } - - log.DebugLog(ctx, "got UUID %q for group %q", uuid, name) - - // undo contains the cleanup that should be done by the caller when the - // reservation was made, and further actions fulfilling the final - // request failed - undo := func() { - err = vgJournal.UndoReservation(ctx, journalPool, uuid, name) - if err != nil { - log.ErrorLog(ctx, "failed to undo the reservation for group %q: %w", name, err) - } - } - - return uuid, undo, nil -} - func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volume, error) { creds, err := mgr.getCredentials() if err != nil { @@ -725,3 +618,110 @@ func (mgr *rbdManager) VolumesInSameGroup(ctx context.Context, volumes []types.V return true, nil } + +// getCredentials sets up credentials and connects to the journal. +func (mgr *rbdManager) getCredentials() (*util.Credentials, error) { + if mgr.creds != nil { + return mgr.creds, nil + } + + creds, err := util.NewUserCredentials(mgr.secrets) + if err != nil { + return nil, fmt.Errorf("failed to get credentials: %w", err) + } + + mgr.creds = creds + + return creds, nil +} + +// getVolumeGroupNamePrefix returns the prefix for the volume group if set, or +// an empty string if none is configured. +func (mgr *rbdManager) getVolumeGroupNamePrefix() string { + return mgr.parameters["volumeGroupNamePrefix"] +} + +func (mgr *rbdManager) getVolumeGroupJournal(clusterID string) (journal.VolumeGroupJournal, error) { + if mgr.vgJournal != nil { + return mgr.vgJournal, nil + } + + creds, err := mgr.getCredentials() + if err != nil { + return nil, err + } + + monitors, err := util.Mons(util.CsiConfigFile, clusterID) + if err != nil { + return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err) + } + + ns, err := util.GetRBDRadosNamespace(util.CsiConfigFile, clusterID) + if err != nil { + return nil, fmt.Errorf("failed to find the RADOS namespace for cluster %q: %w", clusterID, err) + } + + vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.driverInstance, ns) + + vgJournal, err := vgJournalConfig.Connect(monitors, ns, creds) + if err != nil { + return nil, fmt.Errorf("failed to connect to journal: %w", err) + } + + mgr.vgJournal = vgJournal + + return vgJournal, nil +} + +// getGroupUUID checks if a UUID in the volume group journal is already +// reserved. If none is reserved, a new reservation is made. Upon exit of +// getGroupUUID, the function returns: +// 1. the UUID that was reserved +// 2. 
an undo() function that reverts the reservation (if that succeeded), should be called in a defer +// 3. an error or nil. +func (mgr *rbdManager) getGroupUUID( + ctx context.Context, + clusterID, journalPool, name string, +) (string, func(), error) { + nothingToUndo := func() { + // the reservation was not done, no need to undo the reservation + } + + prefix := mgr.getVolumeGroupNamePrefix() + + vgJournal, err := mgr.getVolumeGroupJournal(clusterID) + if err != nil { + return "", nothingToUndo, err + } + + vgsData, err := vgJournal.CheckReservation(ctx, journalPool, name, prefix) + if err != nil { + return "", nothingToUndo, fmt.Errorf("failed to check reservation for group %q: %w", name, err) + } + + var uuid string + if vgsData != nil && vgsData.GroupUUID != "" { + uuid = vgsData.GroupUUID + } else { + log.DebugLog(ctx, "the journal does not contain a reservation for group %q yet", name) + + uuid, _ /*vgsName*/, err = vgJournal.ReserveName(ctx, journalPool, name, uuid, prefix) + if err != nil { + return "", nothingToUndo, fmt.Errorf("failed to reserve a UUID for group %q: %w", name, err) + } + } + + log.DebugLog(ctx, "got UUID %q for group %q", uuid, name) + + // undo contains the cleanup that should be done by the caller when the + // reservation was made, and further actions fulfilling the final + // request failed + undo := func() { + err = vgJournal.UndoReservation(ctx, journalPool, uuid, name) + if err != nil { + log.ErrorLog(ctx, "failed to undo the reservation for group %q: %w", name, err) + } + } + + return uuid, undo, nil +} diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index ca75c8187..1bc9f201a 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups package rbd import ( diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 13ac40c62..be4f71a56 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups package rbd import ( diff --git a/internal/util/conn_pool.go b/internal/util/conn_pool.go index 8b85017db..9206a01b0 100644 --- a/internal/util/conn_pool.go +++ b/internal/util/conn_pool.go @@ -59,23 +59,6 @@ func NewConnPool(interval, expiry time.Duration) *ConnPool { return &cp } -// loop through all cp.conns and destroy objects that have not been used for cp.expiry. -func (cp *ConnPool) gc() { - cp.lock.Lock() - defer cp.lock.Unlock() - - now := time.Now() - for key, ce := range cp.conns { - if ce.users == 0 && (now.Sub(ce.lastUsed)) > cp.expiry { - ce.destroy() - delete(cp.conns, key) - } - } - - // schedule the next gc() run - cp.timer.Reset(cp.interval) -} - // Destroy stops the garbage collector and destroys all connections in the pool. func (cp *ConnPool) Destroy() { cp.timer.Stop() @@ -94,30 +77,6 @@ func (cp *ConnPool) Destroy() { } } -func (cp *ConnPool) generateUniqueKey(monitors, user, keyfile string) (string, error) { - // the keyfile can be unique for operations, contents will be the same - key, err := os.ReadFile(keyfile) // #nosec:G304, file inclusion via variable. 
- if err != nil { - return "", fmt.Errorf("could not open keyfile %s: %w", keyfile, err) - } - - return fmt.Sprintf("%s|%s|%s", monitors, user, string(key)), nil -} - -// getExisting returns the existing rados.Conn associated with the unique key. -// -// Requires: locked cp.lock because of ce.get(). -func (cp *ConnPool) getConn(unique string) *rados.Conn { - ce, exists := cp.conns[unique] - if exists { - ce.get() - - return ce.conn - } - - return nil -} - // Get returns a rados.Conn for the given arguments. Creates a new rados.Conn in // case there is none. Use the returned rados.Conn to reduce the reference // count with ConnPool.Put(unique). @@ -206,6 +165,47 @@ func (cp *ConnPool) Put(conn *rados.Conn) { } } +// loop through all cp.conns and destroy objects that have not been used for cp.expiry. +func (cp *ConnPool) gc() { + cp.lock.Lock() + defer cp.lock.Unlock() + + now := time.Now() + for key, ce := range cp.conns { + if ce.users == 0 && (now.Sub(ce.lastUsed)) > cp.expiry { + ce.destroy() + delete(cp.conns, key) + } + } + + // schedule the next gc() run + cp.timer.Reset(cp.interval) +} + +func (cp *ConnPool) generateUniqueKey(monitors, user, keyfile string) (string, error) { + // the keyfile can be unique for operations, contents will be the same + key, err := os.ReadFile(keyfile) // #nosec:G304, file inclusion via variable. + if err != nil { + return "", fmt.Errorf("could not open keyfile %s: %w", keyfile, err) + } + + return fmt.Sprintf("%s|%s|%s", monitors, user, string(key)), nil +} + +// getExisting returns the existing rados.Conn associated with the unique key. +// +// Requires: locked cp.lock because of ce.get(). +func (cp *ConnPool) getConn(unique string) *rados.Conn { + ce, exists := cp.conns[unique] + if exists { + ce.get() + + return ce.conn + } + + return nil +} + // Add a reference to the connEntry. // /!\ Only call this while holding the ConnPool.lock. func (ce *connEntry) get() { diff --git a/internal/util/credentials.go b/internal/util/credentials.go index 091efe3e0..f152ab145 100644 --- a/internal/util/credentials.go +++ b/internal/util/credentials.go @@ -43,6 +43,48 @@ type Credentials struct { KeyFile string } +// NewUserCredentials creates new user credentials from secret. +func NewUserCredentials(secrets map[string]string) (*Credentials, error) { + return newCredentialsFromSecret(credUserID, credUserKey, secrets) +} + +// NewAdminCredentials creates new admin credentials from secret. +func NewAdminCredentials(secrets map[string]string) (*Credentials, error) { + // Use userID and userKey if found else fallback to adminID and adminKey + if cred, err := newCredentialsFromSecret(credUserID, credUserKey, secrets); err == nil { + return cred, nil + } + log.WarningLogMsg("adminID and adminKey are deprecated, please use userID and userKey instead") + + return newCredentialsFromSecret(credAdminID, credAdminKey, secrets) +} + +// NewUserCredentialsWithMigration takes secret map from the request and validate it is +// a migration secret, if yes, it continues to create CR from it after parsing the migration +// secret. If it is not a migration it will continue the attempt to create credentials from it +// without parsing the secret. This function returns credentials and error. 
+func NewUserCredentialsWithMigration(secrets map[string]string) (*Credentials, error) { + if isMigrationSecret(secrets) { + migSecret, err := ParseAndSetSecretMapFromMigSecret(secrets) + if err != nil { + return nil, err + } + secrets = migSecret + } + cr, cErr := NewUserCredentials(secrets) + if cErr != nil { + return nil, cErr + } + + return cr, nil +} + +// DeleteCredentials removes the KeyFile. +func (cr *Credentials) DeleteCredentials() { + // don't complain about unhandled error + _ = os.Remove(cr.KeyFile) +} + func storeKey(key string) (string, error) { tmpfile, err := os.CreateTemp(tmpKeyFileLocation, tmpKeyFileNamePrefix) if err != nil { @@ -99,28 +141,6 @@ func newCredentialsFromSecret(idField, keyField string, secrets map[string]strin return c, err } -// DeleteCredentials removes the KeyFile. -func (cr *Credentials) DeleteCredentials() { - // don't complain about unhandled error - _ = os.Remove(cr.KeyFile) -} - -// NewUserCredentials creates new user credentials from secret. -func NewUserCredentials(secrets map[string]string) (*Credentials, error) { - return newCredentialsFromSecret(credUserID, credUserKey, secrets) -} - -// NewAdminCredentials creates new admin credentials from secret. -func NewAdminCredentials(secrets map[string]string) (*Credentials, error) { - // Use userID and userKey if found else fallback to adminID and adminKey - if cred, err := newCredentialsFromSecret(credUserID, credUserKey, secrets); err == nil { - return cred, nil - } - log.WarningLogMsg("adminID and adminKey are deprecated, please use userID and userKey instead") - - return newCredentialsFromSecret(credAdminID, credAdminKey, secrets) -} - // GetMonValFromSecret returns monitors from secret. func GetMonValFromSecret(secrets map[string]string) (string, error) { if mons, ok := secrets[credMonitors]; ok { @@ -160,23 +180,3 @@ func isMigrationSecret(secrets map[string]string) bool { // was hit on migration request compared to general one. return len(secrets) != 0 && secrets[migUserKey] != "" } - -// NewUserCredentialsWithMigration takes secret map from the request and validate it is -// a migration secret, if yes, it continues to create CR from it after parsing the migration -// secret. If it is not a migration it will continue the attempt to create credentials from it -// without parsing the secret. This function returns credentials and error. -func NewUserCredentialsWithMigration(secrets map[string]string) (*Credentials, error) { - if isMigrationSecret(secrets) { - migSecret, err := ParseAndSetSecretMapFromMigSecret(secrets) - if err != nil { - return nil, err - } - secrets = migSecret - } - cr, cErr := NewUserCredentials(secrets) - if cErr != nil { - return nil, cErr - } - - return cr, nil -} diff --git a/internal/util/idlocker.go b/internal/util/idlocker.go index 211081a13..7742c576b 100644 --- a/internal/util/idlocker.go +++ b/internal/util/idlocker.go @@ -108,6 +108,59 @@ func NewOperationLock() *OperationLock { } } +// GetSnapshotCreateLock gets the snapshot lock on given volumeID. +func (ol *OperationLock) GetSnapshotCreateLock(volumeID string) error { + return ol.tryAcquire(createOp, volumeID) +} + +// GetCloneLock gets the clone lock on given volumeID. +func (ol *OperationLock) GetCloneLock(volumeID string) error { + return ol.tryAcquire(cloneOpt, volumeID) +} + +// GetDeleteLock gets the delete lock on given volumeID,ensures that there is +// no clone,restore and expand operation on given volumeID. 
+func (ol *OperationLock) GetDeleteLock(volumeID string) error { + return ol.tryAcquire(deleteOp, volumeID) +} + +// GetRestoreLock gets the restore lock on given volumeID,ensures that there is +// no delete operation on given volumeID. +func (ol *OperationLock) GetRestoreLock(volumeID string) error { + return ol.tryAcquire(restoreOp, volumeID) +} + +// GetExpandLock gets the expand lock on given volumeID,ensures that there is +// no delete and clone operation on given volumeID. +func (ol *OperationLock) GetExpandLock(volumeID string) error { + return ol.tryAcquire(expandOp, volumeID) +} + +// ReleaseSnapshotCreateLock releases the create lock on given volumeID. +func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) { + ol.release(createOp, volumeID) +} + +// ReleaseCloneLock releases the clone lock on given volumeID. +func (ol *OperationLock) ReleaseCloneLock(volumeID string) { + ol.release(cloneOpt, volumeID) +} + +// ReleaseDeleteLock releases the delete lock on given volumeID. +func (ol *OperationLock) ReleaseDeleteLock(volumeID string) { + ol.release(deleteOp, volumeID) +} + +// ReleaseRestoreLock releases the restore lock on given volumeID. +func (ol *OperationLock) ReleaseRestoreLock(volumeID string) { + ol.release(restoreOp, volumeID) +} + +// ReleaseExpandLock releases the expand lock on given volumeID. +func (ol *OperationLock) ReleaseExpandLock(volumeID string) { + ol.release(expandOp, volumeID) +} + // tryAcquire tries to acquire the lock for operating on volumeID and returns true if successful. // If another operation is already using volumeID, returns false. func (ol *OperationLock) tryAcquire(op operation, volumeID string) error { @@ -178,59 +231,6 @@ func (ol *OperationLock) tryAcquire(op operation, volumeID string) error { return nil } -// GetSnapshotCreateLock gets the snapshot lock on given volumeID. -func (ol *OperationLock) GetSnapshotCreateLock(volumeID string) error { - return ol.tryAcquire(createOp, volumeID) -} - -// GetCloneLock gets the clone lock on given volumeID. -func (ol *OperationLock) GetCloneLock(volumeID string) error { - return ol.tryAcquire(cloneOpt, volumeID) -} - -// GetDeleteLock gets the delete lock on given volumeID,ensures that there is -// no clone,restore and expand operation on given volumeID. -func (ol *OperationLock) GetDeleteLock(volumeID string) error { - return ol.tryAcquire(deleteOp, volumeID) -} - -// GetRestoreLock gets the restore lock on given volumeID,ensures that there is -// no delete operation on given volumeID. -func (ol *OperationLock) GetRestoreLock(volumeID string) error { - return ol.tryAcquire(restoreOp, volumeID) -} - -// GetExpandLock gets the expand lock on given volumeID,ensures that there is -// no delete and clone operation on given volumeID. -func (ol *OperationLock) GetExpandLock(volumeID string) error { - return ol.tryAcquire(expandOp, volumeID) -} - -// ReleaseSnapshotCreateLock releases the create lock on given volumeID. -func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) { - ol.release(createOp, volumeID) -} - -// ReleaseCloneLock releases the clone lock on given volumeID. -func (ol *OperationLock) ReleaseCloneLock(volumeID string) { - ol.release(cloneOpt, volumeID) -} - -// ReleaseDeleteLock releases the delete lock on given volumeID. -func (ol *OperationLock) ReleaseDeleteLock(volumeID string) { - ol.release(deleteOp, volumeID) -} - -// ReleaseRestoreLock releases the restore lock on given volumeID. 
-func (ol *OperationLock) ReleaseRestoreLock(volumeID string) { - ol.release(restoreOp, volumeID) -} - -// ReleaseExpandLock releases the expand lock on given volumeID. -func (ol *OperationLock) ReleaseExpandLock(volumeID string) { - ol.release(expandOp, volumeID) -} - // release deletes the lock on volumeID. func (ol *OperationLock) release(op operation, volumeID string) { ol.mux.Lock() diff --git a/internal/util/reftracker/radoswrapper/fakerados.go b/internal/util/reftracker/radoswrapper/fakerados.go index c7ff59862..ef0def281 100644 --- a/internal/util/reftracker/radoswrapper/fakerados.go +++ b/internal/util/reftracker/radoswrapper/fakerados.go @@ -134,15 +134,6 @@ func (c *FakeIOContext) GetLastVersion() (uint64, error) { return c.LastObjVersion, nil } -func (c *FakeIOContext) getObj(oid string) (*FakeObj, error) { - obj, ok := c.Rados.Objs[oid] - if !ok { - return nil, rados.ErrNotFound - } - - return obj, nil -} - func (c *FakeIOContext) GetXattr(oid, key string, data []byte) (int, error) { obj, ok := c.Rados.Objs[oid] if !ok { @@ -200,6 +191,15 @@ func (c *FakeIOContext) CreateReadOp() ReadOpW { } } +func (c *FakeIOContext) getObj(oid string) (*FakeObj, error) { + obj, ok := c.Rados.Objs[oid] + if !ok { + return nil, rados.ErrNotFound + } + + return obj, nil +} + func (r *FakeReadOp) Operate(oid string) error { r.oid = oid
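
Purely as a closing illustration, not part of the patch: the hunks above consistently move unexported methods below the exported methods of the same receiver. A small, self-contained Go sketch of that layout, with names invented solely for illustration, looks like this:

package main

import "fmt"

type exportManager struct {
	prefix string
}

// CreateExport is exported, so it sits before any unexported method
// of the same receiver.
func (m *exportManager) CreateExport(name string) string {
	return m.qualify(name)
}

// DeleteExport is also exported and therefore stays in the upper block.
func (m *exportManager) DeleteExport(name string) string {
	return "removed " + m.qualify(name)
}

// qualify is unexported and therefore comes after all exported methods.
func (m *exportManager) qualify(name string) string {
	return m.prefix + "/" + name
}

func main() {
	m := &exportManager{prefix: "exports"}
	fmt.Println(m.CreateExport("vol-1"))
	fmt.Println(m.DeleteExport("vol-1"))
}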