rbd: provide option to disable setting metadata on rbd images

As we added support to set the metadata on the rbd images created for
the PVC and volume snapshot, by default metadata is set on all the images.

As we have seen we are hitting issues#2327 a lot of times with this,
we start to leave a lot of stale images. Currently, we rely on
`--extra-create-metadata=true` to decide to set the metadata or not,
we cannot set this option to false to disable setting metadata because we
use this for encryption too.

This changes is to provide an option to disable setting the image
metadata when starting cephcsi.

Fixes: #3009
Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
This commit is contained in:
Prasanna Kumar Kalever 2022-04-12 09:33:00 +05:30 committed by mergify[bot]
parent 8a47904e8f
commit caf4090657
8 changed files with 26 additions and 0 deletions

View File

@ -69,6 +69,7 @@ func init() {
flag.StringVar(&conf.PluginPath, "pluginpath", defaultPluginPath, "plugin path") flag.StringVar(&conf.PluginPath, "pluginpath", defaultPluginPath, "plugin path")
flag.StringVar(&conf.StagingPath, "stagingpath", defaultStagingPath, "staging path") flag.StringVar(&conf.StagingPath, "stagingpath", defaultStagingPath, "staging path")
flag.StringVar(&conf.ClusterName, "clustername", "", "name of the cluster") flag.StringVar(&conf.ClusterName, "clustername", "", "name of the cluster")
flag.BoolVar(&conf.SetMetadata, "setmetadata", false, "set metadata on the volume")
flag.StringVar(&conf.InstanceID, "instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+ flag.StringVar(&conf.InstanceID, "instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
" instances, when sharing Ceph clusters across CSI instances for provisioning") " instances, when sharing Ceph clusters across CSI instances for provisioning")
flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups") flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups")
@ -251,6 +252,7 @@ func main() {
DriverName: dname, DriverName: dname,
Namespace: conf.DriverNamespace, Namespace: conf.DriverNamespace,
ClusterName: conf.ClusterName, ClusterName: conf.ClusterName,
SetMetadata: conf.SetMetadata,
} }
// initialize all controllers before starting. // initialize all controllers before starting.
initControllers() initControllers()

View File

@ -38,6 +38,7 @@ type Config struct {
DriverName string DriverName string
Namespace string Namespace string
ClusterName string ClusterName string
SetMetadata bool
} }
// ControllerList holds the list of managers need to be started. // ControllerList holds the list of managers need to be started.

View File

