diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 564d72709..dd3a442d3 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -723,7 +723,24 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS }, }, nil } + var snaps []snapshotInfo + // check the number of snapshots on image + snaps, err = rbdVol.listSnapshots(ctx, cr) + if err != nil { + var einf ErrImageNotFound + if errors.As(err, &einf) { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) + } + if len(snaps) > int(maxSnapshotsOnImage) { + err = flattenClonedRbdImages(ctx, snaps, rbdVol.Pool, rbdVol.Monitors, cr) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return nil, status.Errorf(codes.ResourceExhausted, "rbd image %s has %d snapshots", rbdVol, len(snaps)) + } err = reserveSnap(ctx, rbdSnap, rbdVol, cr) if err != nil { return nil, status.Error(codes.Internal, err.Error()) diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index e5b0abf3c..7144eebbd 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -417,12 +417,42 @@ func (rv *rbdVolume) getCloneDepth(ctx context.Context) (uint, error) { } } -func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, forceFlatten bool) error { - depth, err := rv.getCloneDepth(ctx) +func flattenClonedRbdImages(ctx context.Context, snaps []snapshotInfo, pool, monitors string, cr *util.Credentials) error { + rv := &rbdVolume{ + Monitors: monitors, + Pool: pool, + } + defer rv.Destroy() + err := rv.Connect(cr) if err != nil { + klog.Errorf(util.Log(ctx, "failed to open connection %s; err %v"), rv, err) return err } - klog.Infof(util.Log(ctx, "clone depth is (%d), configured softlimit (%d) and hardlimit (%d) for %s"), depth, rbdSoftMaxCloneDepth, rbdHardMaxCloneDepth, rv) + for _, s := range snaps { + if s.Namespace.Type == "trash" { + rv.RbdImageName = s.Namespace.OriginalName + err = rv.flattenRbdImage(ctx, cr, true) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to flatten %s; err %v"), rv, err) + continue + } + } + } + return nil +} + +func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, forceFlatten bool) error { + var depth uint + var err error + + // skip clone depth check if request is for force flatten + if !forceFlatten { + depth, err = rv.getCloneDepth(ctx) + if err != nil { + return err + } + klog.Infof(util.Log(ctx, "clone depth is (%d), configured softlimit (%d) and hardlimit (%d) for %s"), depth, rbdSoftMaxCloneDepth, rbdHardMaxCloneDepth, rv) + } if forceFlatten || (depth >= rbdHardMaxCloneDepth) || (depth >= rbdSoftMaxCloneDepth) { args := []string{"flatten", rv.Pool + "/" + rv.RbdImageName, "--id", cr.ID, "--keyfile=" + cr.KeyFile, "-m", rv.Monitors}