cleanup: address golangci 'funcorder' linter problems

The new 'funcorder' linter expects all public functions of a struct to
be placed before its private functions. Many private functions needed
to be moved further down in their files.

Some files had many issues reported. To reduce the churn in those files,
they have been annotated with a `//nolint:funcorder` comment.
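
As an illustration, a minimal sketch of the ordering the linter
enforces (the 'widget' type and its methods are hypothetical, not part
of this change):

package example

type widget struct{ name string }

// NewWidget constructs a widget; constructors stay near the top of the file.
func NewWidget(name string) *widget { return &widget{name: name} }

// Name is public (exported), so the linter wants it before any private method.
func (w *widget) Name() string { return w.name }

// isValid is private (unexported), so it belongs below all public methods.
func (w *widget) isValid() bool { return w.name != "" }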

Signed-off-by: Niels de Vos <ndevos@ibm.com>
Niels de Vos 2025-04-29 11:32:43 +02:00 committed by mergify[bot]
parent 0907f39d95
commit 0a22e3a186
29 changed files with 921 additions and 914 deletions

View File

@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups
package cephfs
import (

View File

@ -130,22 +130,6 @@ func (fq *fsQuiesce) GetVolumes() []Volume {
return fq.volumes
}
// getMembers returns the list of names in the format
// group/subvolume that are to be quiesced. This is the format that the
// ceph fs quiesce expects.
// Example: ["group1/subvolume1", "group1/subvolume2", "group2/subvolume1"].
func (fq *fsQuiesce) getMembers() []string {
volName := []string{}
for svg, sb := range fq.subVolumeGroupMapping {
for _, s := range sb {
name := svg + "/" + s
volName = append(volName, name)
}
}
return volName
}
func (fq *fsQuiesce) FSQuiesce(
ctx context.Context,
reserveName string,
@ -248,3 +232,19 @@ func (fq *fsQuiesce) ReleaseFSQuiesce(ctx context.Context,
return nil, err
}
// getMembers returns the list of names in the format
// group/subvolume that are to be quiesced. This is the format that the
// ceph fs quiesce expects.
// Example: ["group1/subvolume1", "group1/subvolume2", "group2/subvolume1"].
func (fq *fsQuiesce) getMembers() []string {
volName := []string{}
for svg, sb := range fq.subVolumeGroupMapping {
for _, s := range sb {
name := svg + "/" + s
volName = append(volName, name)
}
}
return volName
}

View File

@ -58,6 +58,21 @@ func NewKernelMounter() KernelMounter {
}
}
func (m *kernelMounter) Mount(
ctx context.Context,
mountPoint string,
cr *util.Credentials,
volOptions *store.VolumeOptions,
) error {
if err := util.CreateMountPoint(mountPoint); err != nil {
return err
}
return m.mountKernel(ctx, mountPoint, cr, volOptions)
}
func (m *kernelMounter) Name() string { return "Ceph kernel client" }
func (m *kernelMounter) mountKernel(
ctx context.Context,
mountPoint string,
@ -103,21 +118,6 @@ func (m *kernelMounter) mountKernel(
return err
}
func (m *kernelMounter) Mount(
ctx context.Context,
mountPoint string,
cr *util.Credentials,
volOptions *store.VolumeOptions,
) error {
if err := util.CreateMountPoint(mountPoint); err != nil {
return err
}
return m.mountKernel(ctx, mountPoint, cr, volOptions)
}
func (m *kernelMounter) Name() string { return "Ceph kernel client" }
// filesystemSupported checks if the passed name of the filesystem is included
// in /proc/filesystems.
func filesystemSupported(fs string) bool {

View File

@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups
package cephfs
import (

View File

@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
//nolint:funcorder // reordering causes a lot of churn in this file
package store
import (

View File

@ -60,6 +60,103 @@ func (r *ReconcilePersistentVolume) Add(mgr manager.Manager, config ctrl.Config)
return add(mgr, newPVReconciler(mgr, config))
}
// Reconcile reconciles the PersistentVolume object and creates a new omap entries
// for the volume.
func (r *ReconcilePersistentVolume) Reconcile(ctx context.Context,
request reconcile.Request,
) (reconcile.Result, error) {
pv := &corev1.PersistentVolume{}
err := r.client.Get(ctx, request.NamespacedName, pv)
if err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
// Check if the object is under deletion
if !pv.GetDeletionTimestamp().IsZero() {
return reconcile.Result{}, nil
}
err = r.reconcilePV(ctx, pv)
if err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
// reconcilePV will extract the image details from the pv spec and regenerates
// the omap data.
func (r *ReconcilePersistentVolume) reconcilePV(ctx context.Context, obj runtime.Object) error {
pv, ok := obj.(*corev1.PersistentVolume)
if !ok {
return nil
}
if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.config.DriverName {
return nil
}
// PV is not attached to any PVC
if pv.Spec.ClaimRef == nil {
return nil
}
pvcNamespace := pv.Spec.ClaimRef.Namespace
requestName := pv.Name
volumeHandler := pv.Spec.CSI.VolumeHandle
secretName := ""
secretNamespace := ""
// check static volume
static := checkStaticVolume(pv)
// if the volume is static, dont generate OMAP data
if static {
return nil
}
if pv.Spec.CSI.ControllerExpandSecretRef != nil {
secretName = pv.Spec.CSI.ControllerExpandSecretRef.Name
secretNamespace = pv.Spec.CSI.ControllerExpandSecretRef.Namespace
} else if pv.Spec.CSI.NodeStageSecretRef != nil {
secretName = pv.Spec.CSI.NodeStageSecretRef.Name
secretNamespace = pv.Spec.CSI.NodeStageSecretRef.Namespace
}
// Take lock to process only one volumeHandle at a time.
if ok := r.Locks.TryAcquire(pv.Spec.CSI.VolumeHandle); !ok {
return fmt.Errorf(util.VolumeOperationAlreadyExistsFmt, pv.Spec.CSI.VolumeHandle)
}
defer r.Locks.Release(pv.Spec.CSI.VolumeHandle)
cr, err := r.getCredentials(ctx, secretName, secretNamespace)
if err != nil {
log.ErrorLogMsg("failed to get credentials from secret %s", err)
return err
}
defer cr.DeleteCredentials()
rbdVolID, err := rbd.RegenerateJournal(
pv.Spec.CSI.VolumeAttributes,
pv.Spec.ClaimRef.Name,
volumeHandler,
requestName,
pvcNamespace,
r.config.ClusterName,
r.config.InstanceID,
r.config.SetMetadata,
cr)
if err != nil {
log.ErrorLogMsg("failed to regenerate journal %s", err)
return err
}
if rbdVolID != volumeHandler {
log.DebugLog(ctx, "volumeHandler changed from %s to %s", volumeHandler, rbdVolID)
}
return nil
}
// newReconciler returns a ReconcilePersistentVolume.
func newPVReconciler(mgr manager.Manager, config ctrl.Config) reconcile.Reconciler {
r := &ReconcilePersistentVolume{
@ -133,100 +230,3 @@ func (r *ReconcilePersistentVolume) getCredentials(
func checkStaticVolume(pv *corev1.PersistentVolume) bool {
return pv.Spec.CSI.VolumeAttributes["staticVolume"] == "true"
}
// reconcilePV will extract the image details from the pv spec and regenerates
// the omap data.
func (r *ReconcilePersistentVolume) reconcilePV(ctx context.Context, obj runtime.Object) error {
pv, ok := obj.(*corev1.PersistentVolume)
if !ok {
return nil
}
if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.config.DriverName {
return nil
}
// PV is not attached to any PVC
if pv.Spec.ClaimRef == nil {
return nil
}
pvcNamespace := pv.Spec.ClaimRef.Namespace
requestName := pv.Name
volumeHandler := pv.Spec.CSI.VolumeHandle
secretName := ""
secretNamespace := ""
// check static volume
static := checkStaticVolume(pv)
// if the volume is static, dont generate OMAP data
if static {
return nil
}
if pv.Spec.CSI.ControllerExpandSecretRef != nil {
secretName = pv.Spec.CSI.ControllerExpandSecretRef.Name
secretNamespace = pv.Spec.CSI.ControllerExpandSecretRef.Namespace
} else if pv.Spec.CSI.NodeStageSecretRef != nil {
secretName = pv.Spec.CSI.NodeStageSecretRef.Name
secretNamespace = pv.Spec.CSI.NodeStageSecretRef.Namespace
}
// Take lock to process only one volumeHandle at a time.
if ok := r.Locks.TryAcquire(pv.Spec.CSI.VolumeHandle); !ok {
return fmt.Errorf(util.VolumeOperationAlreadyExistsFmt, pv.Spec.CSI.VolumeHandle)
}
defer r.Locks.Release(pv.Spec.CSI.VolumeHandle)
cr, err := r.getCredentials(ctx, secretName, secretNamespace)
if err != nil {
log.ErrorLogMsg("failed to get credentials from secret %s", err)
return err
}
defer cr.DeleteCredentials()
rbdVolID, err := rbd.RegenerateJournal(
pv.Spec.CSI.VolumeAttributes,
pv.Spec.ClaimRef.Name,
volumeHandler,
requestName,
pvcNamespace,
r.config.ClusterName,
r.config.InstanceID,
r.config.SetMetadata,
cr)
if err != nil {
log.ErrorLogMsg("failed to regenerate journal %s", err)
return err
}
if rbdVolID != volumeHandler {
log.DebugLog(ctx, "volumeHandler changed from %s to %s", volumeHandler, rbdVolID)
}
return nil
}
// Reconcile reconciles the PersistentVolume object and creates a new omap entries
// for the volume.
func (r *ReconcilePersistentVolume) Reconcile(ctx context.Context,
request reconcile.Request,
) (reconcile.Result, error) {
pv := &corev1.PersistentVolume{}
err := r.client.Get(ctx, request.NamespacedName, pv)
if err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
// Check if the object is under deletion
if !pv.GetDeletionTimestamp().IsZero() {
return reconcile.Result{}, nil
}
err = r.reconcilePV(ctx, pv)
if err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

View File

@ -150,6 +150,29 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return nil
}
// Reconcile reconciles the VolumeGroupReplicationContent object and creates a new omap entries
// for the volume group.
func (r *ReconcileVGRContent) Reconcile(ctx context.Context,
request reconcile.Request,
) (reconcile.Result, error) {
vgrc := &replicationv1alpha1.VolumeGroupReplicationContent{}
err := r.client.Get(ctx, request.NamespacedName, vgrc)
if err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
// Proceed with reconciliation only if the object is not marked for deletion.
if vgrc.GetDeletionTimestamp().IsZero() {
err = r.reconcileVGRContent(ctx, vgrc)
}
return reconcile.Result{}, err
}
func (r *ReconcileVGRContent) getSecrets(
ctx context.Context,
name,
@ -222,26 +245,3 @@ func (r *ReconcileVGRContent) reconcileVGRContent(ctx context.Context, obj runti
return nil
}
// Reconcile reconciles the VolumeGroupReplicationContent object and creates a new omap entries
// for the volume group.
func (r *ReconcileVGRContent) Reconcile(ctx context.Context,
request reconcile.Request,
) (reconcile.Result, error) {
vgrc := &replicationv1alpha1.VolumeGroupReplicationContent{}
err := r.client.Get(ctx, request.NamespacedName, vgrc)
if err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
// Proceed with reconciliation only if the object is not marked for deletion.
if vgrc.GetDeletionTimestamp().IsZero() {
err = r.reconcileVGRContent(ctx, vgrc)
}
return reconcile.Result{}, err
}

View File

@ -89,30 +89,123 @@ func NewNetworkFence(
return nwFence, nil
}
// addCephBlocklist adds an IP to ceph osd blocklist.
func (nf *NetworkFence) addCephBlocklist(ctx context.Context, ip string, useRange bool) error {
arg := []string{
"--id", nf.cr.ID,
"--keyfile=" + nf.cr.KeyFile,
"-m", nf.Monitors,
}
// TODO: add blocklist till infinity.
// Currently, ceph does not provide the functionality to blocklist IPs
// for infinite time. As a workaround, add a blocklist for 5 YEARS to
// represent infinity from ceph-csi side.
// At any point in this time, the IPs can be unblocked by an UnfenceClusterReq.
// This needs to be updated once ceph provides functionality for the same.
cmd := []string{"osd", "blocklist"}
if useRange {
cmd = append(cmd, "range")
}
cmd = append(cmd, "add", ip, blocklistTime)
cmd = append(cmd, arg...)
_, stdErr, err := util.ExecCommand(ctx, "ceph", cmd...)
if err != nil {
return fmt.Errorf("failed to blocklist IP %q: %w stderr: %q", ip, err, stdErr)
}
log.DebugLog(ctx, "blocklisted IP %q successfully", ip)
return nil
}
// AddClientEviction blocks access for all the IPs in the CIDR block
// using client eviction, it also blocks the entire CIDR.
func (nf *NetworkFence) AddClientEviction(ctx context.Context) error {
evictedIPs := make(map[string]bool)
// fetch active clients
activeClients, err := nf.listActiveClients(ctx)
if err != nil {
return err
}
// iterate through CIDR blocks and check if any active client matches
for _, cidr := range nf.Cidr {
for _, client := range activeClients {
var clientIP string
clientIP, err = client.fetchIP()
if err != nil {
return fmt.Errorf("error fetching client IP: %w", err)
}
// check if the clientIP is in the CIDR block
if isIPInCIDR(ctx, clientIP, cidr) {
var clientID int
clientID, err = client.fetchID()
if err != nil {
return fmt.Errorf("error fetching client ID: %w", err)
}
// evict the client
err = nf.evictCephFSClient(ctx, clientID)
if err != nil {
return fmt.Errorf("error evicting client %d: %w", clientID, err)
}
log.DebugLog(ctx, "client %d has been evicted\n", clientID)
// add the CIDR to the list of blocklisted IPs
evictedIPs[clientIP] = true
}
}
}
// add the range based blocklist for CIDR
err = nf.AddNetworkFence(ctx)
if err != nil {
return err
}
return nil
}
// RemoveNetworkFence unblocks access for all the IPs in the IP range mentioned via the CIDR block
// using a network fence.
// Unfencing one of the protocols(CephFS or RBD) suggests the node is expected to be recovered, so
// both CephFS and RBD are expected to work again too.
// example:
// Create RBD NetworkFence CR for one IP 10.10.10.10
// Created CephFS NetworkFence CR for IP range but above IP comes in the Range
// Delete the CephFS Network Fence CR to unblocklist the IP
// So now the IP (10.10.10.10) is (un)blocklisted and can be used by both protocols.
func (nf *NetworkFence) RemoveNetworkFence(ctx context.Context) error {
hasBlocklistRangeSupport := true
// for each CIDR block, convert it into a range of IPs so as to undo blocklisting operation.
for _, cidr := range nf.Cidr {
// try range blocklist cmd, if invalid fallback to
// iterating through IP range.
if hasBlocklistRangeSupport {
err := nf.removeCephBlocklist(ctx, cidr, "", true)
if err == nil {
continue
}
if !strings.Contains(err.Error(), invalidCommandStr) {
return fmt.Errorf("failed to remove blocklist range %q: %w", cidr, err)
}
hasBlocklistRangeSupport = false
}
// fetch the list of IPs from a CIDR block
hosts, err := getIPRange(cidr)
if err != nil {
return fmt.Errorf("failed to convert CIDR block %s to corresponding IP range", cidr)
}
// remove ceph blocklist for each IP in the range mentioned by the CIDR
for _, host := range hosts {
// 0 is used as nonce here to tell ceph
// to remove the blocklist entry matching: <host>:0/0
// it is same as telling ceph to remove just the IP
// without specifying any port or nonce with it.
err := nf.removeCephBlocklist(ctx, host, "0", false)
if err != nil {
return err
}
}
}
return nil
}
func (nf *NetworkFence) RemoveClientEviction(ctx context.Context) error {
// Remove the CIDR block first
err := nf.RemoveNetworkFence(ctx)
if err != nil {
return err
}
// Get the ceph blocklist
blocklist, err := nf.getCephBlocklist(ctx)
if err != nil {
return err
}
// For each CIDR block, remove the IPs in the blocklist
// that fall under the CIDR with nonce
for _, cidr := range nf.Cidr {
hosts := nf.parseBlocklistForCIDR(ctx, blocklist, cidr)
log.DebugLog(ctx, "parsed blocklist for CIDR %s: %+v", cidr, hosts)
for _, host := range hosts {
err := nf.removeCephBlocklist(ctx, host.IP, host.Nonce, false)
if err != nil {
return err
}
}
}
return nil
}
@ -153,6 +246,34 @@ func (nf *NetworkFence) AddNetworkFence(ctx context.Context) error {
return nil
}
// addCephBlocklist adds an IP to ceph osd blocklist.
func (nf *NetworkFence) addCephBlocklist(ctx context.Context, ip string, useRange bool) error {
arg := []string{
"--id", nf.cr.ID,
"--keyfile=" + nf.cr.KeyFile,
"-m", nf.Monitors,
}
// TODO: add blocklist till infinity.
// Currently, ceph does not provide the functionality to blocklist IPs
// for infinite time. As a workaround, add a blocklist for 5 YEARS to
// represent infinity from ceph-csi side.
// At any point in this time, the IPs can be unblocked by an UnfenceClusterReq.
// This needs to be updated once ceph provides functionality for the same.
cmd := []string{"osd", "blocklist"}
if useRange {
cmd = append(cmd, "range")
}
cmd = append(cmd, "add", ip, blocklistTime)
cmd = append(cmd, arg...)
_, stdErr, err := util.ExecCommand(ctx, "ceph", cmd...)
if err != nil {
return fmt.Errorf("failed to blocklist IP %q: %w stderr: %q", ip, err, stdErr)
}
log.DebugLog(ctx, "blocklisted IP %q successfully", ip)
return nil
}
func (nf *NetworkFence) listActiveClients(ctx context.Context) ([]activeClient, error) {
arg := []string{
"--id", nf.cr.ID,
@ -238,51 +359,6 @@ func (ac *activeClient) fetchID() (int, error) {
return 0, fmt.Errorf("failed to extract client ID, incorrect format: %s", clientInfo)
}
// AddClientEviction blocks access for all the IPs in the CIDR block
// using client eviction, it also blocks the entire CIDR.
func (nf *NetworkFence) AddClientEviction(ctx context.Context) error {
evictedIPs := make(map[string]bool)
// fetch active clients
activeClients, err := nf.listActiveClients(ctx)
if err != nil {
return err
}
// iterate through CIDR blocks and check if any active client matches
for _, cidr := range nf.Cidr {
for _, client := range activeClients {
var clientIP string
clientIP, err = client.fetchIP()
if err != nil {
return fmt.Errorf("error fetching client IP: %w", err)
}
// check if the clientIP is in the CIDR block
if isIPInCIDR(ctx, clientIP, cidr) {
var clientID int
clientID, err = client.fetchID()
if err != nil {
return fmt.Errorf("error fetching client ID: %w", err)
}
// evict the client
err = nf.evictCephFSClient(ctx, clientID)
if err != nil {
return fmt.Errorf("error evicting client %d: %w", clientID, err)
}
log.DebugLog(ctx, "client %d has been evicted\n", clientID)
// add the CIDR to the list of blocklisted IPs
evictedIPs[clientIP] = true
}
}
}
// add the range based blocklist for CIDR
err = nf.AddNetworkFence(ctx)
if err != nil {
return err
}
return nil
}
// getIPRange returns a list of IPs from the IP range
// corresponding to a CIDR block.
func getIPRange(cidr string) ([]string, error) {
@ -357,82 +433,6 @@ func (nf *NetworkFence) removeCephBlocklist(ctx context.Context, ip, nonce strin
return nil
}
// RemoveNetworkFence unblocks access for all the IPs in the IP range mentioned via the CIDR block
// using a network fence.
// Unfencing one of the protocols(CephFS or RBD) suggests the node is expected to be recovered, so
// both CephFS and RBD are expected to work again too.
// example:
// Create RBD NetworkFence CR for one IP 10.10.10.10
// Created CephFS NetworkFence CR for IP range but above IP comes in the Range
// Delete the CephFS Network Fence CR to unblocklist the IP
// So now the IP (10.10.10.10) is (un)blocklisted and can be used by both protocols.
func (nf *NetworkFence) RemoveNetworkFence(ctx context.Context) error {
hasBlocklistRangeSupport := true
// for each CIDR block, convert it into a range of IPs so as to undo blocklisting operation.
for _, cidr := range nf.Cidr {
// try range blocklist cmd, if invalid fallback to
// iterating through IP range.
if hasBlocklistRangeSupport {
err := nf.removeCephBlocklist(ctx, cidr, "", true)
if err == nil {
continue
}
if !strings.Contains(err.Error(), invalidCommandStr) {
return fmt.Errorf("failed to remove blocklist range %q: %w", cidr, err)
}
hasBlocklistRangeSupport = false
}
// fetch the list of IPs from a CIDR block
hosts, err := getIPRange(cidr)
if err != nil {
return fmt.Errorf("failed to convert CIDR block %s to corresponding IP range", cidr)
}
// remove ceph blocklist for each IP in the range mentioned by the CIDR
for _, host := range hosts {
// 0 is used as nonce here to tell ceph
// to remove the blocklist entry matching: <host>:0/0
// it is same as telling ceph to remove just the IP
// without specifying any port or nonce with it.
err := nf.removeCephBlocklist(ctx, host, "0", false)
if err != nil {
return err
}
}
}
return nil
}
func (nf *NetworkFence) RemoveClientEviction(ctx context.Context) error {
// Remove the CIDR block first
err := nf.RemoveNetworkFence(ctx)
if err != nil {
return err
}
// Get the ceph blocklist
blocklist, err := nf.getCephBlocklist(ctx)
if err != nil {
return err
}
// For each CIDR block, remove the IPs in the blocklist
// that fall under the CIDR with nonce
for _, cidr := range nf.Cidr {
hosts := nf.parseBlocklistForCIDR(ctx, blocklist, cidr)
log.DebugLog(ctx, "parsed blocklist for CIDR %s: %+v", cidr, hosts)
for _, host := range hosts {
err := nf.removeCephBlocklist(ctx, host.IP, host.Nonce, false)
if err != nil {
return err
}
}
}
return nil
}
// getCephBlocklist fetches the ceph blocklist and returns it as a string.
func (nf *NetworkFence) getCephBlocklist(ctx context.Context) (string, error) {
arg := []string{

View File

@ -108,6 +108,15 @@ func (cas *CSIAddonsServer) Start(middlewareConfig csicommon.MiddlewareServerOpt
return nil
}
// Stop can be used to stop the internal gRPC server.
func (cas *CSIAddonsServer) Stop() {
if cas.server == nil {
return
}
cas.server.GracefulStop()
}
// serve starts the actual process of listening for requests on the gRPC
// server. This is a blocking call, so it should get executed in a go-routine.
func (cas *CSIAddonsServer) serve(listener net.Listener) {
@ -121,12 +130,3 @@ func (cas *CSIAddonsServer) serve(listener net.Listener) {
log.DefaultLog("the CSI-Addons server at %q has been stopped", listener.Addr())
}
// Stop can be used to stop the internal gRPC server.
func (cas *CSIAddonsServer) Stop() {
if cas.server == nil {
return
}
cas.server.GracefulStop()
}

View File

@ -99,6 +99,46 @@ func (hcm *healthCheckManager) StartChecker(volumeID, path string, ct CheckerTyp
return hcm.createChecker(volumeID, path, ct, false)
}
func (hcm *healthCheckManager) StopSharedChecker(volumeID string) {
hcm.StopChecker(volumeID, "")
}
func (hcm *healthCheckManager) StopChecker(volumeID, path string) {
old, ok := hcm.checkers.LoadAndDelete(fallbackKey(volumeID, path))
if !ok {
// nothing was loaded, nothing to do
return
}
// 'old' was loaded, cast it to ConditionChecker
cc, ok := old.(ConditionChecker)
if !ok {
// failed to cast, should not be possible
return
}
cc.stop()
}
func (hcm *healthCheckManager) IsHealthy(volumeID, path string) (bool, error) {
// load the 'old' ConditionChecker if it exists
old, ok := hcm.checkers.Load(volumeID)
if !ok {
// try fallback which include an optional (unique) path (usually publishTargetPath)
old, ok = hcm.checkers.Load(fallbackKey(volumeID, path))
if !ok {
return true, fmt.Errorf("no ConditionChecker for volume-id: %s", volumeID)
}
}
// 'old' was loaded, cast it to ConditionChecker
cc, ok := old.(ConditionChecker)
if !ok {
return true, fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID)
}
return cc.isHealthy()
}
// createChecker decides based on the CheckerType what checker to start for
// the volume.
func (hcm *healthCheckManager) createChecker(volumeID, path string, ct CheckerType, shared bool) error {
@ -158,46 +198,6 @@ func (hcm *healthCheckManager) startChecker(cc ConditionChecker, volumeID, path
return nil
}
func (hcm *healthCheckManager) StopSharedChecker(volumeID string) {
hcm.StopChecker(volumeID, "")
}
func (hcm *healthCheckManager) StopChecker(volumeID, path string) {
old, ok := hcm.checkers.LoadAndDelete(fallbackKey(volumeID, path))
if !ok {
// nothing was loaded, nothing to do
return
}
// 'old' was loaded, cast it to ConditionChecker
cc, ok := old.(ConditionChecker)
if !ok {
// failed to cast, should not be possible
return
}
cc.stop()
}
func (hcm *healthCheckManager) IsHealthy(volumeID, path string) (bool, error) {
// load the 'old' ConditionChecker if it exists
old, ok := hcm.checkers.Load(volumeID)
if !ok {
// try fallback which include an optional (unique) path (usually publishTargetPath)
old, ok = hcm.checkers.Load(fallbackKey(volumeID, path))
if !ok {
return true, fmt.Errorf("no ConditionChecker for volume-id: %s", volumeID)
}
}
// 'old' was loaded, cast it to ConditionChecker
cc, ok := old.(ConditionChecker)
if !ok {
return true, fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID)
}
return cc.isHealthy()
}
// fallbackKey returns the key for a checker in the map. If the path is empty,
// it is assumed that the key'd checked is shared.
func fallbackKey(volumeID, path string) string {

View File

@ -109,11 +109,6 @@ func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournalConfig {
}
}
// SetNamespace sets the namespace for the journal.
func (vgc *VolumeGroupJournalConfig) SetNamespace(ns string) {
vgc.Config.namespace = ns
}
// NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for
// volume groups using a predetermined namespace value.
func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournalConfig {
@ -123,6 +118,11 @@ func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournal
return j
}
// SetNamespace sets the namespace for the journal.
func (vgc *VolumeGroupJournalConfig) SetNamespace(ns string) {
vgc.Config.namespace = ns
}
// Connect establishes a new connection to a ceph cluster for journal metadata.
func (vgc *VolumeGroupJournalConfig) Connect(
monitors,

View File

@ -124,35 +124,6 @@ func initAWSMetadataKMS(args ProviderInitArgs) (EncryptionKMS, error) {
return kms, nil
}
func (kms *awsMetadataKMS) getSecrets() (map[string]interface{}, error) {
c, err := k8s.NewK8sClient()
if err != nil {
return nil, fmt.Errorf("failed to connect to Kubernetes to "+
"get Secret %s/%s: %w", kms.namespace, kms.secretName, err)
}
secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(),
kms.secretName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get Secret %s/%s: %w",
kms.namespace, kms.secretName, err)
}
config := make(map[string]interface{})
for k, v := range secret.Data {
switch k {
case awsSecretAccessKey, awsAccessKey, awsSessionToken, awsCMK:
config[k] = string(v)
default:
return nil, fmt.Errorf("unsupported option for KMS "+
"provider %q: %s", kmsTypeAWSMetadata, k)
}
}
return config, nil
}
func (kms *awsMetadataKMS) Destroy() {
// Nothing to do.
}
@ -164,24 +135,6 @@ func (kms *awsMetadataKMS) RequiresDEKStore() DEKStoreType {
return DEKStoreMetadata
}
func (kms *awsMetadataKMS) getService() (*awsKMS.KMS, error) {
creds := awsCreds.NewStaticCredentials(kms.accessKey,
kms.secretAccessKey, kms.sessionToken)
sess, err := awsSession.NewSessionWithOptions(awsSession.Options{
SharedConfigState: awsSession.SharedConfigDisable,
Config: aws.Config{
Credentials: creds,
Region: aws.String(kms.region),
},
})
if err != nil {
return nil, fmt.Errorf("failed to create AWS session: %w", err)
}
return awsKMS.New(sess), nil
}
// EncryptDEK uses the Amazon KMS and the configured CMK to encrypt the DEK.
func (kms *awsMetadataKMS) EncryptDEK(ctx context.Context, volumeID, plainDEK string) (string, error) {
svc, err := kms.getService()
@ -230,3 +183,50 @@ func (kms *awsMetadataKMS) DecryptDEK(ctx context.Context, volumeID, encryptedDE
func (kms *awsMetadataKMS) GetSecret(ctx context.Context, volumeID string) (string, error) {
return "", ErrGetSecretUnsupported
}
func (kms *awsMetadataKMS) getSecrets() (map[string]interface{}, error) {
c, err := k8s.NewK8sClient()
if err != nil {
return nil, fmt.Errorf("failed to connect to Kubernetes to "+
"get Secret %s/%s: %w", kms.namespace, kms.secretName, err)
}
secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(),
kms.secretName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get Secret %s/%s: %w",
kms.namespace, kms.secretName, err)
}
config := make(map[string]interface{})
for k, v := range secret.Data {
switch k {
case awsSecretAccessKey, awsAccessKey, awsSessionToken, awsCMK:
config[k] = string(v)
default:
return nil, fmt.Errorf("unsupported option for KMS "+
"provider %q: %s", kmsTypeAWSMetadata, k)
}
}
return config, nil
}
func (kms *awsMetadataKMS) getService() (*awsKMS.KMS, error) {
creds := awsCreds.NewStaticCredentials(kms.accessKey,
kms.secretAccessKey, kms.sessionToken)
sess, err := awsSession.NewSessionWithOptions(awsSession.Options{
SharedConfigState: awsSession.SharedConfigDisable,
Config: aws.Config{
Credentials: creds,
Region: aws.String(kms.region),
},
})
if err != nil {
return nil, fmt.Errorf("failed to create AWS session: %w", err)
}
return awsKMS.New(sess), nil
}

View File

@ -115,6 +115,49 @@ func initAWSSTSMetadataKMS(args ProviderInitArgs) (EncryptionKMS, error) {
return kms, nil
}
// EncryptDEK uses the Amazon KMS and the configured CMK to encrypt the DEK.
func (as *awsSTSMetadataKMS) EncryptDEK(ctx context.Context, _, plainDEK string) (string, error) {
svc, err := as.getServiceWithSTS()
if err != nil {
return "", fmt.Errorf("failed to get KMS service: %w", err)
}
result, err := svc.Encrypt(&awsKMS.EncryptInput{
KeyId: aws.String(as.cmk),
Plaintext: []byte(plainDEK),
})
if err != nil {
return "", fmt.Errorf("failed to encrypt DEK: %w", err)
}
// base64 encode the encrypted DEK, so that storing it should not have
// issues
return base64.StdEncoding.EncodeToString(result.CiphertextBlob), nil
}
// DecryptDEK uses the Amazon KMS and the configured CMK to decrypt the DEK.
func (as *awsSTSMetadataKMS) DecryptDEK(ctx context.Context, _, encryptedDEK string) (string, error) {
svc, err := as.getServiceWithSTS()
if err != nil {
return "", fmt.Errorf("failed to get KMS service: %w", err)
}
ciphertextBlob, err := base64.StdEncoding.DecodeString(encryptedDEK)
if err != nil {
return "", fmt.Errorf("failed to decode base64 cipher: %w",
err)
}
result, err := svc.Decrypt(&awsKMS.DecryptInput{
CiphertextBlob: ciphertextBlob,
})
if err != nil {
return "", fmt.Errorf("failed to decrypt DEK: %w", err)
}
return string(result.Plaintext), nil
}
// getSecrets returns required STS configuration options from the Kubernetes Secret.
func (as *awsSTSMetadataKMS) getSecrets() (map[string]string, error) {
c, err := k8s.NewK8sClient()
@ -191,46 +234,3 @@ func (as *awsSTSMetadataKMS) getServiceWithSTS() (*awsKMS.KMS, error) {
return awsKMS.New(sess), nil
}
// EncryptDEK uses the Amazon KMS and the configured CMK to encrypt the DEK.
func (as *awsSTSMetadataKMS) EncryptDEK(ctx context.Context, _, plainDEK string) (string, error) {
svc, err := as.getServiceWithSTS()
if err != nil {
return "", fmt.Errorf("failed to get KMS service: %w", err)
}
result, err := svc.Encrypt(&awsKMS.EncryptInput{
KeyId: aws.String(as.cmk),
Plaintext: []byte(plainDEK),
})
if err != nil {
return "", fmt.Errorf("failed to encrypt DEK: %w", err)
}
// base64 encode the encrypted DEK, so that storing it should not have
// issues
return base64.StdEncoding.EncodeToString(result.CiphertextBlob), nil
}
// DecryptDEK uses the Amazon KMS and the configured CMK to decrypt the DEK.
func (as *awsSTSMetadataKMS) DecryptDEK(ctx context.Context, _, encryptedDEK string) (string, error) {
svc, err := as.getServiceWithSTS()
if err != nil {
return "", fmt.Errorf("failed to get KMS service: %w", err)
}
ciphertextBlob, err := base64.StdEncoding.DecodeString(encryptedDEK)
if err != nil {
return "", fmt.Errorf("failed to decode base64 cipher: %w",
err)
}
result, err := svc.Decrypt(&awsKMS.DecryptInput{
CiphertextBlob: ciphertextBlob,
})
if err != nil {
return "", fmt.Errorf("failed to decrypt DEK: %w", err)
}
return string(result.Plaintext), nil
}

View File

@ -117,50 +117,6 @@ func (kms *azureKMS) Destroy() {
// Nothing to do.
}
func (kms *azureKMS) getService() (*azsecrets.Client, error) {
certs, key, err := azidentity.ParseCertificates([]byte(kms.clientCertificate), []byte{})
if err != nil {
return nil, fmt.Errorf("failed to parse Azure client certificate: %w", err)
}
creds, err := azidentity.NewClientCertificateCredential(kms.tenantID, kms.clientID, certs, key, nil)
if err != nil {
return nil, fmt.Errorf("failed to create Azure credentials: %w", err)
}
azClient, err := azsecrets.NewClient(kms.vaultURL, creds, nil)
if err != nil {
return nil, fmt.Errorf("failed to create Azure client: %w", err)
}
return azClient, nil
}
func (kms *azureKMS) getSecrets() (map[string]interface{}, error) {
c, err := k8s.NewK8sClient()
if err != nil {
return nil, fmt.Errorf("failed to connect to kubernetes to "+
"get secret %s/%s: %w", kms.namespace, kms.secretName, err)
}
secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(),
kms.secretName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get secret %s/%s: %w", kms.namespace, kms.secretName, err)
}
config := make(map[string]interface{})
for k, v := range secret.Data {
switch k {
case azureClientCertificate:
config[k] = string(v)
default:
return nil, fmt.Errorf("unsupported option for KMS provider %q: %s", kmsTypeAzure, k)
}
}
return config, nil
}
// FetchDEK returns passphrase from Azure key vault.
func (kms *azureKMS) FetchDEK(ctx context.Context, key string) (string, error) {
svc, err := kms.getService()
@ -208,3 +164,47 @@ func (kms *azureKMS) RemoveDEK(ctx context.Context, key string) error {
return nil
}
func (kms *azureKMS) getService() (*azsecrets.Client, error) {
certs, key, err := azidentity.ParseCertificates([]byte(kms.clientCertificate), []byte{})
if err != nil {
return nil, fmt.Errorf("failed to parse Azure client certificate: %w", err)
}
creds, err := azidentity.NewClientCertificateCredential(kms.tenantID, kms.clientID, certs, key, nil)
if err != nil {
return nil, fmt.Errorf("failed to create Azure credentials: %w", err)
}
azClient, err := azsecrets.NewClient(kms.vaultURL, creds, nil)
if err != nil {
return nil, fmt.Errorf("failed to create Azure client: %w", err)
}
return azClient, nil
}
func (kms *azureKMS) getSecrets() (map[string]interface{}, error) {
c, err := k8s.NewK8sClient()
if err != nil {
return nil, fmt.Errorf("failed to connect to kubernetes to "+
"get secret %s/%s: %w", kms.namespace, kms.secretName, err)
}
secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(),
kms.secretName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get secret %s/%s: %w", kms.namespace, kms.secretName, err)
}
config := make(map[string]interface{})
for k, v := range secret.Data {
switch k {
case azureClientCertificate:
config[k] = string(v)
default:
return nil, fmt.Errorf("unsupported option for KMS provider %q: %s", kmsTypeAzure, k)
}
}
return config, nil
}

View File

@ -147,35 +147,6 @@ func initKeyProtectKMS(args ProviderInitArgs) (EncryptionKMS, error) {
return kms, nil
}
func (kms *keyProtectKMS) getSecrets() (map[string]interface{}, error) {
c, err := k8s.NewK8sClient()
if err != nil {
return nil, fmt.Errorf("failed to connect to Kubernetes to "+
"get Secret %s/%s: %w", kms.namespace, kms.secretName, err)
}
secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(),
kms.secretName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get Secret %s/%s: %w",
kms.namespace, kms.secretName, err)
}
config := make(map[string]interface{})
for k, v := range secret.Data {
switch k {
case keyProtectServiceAPIKey, KeyProtectCustomerRootKey, keyProtectSessionToken, keyProtectCRK:
config[k] = string(v)
default:
return nil, fmt.Errorf("unsupported option for KMS "+
"provider %q: %s", kmsTypeKeyProtectMetadata, k)
}
}
return config, nil
}
func (kms *keyProtectKMS) Destroy() {
// Nothing to do.
}
@ -184,25 +155,6 @@ func (kms *keyProtectKMS) RequiresDEKStore() DEKStoreType {
return DEKStoreMetadata
}
func (kms *keyProtectKMS) getService() error {
// Use your Service API Key and your KeyProtect Service Instance ID to create a ClientConfig
cc := kp.ClientConfig{
BaseURL: kms.baseURL,
TokenURL: kms.tokenURL,
APIKey: kms.serviceAPIKey,
InstanceID: kms.serviceInstanceID,
}
// Build a new client from the config
client, err := kp.New(cc, kp.DefaultTransport())
if err != nil {
return fmt.Errorf("failed to create keyprotect client: %w", err)
}
kms.client = client
return nil
}
// EncryptDEK uses the KeyProtect KMS and the configured CRK to encrypt the DEK.
func (kms *keyProtectKMS) EncryptDEK(ctx context.Context, volumeID, plainDEK string) (string, error) {
if err := kms.getService(); err != nil {
@ -246,3 +198,51 @@ func (kms *keyProtectKMS) DecryptDEK(ctx context.Context, volumeID, encryptedDEK
func (kms *keyProtectKMS) GetSecret(ctx context.Context, volumeID string) (string, error) {
return "", ErrGetSecretUnsupported
}
func (kms *keyProtectKMS) getSecrets() (map[string]interface{}, error) {
c, err := k8s.NewK8sClient()
if err != nil {
return nil, fmt.Errorf("failed to connect to Kubernetes to "+
"get Secret %s/%s: %w", kms.namespace, kms.secretName, err)
}
secret, err := c.CoreV1().Secrets(kms.namespace).Get(context.TODO(),
kms.secretName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get Secret %s/%s: %w",
kms.namespace, kms.secretName, err)
}
config := make(map[string]interface{})
for k, v := range secret.Data {
switch k {
case keyProtectServiceAPIKey, KeyProtectCustomerRootKey, keyProtectSessionToken, keyProtectCRK:
config[k] = string(v)
default:
return nil, fmt.Errorf("unsupported option for KMS "+
"provider %q: %s", kmsTypeKeyProtectMetadata, k)
}
}
return config, nil
}
func (kms *keyProtectKMS) getService() error {
// Use your Service API Key and your KeyProtect Service Instance ID to create a ClientConfig
cc := kp.ClientConfig{
BaseURL: kms.baseURL,
TokenURL: kms.tokenURL,
APIKey: kms.serviceAPIKey,
InstanceID: kms.serviceInstanceID,
}
// Build a new client from the config
client, err := kp.New(cc, kp.DefaultTransport())
if err != nil {
return fmt.Errorf("failed to create keyprotect client: %w", err)
}
kms.client = client
return nil
}

View File

@ -293,6 +293,10 @@ func (kms *kmipKMS) RequiresDEKStore() DEKStoreType {
return DEKStoreMetadata
}
func (kms *kmipKMS) GetSecret(ctx context.Context, volumeID string) (string, error) {
return "", ErrGetSecretUnsupported
}
// getSecrets returns required options from the Kubernetes Secret.
func (kms *kmipKMS) getSecrets() (map[string]string, error) {
c, err := k8s.NewK8sClient()
@ -500,10 +504,6 @@ func (kms *kmipKMS) verifyResponse(
return &batchItem, nil
}
func (kms *kmipKMS) GetSecret(ctx context.Context, volumeID string) (string, error) {
return "", ErrGetSecretUnsupported
}
// TODO: use the following structs from https://github.com/gemalto/kmip-go
// when https://github.com/ThalesGroup/kmip-go/issues/21 is resolved.
// refer: https://docs.oasis-open.org/kmip/spec/v1.4/kmip-spec-v1.4.html.

View File

@ -116,53 +116,6 @@ func initSecretsMetadataKMS(args ProviderInitArgs) (EncryptionKMS, error) {
return smKMS, nil
}
// fetchEncryptionPassphrase fetches encryptionPassphrase from user provided secret.
func (kms secretsMetadataKMS) fetchEncryptionPassphrase(
config map[string]interface{},
defaultNamespace string,
) (string, error) {
var (
secretName string
secretNamespace string
)
err := setConfigString(&secretName, config, metadataSecretNameKey)
if err != nil {
return "", err
}
err = setConfigString(&secretNamespace, config, metadataSecretNamespaceKey)
if err != nil {
if !errors.Is(err, errConfigOptionMissing) {
return "", err
}
// if 'secretNamespace' option is not specified, defaults to namespace in
// which PVC was created
secretNamespace = defaultNamespace
}
c, err := k8s.NewK8sClient()
if err != nil {
return "", fmt.Errorf("can not get Secret %s/%s, failed to "+
"connect to Kubernetes: %w", secretNamespace, secretName, err)
}
secret, err := c.CoreV1().Secrets(secretNamespace).Get(context.TODO(),
secretName, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("failed to get Secret %s/%s: %w",
secretNamespace, secretName, err)
}
passphraseValue, ok := secret.Data[encryptionPassphraseKey]
if !ok {
return "", fmt.Errorf("missing %q in Secret %s/%s",
encryptionPassphraseKey, secretNamespace, secretName)
}
return string(passphraseValue), nil
}
// Destroy frees all used resources.
func (kms secretsMetadataKMS) Destroy() {
// nothing to do
@ -287,6 +240,53 @@ func (kms secretsMetadataKMS) GetSecret(ctx context.Context, volumeID string) (s
return kms.FetchDEK(ctx, volumeID)
}
// fetchEncryptionPassphrase fetches encryptionPassphrase from user provided secret.
func (kms secretsMetadataKMS) fetchEncryptionPassphrase(
config map[string]interface{},
defaultNamespace string,
) (string, error) {
var (
secretName string
secretNamespace string
)
err := setConfigString(&secretName, config, metadataSecretNameKey)
if err != nil {
return "", err
}
err = setConfigString(&secretNamespace, config, metadataSecretNamespaceKey)
if err != nil {
if !errors.Is(err, errConfigOptionMissing) {
return "", err
}
// if 'secretNamespace' option is not specified, defaults to namespace in
// which PVC was created
secretNamespace = defaultNamespace
}
c, err := k8s.NewK8sClient()
if err != nil {
return "", fmt.Errorf("can not get Secret %s/%s, failed to "+
"connect to Kubernetes: %w", secretNamespace, secretName, err)
}
secret, err := c.CoreV1().Secrets(secretNamespace).Get(context.TODO(),
secretName, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("failed to get Secret %s/%s: %w",
secretNamespace, secretName, err)
}
passphraseValue, ok := secret.Data[encryptionPassphraseKey]
if !ok {
return "", fmt.Errorf("missing %q in Secret %s/%s",
encryptionPassphraseKey, secretNamespace, secretName)
}
return string(passphraseValue), nil
}
// generateCipher returns a AEAD cipher based on a passphrase and salt
// (volumeID). The cipher can then be used to encrypt/decrypt the DEK.
func generateCipher(passphrase, salt string) (cipher.AEAD, error) {

View File

@ -122,6 +122,19 @@ func setConfigString(option *string, config map[string]interface{}, key string)
return nil
}
// Destroy frees allocated resources. For a vaultConnection that means removing
// the created temporary files.
func (vc *vaultConnection) Destroy() {
if vc.vaultConfig != nil {
tmpFile, ok := vc.vaultConfig[api.EnvVaultCACert]
if ok {
// ignore error on failure to remove tmpfile (gosec complains)
//nolint:forcetypeassert,errcheck // ignore error on failure to remove tmpfile
_ = os.Remove(tmpFile.(string))
}
}
}
// initConnection sets VAULT_* environment variables in the vc.vaultConfig map,
// these settings will be used when connecting to the Vault service with
// vc.connectVault().
@ -298,19 +311,6 @@ func (vc *vaultConnection) connectVault() error {
return nil
}
// Destroy frees allocated resources. For a vaultConnection that means removing
// the created temporary files.
func (vc *vaultConnection) Destroy() {
if vc.vaultConfig != nil {
tmpFile, ok := vc.vaultConfig[api.EnvVaultCACert]
if ok {
// ignore error on failure to remove tmpfile (gosec complains)
//nolint:forcetypeassert,errcheck // ignore error on failure to remove tmpfile
_ = os.Remove(tmpFile.(string))
}
}
}
// getDeleteKeyContext creates a new KeyContext that has an optional value set
// to destroy the contents of secrets. This is configurable with the
// `vaultDestroyKeys` configuration parameter.

View File

@ -281,6 +281,54 @@ func initVaultTokensKMS(args ProviderInitArgs) (EncryptionKMS, error) {
return kms, nil
}
// FetchDEK returns passphrase from Vault. The passphrase is stored in a
// data.data.passphrase structure.
func (vtc *vaultTenantConnection) FetchDEK(ctx context.Context, key string) (string, error) {
// Since the second return variable loss.Version is not used, there it is ignored.
s, _, err := vtc.secrets.GetSecret(key, vtc.keyContext)
if err != nil {
return "", err
}
data, ok := s["data"].(map[string]interface{})
if !ok {
return "", fmt.Errorf("failed parsing data for get passphrase request for %s", key)
}
passphrase, ok := data["passphrase"].(string)
if !ok {
return "", fmt.Errorf("failed parsing passphrase for get passphrase request for %s", key)
}
return passphrase, nil
}
// StoreDEK saves new passphrase in Vault.
func (vtc *vaultTenantConnection) StoreDEK(ctx context.Context, key, value string) error {
data := map[string]interface{}{
"data": map[string]string{
"passphrase": value,
},
}
// Since the first return variable loss.Version is not used, there it is ignored.
_, err := vtc.secrets.PutSecret(key, data, vtc.keyContext)
if err != nil {
return fmt.Errorf("saving passphrase at %s request to vault failed: %w", key, err)
}
return nil
}
// RemoveDEK deletes passphrase from Vault.
func (vtc *vaultTenantConnection) RemoveDEK(ctx context.Context, key string) error {
err := vtc.secrets.DeleteSecret(key, vtc.getDeleteKeyContext())
if err != nil {
return fmt.Errorf("delete passphrase at %s request to vault failed: %w", key, err)
}
return nil
}
func (kms *vaultTokensKMS) configureTenant(config map[string]interface{}, tenant string) error {
kms.Tenant = tenant
tenantConfig, found := fetchTenantConfig(config, tenant)
@ -461,54 +509,6 @@ func (vtc *vaultTenantConnection) getK8sClient() (*kubernetes.Clientset, error)
return vtc.client, nil
}
// FetchDEK returns passphrase from Vault. The passphrase is stored in a
// data.data.passphrase structure.
func (vtc *vaultTenantConnection) FetchDEK(ctx context.Context, key string) (string, error) {
// Since the second return variable loss.Version is not used, there it is ignored.
s, _, err := vtc.secrets.GetSecret(key, vtc.keyContext)
if err != nil {
return "", err
}
data, ok := s["data"].(map[string]interface{})
if !ok {
return "", fmt.Errorf("failed parsing data for get passphrase request for %s", key)
}
passphrase, ok := data["passphrase"].(string)
if !ok {
return "", fmt.Errorf("failed parsing passphrase for get passphrase request for %s", key)
}
return passphrase, nil
}
// StoreDEK saves new passphrase in Vault.
func (vtc *vaultTenantConnection) StoreDEK(ctx context.Context, key, value string) error {
data := map[string]interface{}{
"data": map[string]string{
"passphrase": value,
},
}
// Since the first return variable loss.Version is not used, there it is ignored.
_, err := vtc.secrets.PutSecret(key, data, vtc.keyContext)
if err != nil {
return fmt.Errorf("saving passphrase at %s request to vault failed: %w", key, err)
}
return nil
}
// RemoveDEK deletes passphrase from Vault.
func (vtc *vaultTenantConnection) RemoveDEK(ctx context.Context, key string) error {
err := vtc.secrets.DeleteSecret(key, vtc.getDeleteKeyContext())
if err != nil {
return fmt.Errorf("delete passphrase at %s request to vault failed: %w", key, err)
}
return nil
}
func (kms *vaultTokensKMS) getToken() (string, error) {
c, err := kms.getK8sClient()
if err != nil {

View File

@ -187,26 +187,6 @@ func (nv *NFSVolume) CreateExport(backend *csi.Volume) error {
return nil
}
// createExportCommand returns the "ceph nfs export create ..." command
// arguments (without "ceph"). The order of the parameters matches old Ceph
// releases, new Ceph releases added --option formats, which can be added when
// passing the parameters to this function.
func (nv *NFSVolume) createExportCommand(nfsCluster, fs, export, path string) []string {
return []string{
"--id", nv.cr.ID,
"--keyfile=" + nv.cr.KeyFile,
"-m", nv.mons,
"nfs",
"export",
"create",
"cephfs",
fs,
nfsCluster,
export,
path,
}
}
// DeleteExport removes the NFS-export from the Ceph managed NFS-server.
func (nv *NFSVolume) DeleteExport() error {
if !nv.connected {
@ -250,6 +230,26 @@ func (nv *NFSVolume) DeleteExport() error {
return nil
}
// createExportCommand returns the "ceph nfs export create ..." command
// arguments (without "ceph"). The order of the parameters matches old Ceph
// releases, new Ceph releases added --option formats, which can be added when
// passing the parameters to this function.
func (nv *NFSVolume) createExportCommand(nfsCluster, fs, export, path string) []string {
return []string{
"--id", nv.cr.ID,
"--keyfile=" + nv.cr.KeyFile,
"-m", nv.mons,
"nfs",
"export",
"create",
"cephfs",
fs,
nfsCluster,
export,
path,
}
}
// deleteExportCommand returns the "ceph nfs export delete ..." command
// arguments (without "ceph"). Old releases of Ceph expect "delete" as cmd,
// newer releases use "rm".

View File

@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups
package rbd
import (

View File

@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
//nolint:funcorder // reordering causes a lot of churn in this file
package group
import (

View File

@ -67,113 +67,6 @@ func (mgr *rbdManager) Destroy(ctx context.Context) {
}
}
// getCredentials sets up credentials and connects to the journal.
func (mgr *rbdManager) getCredentials() (*util.Credentials, error) {
if mgr.creds != nil {
return mgr.creds, nil
}
creds, err := util.NewUserCredentials(mgr.secrets)
if err != nil {
return nil, fmt.Errorf("failed to get credentials: %w", err)
}
mgr.creds = creds
return creds, nil
}
// getVolumeGroupNamePrefix returns the prefix for the volume group if set, or
// an empty string if none is configured.
func (mgr *rbdManager) getVolumeGroupNamePrefix() string {
return mgr.parameters["volumeGroupNamePrefix"]
}
func (mgr *rbdManager) getVolumeGroupJournal(clusterID string) (journal.VolumeGroupJournal, error) {
if mgr.vgJournal != nil {
return mgr.vgJournal, nil
}
creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}
monitors, err := util.Mons(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err)
}
ns, err := util.GetRBDRadosNamespace(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find the RADOS namespace for cluster %q: %w", clusterID, err)
}
vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.driverInstance, ns)
vgJournal, err := vgJournalConfig.Connect(monitors, ns, creds)
if err != nil {
return nil, fmt.Errorf("failed to connect to journal: %w", err)
}
mgr.vgJournal = vgJournal
return vgJournal, nil
}
// getGroupUUID checks if a UUID in the volume group journal is already
// reserved. If none is reserved, a new reservation is made. Upon exit of
// getGroupUUID, the function returns:
// 1. the UUID that was reserved
// 2. an undo() function that reverts the reservation (if that succeeded), should be called in a defer
// 3. an error or nil.
func (mgr *rbdManager) getGroupUUID(
ctx context.Context,
clusterID, journalPool, name string,
) (string, func(), error) {
nothingToUndo := func() {
// the reservation was not done, no need to undo the reservation
}
prefix := mgr.getVolumeGroupNamePrefix()
vgJournal, err := mgr.getVolumeGroupJournal(clusterID)
if err != nil {
return "", nothingToUndo, err
}
vgsData, err := vgJournal.CheckReservation(ctx, journalPool, name, prefix)
if err != nil {
return "", nothingToUndo, fmt.Errorf("failed to check reservation for group %q: %w", name, err)
}
var uuid string
if vgsData != nil && vgsData.GroupUUID != "" {
uuid = vgsData.GroupUUID
} else {
log.DebugLog(ctx, "the journal does not contain a reservation for group %q yet", name)
uuid, _ /*vgsName*/, err = vgJournal.ReserveName(ctx, journalPool, name, uuid, prefix)
if err != nil {
return "", nothingToUndo, fmt.Errorf("failed to reserve a UUID for group %q: %w", name, err)
}
}
log.DebugLog(ctx, "got UUID %q for group %q", uuid, name)
// undo contains the cleanup that should be done by the caller when the
// reservation was made, and further actions fulfilling the final
// request failed
undo := func() {
err = vgJournal.UndoReservation(ctx, journalPool, uuid, name)
if err != nil {
log.ErrorLog(ctx, "failed to undo the reservation for group %q: %w", name, err)
}
}
return uuid, undo, nil
}
func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volume, error) {
creds, err := mgr.getCredentials()
if err != nil {
@ -725,3 +618,110 @@ func (mgr *rbdManager) VolumesInSameGroup(ctx context.Context, volumes []types.V
return true, nil
}
// getCredentials sets up credentials and connects to the journal.
func (mgr *rbdManager) getCredentials() (*util.Credentials, error) {
if mgr.creds != nil {
return mgr.creds, nil
}
creds, err := util.NewUserCredentials(mgr.secrets)
if err != nil {
return nil, fmt.Errorf("failed to get credentials: %w", err)
}
mgr.creds = creds
return creds, nil
}
// getVolumeGroupNamePrefix returns the prefix for the volume group if set, or
// an empty string if none is configured.
func (mgr *rbdManager) getVolumeGroupNamePrefix() string {
return mgr.parameters["volumeGroupNamePrefix"]
}
func (mgr *rbdManager) getVolumeGroupJournal(clusterID string) (journal.VolumeGroupJournal, error) {
if mgr.vgJournal != nil {
return mgr.vgJournal, nil
}
creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}
monitors, err := util.Mons(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err)
}
ns, err := util.GetRBDRadosNamespace(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find the RADOS namespace for cluster %q: %w", clusterID, err)
}
vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.driverInstance, ns)
vgJournal, err := vgJournalConfig.Connect(monitors, ns, creds)
if err != nil {
return nil, fmt.Errorf("failed to connect to journal: %w", err)
}
mgr.vgJournal = vgJournal
return vgJournal, nil
}
// getGroupUUID checks if a UUID in the volume group journal is already
// reserved. If none is reserved, a new reservation is made. Upon exit of
// getGroupUUID, the function returns:
// 1. the UUID that was reserved
// 2. an undo() function that reverts the reservation (if that succeeded), should be called in a defer
// 3. an error or nil.
func (mgr *rbdManager) getGroupUUID(
ctx context.Context,
clusterID, journalPool, name string,
) (string, func(), error) {
nothingToUndo := func() {
// the reservation was not done, no need to undo the reservation
}
prefix := mgr.getVolumeGroupNamePrefix()
vgJournal, err := mgr.getVolumeGroupJournal(clusterID)
if err != nil {
return "", nothingToUndo, err
}
vgsData, err := vgJournal.CheckReservation(ctx, journalPool, name, prefix)
if err != nil {
return "", nothingToUndo, fmt.Errorf("failed to check reservation for group %q: %w", name, err)
}
var uuid string
if vgsData != nil && vgsData.GroupUUID != "" {
uuid = vgsData.GroupUUID
} else {
log.DebugLog(ctx, "the journal does not contain a reservation for group %q yet", name)
uuid, _ /*vgsName*/, err = vgJournal.ReserveName(ctx, journalPool, name, uuid, prefix)
if err != nil {
return "", nothingToUndo, fmt.Errorf("failed to reserve a UUID for group %q: %w", name, err)
}
}
log.DebugLog(ctx, "got UUID %q for group %q", uuid, name)
// undo contains the cleanup that should be done by the caller when the
// reservation was made, and further actions fulfilling the final
// request failed
undo := func() {
err = vgJournal.UndoReservation(ctx, journalPool, uuid, name)
if err != nil {
log.ErrorLog(ctx, "failed to undo the reservation for group %q: %w", name, err)
}
}
return uuid, undo, nil
}
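As the getGroupUUID comment notes, the returned undo() callback is meant to run in a defer so the journal reservation is reverted when later steps fail. A minimal sketch of a hypothetical caller in this package (the createGroupExample name and the omitted group-creation step are placeholders, not code from this commit):

func (mgr *rbdManager) createGroupExample(ctx context.Context, clusterID, journalPool, name string) error {
	uuid, undo, err := mgr.getGroupUUID(ctx, clusterID, journalPool, name)
	if err != nil {
		return err
	}

	finished := false
	defer func() {
		// revert the reservation only if the remaining work did not complete
		if !finished {
			undo()
		}
	}()

	// ... create the RBD group using the reserved uuid ...
	_ = uuid

	finished = true

	return nil
}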

View File

@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups
package rbd
import (

View File

@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
//nolint:funcorder // reordering causes a lot of churn in this file, needs cleanups
package rbd
import (

View File

@ -59,23 +59,6 @@ func NewConnPool(interval, expiry time.Duration) *ConnPool {
return &cp
}
// loop through all cp.conns and destroy objects that have not been used for cp.expiry.
func (cp *ConnPool) gc() {
cp.lock.Lock()
defer cp.lock.Unlock()
now := time.Now()
for key, ce := range cp.conns {
if ce.users == 0 && (now.Sub(ce.lastUsed)) > cp.expiry {
ce.destroy()
delete(cp.conns, key)
}
}
// schedule the next gc() run
cp.timer.Reset(cp.interval)
}
// Destroy stops the garbage collector and destroys all connections in the pool.
func (cp *ConnPool) Destroy() {
cp.timer.Stop()
@ -94,30 +77,6 @@ func (cp *ConnPool) Destroy() {
}
}
func (cp *ConnPool) generateUniqueKey(monitors, user, keyfile string) (string, error) {
// the keyfile can be unique for operations, contents will be the same
key, err := os.ReadFile(keyfile) // #nosec:G304, file inclusion via variable.
if err != nil {
return "", fmt.Errorf("could not open keyfile %s: %w", keyfile, err)
}
return fmt.Sprintf("%s|%s|%s", monitors, user, string(key)), nil
}
// getExisting returns the existing rados.Conn associated with the unique key.
//
// Requires: locked cp.lock because of ce.get().
func (cp *ConnPool) getConn(unique string) *rados.Conn {
ce, exists := cp.conns[unique]
if exists {
ce.get()
return ce.conn
}
return nil
}
// Get returns a rados.Conn for the given arguments. Creates a new rados.Conn in
// case there is none. Use the returned rados.Conn to reduce the reference
// count with ConnPool.Put(unique).
@ -206,6 +165,47 @@ func (cp *ConnPool) Put(conn *rados.Conn) {
}
}
// loop through all cp.conns and destroy objects that have not been used for cp.expiry.
func (cp *ConnPool) gc() {
cp.lock.Lock()
defer cp.lock.Unlock()
now := time.Now()
for key, ce := range cp.conns {
if ce.users == 0 && (now.Sub(ce.lastUsed)) > cp.expiry {
ce.destroy()
delete(cp.conns, key)
}
}
// schedule the next gc() run
cp.timer.Reset(cp.interval)
}
func (cp *ConnPool) generateUniqueKey(monitors, user, keyfile string) (string, error) {
// the keyfile can be unique for operations, contents will be the same
key, err := os.ReadFile(keyfile) // #nosec:G304, file inclusion via variable.
if err != nil {
return "", fmt.Errorf("could not open keyfile %s: %w", keyfile, err)
}
return fmt.Sprintf("%s|%s|%s", monitors, user, string(key)), nil
}
// getExisting returns the existing rados.Conn associated with the unique key.
//
// Requires: locked cp.lock because of ce.get().
func (cp *ConnPool) getConn(unique string) *rados.Conn {
ce, exists := cp.conns[unique]
if exists {
ce.get()
return ce.conn
}
return nil
}
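For context, gc() only destroys a connection once its reference count has dropped to zero, so callers pair Get with Put as the Get comment above describes. A rough sketch, assuming it lives in the same util package (Get's full signature is not shown in this hunk; the three-argument form mirrors generateUniqueKey, and the interval/expiry values and useCephConn name are illustrative only):

func useCephConn(monitors, user, keyfile string) error {
	pool := NewConnPool(15*time.Minute, 10*time.Minute)
	defer pool.Destroy()

	conn, err := pool.Get(monitors, user, keyfile)
	if err != nil {
		return err
	}
	// dropping the reference lets a later gc() run destroy the connection
	defer pool.Put(conn)

	// ... use conn ...

	return nil
}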
// Add a reference to the connEntry.
// /!\ Only call this while holding the ConnPool.lock.
func (ce *connEntry) get() {

View File

@ -43,6 +43,48 @@ type Credentials struct {
KeyFile string
}
// NewUserCredentials creates new user credentials from secret.
func NewUserCredentials(secrets map[string]string) (*Credentials, error) {
return newCredentialsFromSecret(credUserID, credUserKey, secrets)
}
// NewAdminCredentials creates new admin credentials from secret.
func NewAdminCredentials(secrets map[string]string) (*Credentials, error) {
// Use userID and userKey if found else fallback to adminID and adminKey
if cred, err := newCredentialsFromSecret(credUserID, credUserKey, secrets); err == nil {
return cred, nil
}
log.WarningLogMsg("adminID and adminKey are deprecated, please use userID and userKey instead")
return newCredentialsFromSecret(credAdminID, credAdminKey, secrets)
}
// NewUserCredentialsWithMigration takes secret map from the request and validate it is
// a migration secret, if yes, it continues to create CR from it after parsing the migration
// secret. If it is not a migration it will continue the attempt to create credentials from it
// without parsing the secret. This function returns credentials and error.
func NewUserCredentialsWithMigration(secrets map[string]string) (*Credentials, error) {
if isMigrationSecret(secrets) {
migSecret, err := ParseAndSetSecretMapFromMigSecret(secrets)
if err != nil {
return nil, err
}
secrets = migSecret
}
cr, cErr := NewUserCredentials(secrets)
if cErr != nil {
return nil, cErr
}
return cr, nil
}
// DeleteCredentials removes the KeyFile.
func (cr *Credentials) DeleteCredentials() {
// don't complain about unhandled error
_ = os.Remove(cr.KeyFile)
}
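A short sketch of how these constructors pair with DeleteCredentials; the "userID"/"userKey" key names follow the deprecation warning logged above, and the withCredentials helper and secret values are purely illustrative:

func withCredentials(secrets map[string]string) error {
	cr, err := NewUserCredentials(secrets)
	if err != nil {
		return err
	}
	// the credentials are backed by a temporary keyfile; remove it when done
	defer cr.DeleteCredentials()

	// ... use cr.KeyFile (and the user ID) for the Ceph calls ...

	return nil
}

A caller would pass something like map[string]string{"userID": "csi-rbd-user", "userKey": "<key>"}.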
func storeKey(key string) (string, error) {
tmpfile, err := os.CreateTemp(tmpKeyFileLocation, tmpKeyFileNamePrefix)
if err != nil {
@ -99,28 +141,6 @@ func newCredentialsFromSecret(idField, keyField string, secrets map[string]strin
return c, err
}
// DeleteCredentials removes the KeyFile.
func (cr *Credentials) DeleteCredentials() {
// don't complain about unhandled error
_ = os.Remove(cr.KeyFile)
}
// NewUserCredentials creates new user credentials from secret.
func NewUserCredentials(secrets map[string]string) (*Credentials, error) {
return newCredentialsFromSecret(credUserID, credUserKey, secrets)
}
// NewAdminCredentials creates new admin credentials from secret.
func NewAdminCredentials(secrets map[string]string) (*Credentials, error) {
// Use userID and userKey if found else fallback to adminID and adminKey
if cred, err := newCredentialsFromSecret(credUserID, credUserKey, secrets); err == nil {
return cred, nil
}
log.WarningLogMsg("adminID and adminKey are deprecated, please use userID and userKey instead")
return newCredentialsFromSecret(credAdminID, credAdminKey, secrets)
}
// GetMonValFromSecret returns monitors from secret.
func GetMonValFromSecret(secrets map[string]string) (string, error) {
if mons, ok := secrets[credMonitors]; ok {
@ -160,23 +180,3 @@ func isMigrationSecret(secrets map[string]string) bool {
// was hit on migration request compared to general one.
return len(secrets) != 0 && secrets[migUserKey] != ""
}
// NewUserCredentialsWithMigration takes secret map from the request and validate it is
// a migration secret, if yes, it continues to create CR from it after parsing the migration
// secret. If it is not a migration it will continue the attempt to create credentials from it
// without parsing the secret. This function returns credentials and error.
func NewUserCredentialsWithMigration(secrets map[string]string) (*Credentials, error) {
if isMigrationSecret(secrets) {
migSecret, err := ParseAndSetSecretMapFromMigSecret(secrets)
if err != nil {
return nil, err
}
secrets = migSecret
}
cr, cErr := NewUserCredentials(secrets)
if cErr != nil {
return nil, cErr
}
return cr, nil
}

View File

@ -108,6 +108,59 @@ func NewOperationLock() *OperationLock {
}
}
// GetSnapshotCreateLock gets the snapshot lock on given volumeID.
func (ol *OperationLock) GetSnapshotCreateLock(volumeID string) error {
return ol.tryAcquire(createOp, volumeID)
}
// GetCloneLock gets the clone lock on given volumeID.
func (ol *OperationLock) GetCloneLock(volumeID string) error {
return ol.tryAcquire(cloneOpt, volumeID)
}
// GetDeleteLock gets the delete lock on given volumeID,ensures that there is
// no clone,restore and expand operation on given volumeID.
func (ol *OperationLock) GetDeleteLock(volumeID string) error {
return ol.tryAcquire(deleteOp, volumeID)
}
// GetRestoreLock gets the restore lock on given volumeID,ensures that there is
// no delete operation on given volumeID.
func (ol *OperationLock) GetRestoreLock(volumeID string) error {
return ol.tryAcquire(restoreOp, volumeID)
}
// GetExpandLock gets the expand lock on given volumeID,ensures that there is
// no delete and clone operation on given volumeID.
func (ol *OperationLock) GetExpandLock(volumeID string) error {
return ol.tryAcquire(expandOp, volumeID)
}
// ReleaseSnapshotCreateLock releases the create lock on given volumeID.
func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) {
ol.release(createOp, volumeID)
}
// ReleaseCloneLock releases the clone lock on given volumeID.
func (ol *OperationLock) ReleaseCloneLock(volumeID string) {
ol.release(cloneOpt, volumeID)
}
// ReleaseDeleteLock releases the delete lock on given volumeID.
func (ol *OperationLock) ReleaseDeleteLock(volumeID string) {
ol.release(deleteOp, volumeID)
}
// ReleaseRestoreLock releases the restore lock on given volumeID.
func (ol *OperationLock) ReleaseRestoreLock(volumeID string) {
ol.release(restoreOp, volumeID)
}
// ReleaseExpandLock releases the expand lock on given volumeID.
func (ol *OperationLock) ReleaseExpandLock(volumeID string) {
ol.release(expandOp, volumeID)
}
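The Get*/Release* pairs above bracket a single operation on a volume ID; a minimal sketch of the usual pattern (deleteVolumeExample is a hypothetical caller, not part of this commit):

func deleteVolumeExample(ol *OperationLock, volumeID string) error {
	// fails if a clone, restore or expand operation holds the volume
	if err := ol.GetDeleteLock(volumeID); err != nil {
		return err
	}
	defer ol.ReleaseDeleteLock(volumeID)

	// ... perform the actual delete ...

	return nil
}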
// tryAcquire tries to acquire the lock for operating on volumeID and returns true if successful.
// If another operation is already using volumeID, returns false.
func (ol *OperationLock) tryAcquire(op operation, volumeID string) error {
@ -178,59 +231,6 @@ func (ol *OperationLock) tryAcquire(op operation, volumeID string) error {
return nil
}
// GetSnapshotCreateLock gets the snapshot lock on given volumeID.
func (ol *OperationLock) GetSnapshotCreateLock(volumeID string) error {
return ol.tryAcquire(createOp, volumeID)
}
// GetCloneLock gets the clone lock on given volumeID.
func (ol *OperationLock) GetCloneLock(volumeID string) error {
return ol.tryAcquire(cloneOpt, volumeID)
}
// GetDeleteLock gets the delete lock on given volumeID,ensures that there is
// no clone,restore and expand operation on given volumeID.
func (ol *OperationLock) GetDeleteLock(volumeID string) error {
return ol.tryAcquire(deleteOp, volumeID)
}
// GetRestoreLock gets the restore lock on given volumeID,ensures that there is
// no delete operation on given volumeID.
func (ol *OperationLock) GetRestoreLock(volumeID string) error {
return ol.tryAcquire(restoreOp, volumeID)
}
// GetExpandLock gets the expand lock on given volumeID,ensures that there is
// no delete and clone operation on given volumeID.
func (ol *OperationLock) GetExpandLock(volumeID string) error {
return ol.tryAcquire(expandOp, volumeID)
}
// ReleaseSnapshotCreateLock releases the create lock on given volumeID.
func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) {
ol.release(createOp, volumeID)
}
// ReleaseCloneLock releases the clone lock on given volumeID.
func (ol *OperationLock) ReleaseCloneLock(volumeID string) {
ol.release(cloneOpt, volumeID)
}
// ReleaseDeleteLock releases the delete lock on given volumeID.
func (ol *OperationLock) ReleaseDeleteLock(volumeID string) {
ol.release(deleteOp, volumeID)
}
// ReleaseRestoreLock releases the restore lock on given volumeID.
func (ol *OperationLock) ReleaseRestoreLock(volumeID string) {
ol.release(restoreOp, volumeID)
}
// ReleaseExpandLock releases the expand lock on given volumeID.
func (ol *OperationLock) ReleaseExpandLock(volumeID string) {
ol.release(expandOp, volumeID)
}
// release deletes the lock on volumeID.
func (ol *OperationLock) release(op operation, volumeID string) {
ol.mux.Lock()

View File

@ -134,15 +134,6 @@ func (c *FakeIOContext) GetLastVersion() (uint64, error) {
return c.LastObjVersion, nil
}
func (c *FakeIOContext) getObj(oid string) (*FakeObj, error) {
obj, ok := c.Rados.Objs[oid]
if !ok {
return nil, rados.ErrNotFound
}
return obj, nil
}
func (c *FakeIOContext) GetXattr(oid, key string, data []byte) (int, error) {
obj, ok := c.Rados.Objs[oid]
if !ok {
@ -200,6 +191,15 @@ func (c *FakeIOContext) CreateReadOp() ReadOpW {
}
}
func (c *FakeIOContext) getObj(oid string) (*FakeObj, error) {
obj, ok := c.Rados.Objs[oid]
if !ok {
return nil, rados.ErrNotFound
}
return obj, nil
}
func (r *FakeReadOp) Operate(oid string) error {
r.oid = oid