@ -185,6 +185,7 @@ func (r ReconcilePersistentVolume) reconcilePV(ctx context.Context, obj runtime.
requestName, requestName,
pvcNamespace, pvcNamespace,
r.config.ClusterName, r.config.ClusterName,
r.config.SetMetadata,
cr) cr)
if err != nil { if err != nil {
log.ErrorLogMsg("failed to regenerate journal %s", err) log.ErrorLogMsg("failed to regenerate journal %s", err)

View File

@ -55,6 +55,9 @@ type ControllerServer struct {
// Cluster name // Cluster name
ClusterName string ClusterName string
// Set metadata on volume
SetMetadata bool
} }
func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error { func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error {
@ -173,7 +176,10 @@ func (cs *ControllerServer) parseVolCreateRequest(
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
// set cluster name on volume
rbdVol.ClusterName = cs.ClusterName rbdVol.ClusterName = cs.ClusterName
// set metadata on volume
rbdVol.EnableMetadata = cs.SetMetadata
// if the KMS is of type VaultToken, additional metadata is needed // if the KMS is of type VaultToken, additional metadata is needed
// depending on the tenant, the KMS can be configured with other // depending on the tenant, the KMS can be configured with other
@ -1061,6 +1067,7 @@ func (cs *ControllerServer) CreateSnapshot(
return nil, err return nil, err
} }
rbdVol.EnableMetadata = cs.SetMetadata
// Check if source volume was created with required image features for snaps // Check if source volume was created with required image features for snaps
if !rbdVol.hasSnapshotFeature() { if !rbdVol.hasSnapshotFeature() {

View File

@ -162,6 +162,7 @@ func (r *Driver) Run(conf *util.Config) {
if conf.IsControllerServer { if conf.IsControllerServer {
r.cs = NewControllerServer(r.cd) r.cs = NewControllerServer(r.cd)
r.cs.ClusterName = conf.ClusterName r.cs.ClusterName = conf.ClusterName
r.cs.SetMetadata = conf.SetMetadata
r.rs = NewReplicationServer(r.cs) r.rs = NewReplicationServer(r.cs)
} }
if !conf.IsControllerServer && !conf.IsNodeServer { if !conf.IsControllerServer && !conf.IsNodeServer {

View File

@ -543,6 +543,7 @@ func RegenerateJournal(
requestName, requestName,
owner, owner,
clusterName string, clusterName string,
setMetadata bool,
cr *util.Credentials, cr *util.Credentials,
) (string, error) { ) (string, error) {
ctx := context.Background() ctx := context.Background()
@ -557,6 +558,7 @@ func RegenerateJournal(
rbdVol = &rbdVolume{} rbdVol = &rbdVolume{}
rbdVol.VolID = volumeID rbdVol.VolID = volumeID
rbdVol.ClusterName = clusterName rbdVol.ClusterName = clusterName
rbdVol.EnableMetadata = setMetadata
err = vi.DecomposeCSIID(rbdVol.VolID) err = vi.DecomposeCSIID(rbdVol.VolID)
if err != nil { if err != nil {

View File

@ -132,6 +132,8 @@ type rbdImage struct {
// Cluster name // Cluster name
ClusterName string ClusterName string
// Set metadata on volume
EnableMetadata bool
// encryption provides access to optional VolumeEncryption functions // encryption provides access to optional VolumeEncryption functions
encryption *util.VolumeEncryption encryption *util.VolumeEncryption
@ -2061,6 +2063,10 @@ func genVolFromVolIDWithMigration(
// setAllMetadata set all the metadata from arg parameters on RBD image. // setAllMetadata set all the metadata from arg parameters on RBD image.
func (rv *rbdVolume) setAllMetadata(parameters map[string]string) error { func (rv *rbdVolume) setAllMetadata(parameters map[string]string) error {
if !rv.EnableMetadata {
return nil
}
for k, v := range parameters { for k, v := range parameters {
err := rv.SetMetadata(k, v) err := rv.SetMetadata(k, v)
if err != nil { if err != nil {
@ -2081,6 +2087,10 @@ func (rv *rbdVolume) setAllMetadata(parameters map[string]string) error {
// unsetAllMetadata unset all the metadata from arg keys on RBD image. // unsetAllMetadata unset all the metadata from arg keys on RBD image.
func (rv *rbdVolume) unsetAllMetadata(keys []string) error { func (rv *rbdVolume) unsetAllMetadata(keys []string) error {
if !rv.EnableMetadata {
return nil
}
for _, key := range keys { for _, key := range keys {
err := rv.RemoveMetadata(key) err := rv.RemoveMetadata(key)
// TODO: replace string comparison with errno. // TODO: replace string comparison with errno.

View File

@ -101,6 +101,8 @@ type Config struct {
// cephfs related flags // cephfs related flags
ForceKernelCephFS bool // force to use the ceph kernel client even if the kernel is < 4.17 ForceKernelCephFS bool // force to use the ceph kernel client even if the kernel is < 4.17
SetMetadata bool // set metadata on the volume
// RbdHardMaxCloneDepth is the hard limit for maximum number of nested volume clones that are taken before a flatten // RbdHardMaxCloneDepth is the hard limit for maximum number of nested volume clones that are taken before a flatten
// occurs // occurs
RbdHardMaxCloneDepth uint RbdHardMaxCloneDepth uint