Merge pull request #347 from red-hat-storage/sync_us--devel

Syncing latest changes from upstream devel for ceph-csi
This commit is contained in:
openshift-merge-bot[bot] 2024-07-30 08:07:25 +00:00 committed by GitHub
commit a14ddd2ff3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 597 additions and 313 deletions

View File

@ -7,5 +7,8 @@
- deploy: podSecurityContexts can be configured for ceph-csi-cephfs chart in [PR](https://github.com/ceph/ceph-csi/pull/4664).
- deploy: podSecurityContexts can be configured for ceph-csi-rbd chart in [PR](https://github.com/ceph/ceph-csi/pull/4668)
- deploy: instanceID can be optionally configured for ceph-csi charts in [PR](https://github.com/ceph/ceph-csi/pull/4666)
- rbd: add support for flattenMode option for replication in [PR](https://github.com/ceph/ceph-csi/pull/4678)
- cephfs: support omap data store in radosnamespace via cli argument in [PR](https://github.com/ceph/ceph-csi/pull/4652)
- deploy: radosNamespaceCephFS can be configured for ceph-csi-cephfs chart in [PR](https://github.com/ceph/ceph-csi/pull/4652)
## NOTE

View File

@ -201,6 +201,7 @@ charts and their default values.
| `CSIDriver.fsGroupPolicy` | Specifies the fsGroupPolicy for the CSI driver object | `File` |
| `CSIDriver.seLinuxMount` | Specify for efficient SELinux volume relabeling | `true` |
| `instanceID` | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning. | ` ` |
| `radosNamespaceCephFS` | CephFS RadosNamespace used to store CSI specific objects and keys. | ` ` |
### Command Line

View File

@ -62,6 +62,9 @@ spec:
{{- if .Values.instanceID }}
- "--instanceid={{ .Values.instanceID }}"
{{- end }}
{{- if .Values.radosNamespaceCephFS }}
- "--radosnamespacecephfs={{ .Values.radosNamespaceCephFS }}"
{{- end }}
{{- if .Values.nodeplugin.profiling.enabled }}
- "--enableprofiling={{ .Values.nodeplugin.profiling.enabled }}"
{{- end }}

View File

@ -82,6 +82,9 @@ spec:
{{- if .Values.instanceID }}
- "--instanceid={{ .Values.instanceID }}"
{{- end }}
{{- if .Values.radosNamespaceCephFS }}
- "--radosnamespacecephfs={{ .Values.radosNamespaceCephFS }}"
{{- end }}
{{- if .Values.provisioner.profiling.enabled }}
- "--enableprofiling={{ .Values.provisioner.profiling.enabled }}"
{{- end }}

View File

@ -372,6 +372,8 @@ configMapName: ceph-csi-config
externallyManagedConfigmap: false
# Name of the configmap used for ceph.conf
cephConfConfigMapName: ceph-config
# CephFS RadosNamespace used to store CSI specific objects and keys.
# radosNamespaceCephFS: csi
# Unique ID distinguishing this instance of Ceph CSI among other instances,
# when sharing Ceph clusters across CSI instances for provisioning
# instanceID: default

View File

@ -100,6 +100,11 @@ func init() {
"kernelmountoptions",
"",
"Comma separated string of mount options accepted by cephfs kernel mounter")
flag.StringVar(
&conf.RadosNamespaceCephFS,
"radosnamespacecephfs",
"",
"CephFS RadosNamespace used to store CSI specific objects and keys.")
flag.StringVar(
&conf.FuseMountOptions,
"fusemountoptions",

View File

@ -49,6 +49,7 @@ make image-cephcsi
| `--domainlabels` | _empty_ | Kubernetes node labels to use as CSI domain labels for topology aware provisioning, should be a comma separated value (ex:= "failure-domain/region,failure-domain/zone") |
| `--enable-read-affinity` | `false` | enable read affinity |
| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ','.<br>`Note: These labels will be replaced if crush location labels are defined in the ceph-csi-config ConfigMap for the specific cluster.` |
| `--radosnamespacecephfs`| _empty_ | CephFS RadosNamespace used to store CSI specific objects and keys. |
**NOTE:** The parameter `-forcecephkernelclient` enables the Kernel
CephFS mounter on kernels < 4.17.

View File

@ -110,6 +110,11 @@ func (fs *Driver) Run(conf *util.Config) {
CSIInstanceID = conf.InstanceID
}
// Use passed in radosNamespace, if provided for storing CSI specific objects and keys.
if conf.RadosNamespaceCephFS != "" {
fsutil.RadosNamespace = conf.RadosNamespaceCephFS
}
if conf.IsNodeServer && k8s.RunsOnKubernetes() {
nodeLabels, err = k8s.GetNodeLabels(conf.NodeID)
if err != nil {

View File

@ -19,7 +19,5 @@ package util
// VolumeID string representation.
type VolumeID string
const (
// RadosNamespace to store CSI specific objects and keys.
RadosNamespace = "csi"
)
// RadosNamespace to store CSI specific objects and keys.
var RadosNamespace = "csi"

View File

@ -120,6 +120,12 @@ func (is *IdentityServer) GetCapabilities(
Type: identity.Capability_VolumeGroup_MODIFY_VOLUME_GROUP,
},
},
}, &identity.Capability{
Type: &identity.Capability_VolumeGroup_{
VolumeGroup: &identity.Capability_VolumeGroup{
Type: identity.Capability_VolumeGroup_GET_VOLUME_GROUP,
},
},
})
}

View File

@ -27,7 +27,9 @@ import (
"time"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/rbd"
corerbd "github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -93,12 +95,17 @@ type ReplicationServer struct {
*replication.UnimplementedControllerServer
// Embed ControllerServer as it implements helper functions
*corerbd.ControllerServer
// csiID is the unique ID for this CSI-driver deployment.
csiID string
}
// NewReplicationServer creates a new ReplicationServer which handles
// the Replication Service requests from the CSI-Addons specification.
func NewReplicationServer(c *corerbd.ControllerServer) *ReplicationServer {
return &ReplicationServer{ControllerServer: c}
func NewReplicationServer(instanceID string, c *corerbd.ControllerServer) *ReplicationServer {
return &ReplicationServer{
ControllerServer: c,
csiID: instanceID,
}
}
func (rs *ReplicationServer) RegisterService(server grpc.ServiceRegistrar) {
@ -124,18 +131,18 @@ func getForceOption(ctx context.Context, parameters map[string]string) (bool, er
// getFlattenMode gets flatten mode from the input GRPC request parameters.
// flattenMode is the key to check the mode in the parameters.
func getFlattenMode(ctx context.Context, parameters map[string]string) (corerbd.FlattenMode, error) {
func getFlattenMode(ctx context.Context, parameters map[string]string) (types.FlattenMode, error) {
val, ok := parameters[flattenModeKey]
if !ok {
log.DebugLog(ctx, "%q is not set in parameters, setting to default (%v)",
flattenModeKey, corerbd.FlattenModeNever)
flattenModeKey, types.FlattenModeNever)
return corerbd.FlattenModeNever, nil
return types.FlattenModeNever, nil
}
mode := corerbd.FlattenMode(val)
mode := types.FlattenMode(val)
switch mode {
case corerbd.FlattenModeForce, corerbd.FlattenModeNever:
case types.FlattenModeForce, types.FlattenModeNever:
return mode, nil
}
log.ErrorLog(ctx, "%q=%q is not supported", flattenModeKey, val)
@ -270,24 +277,27 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer func() {
if rbdVol != nil {
rbdVol.Destroy(ctx)
}
}()
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// extract the mirroring mode
mirroringMode, err := getMirroringMode(ctx, req.GetParameters())
if err != nil {
@ -299,21 +309,20 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, err
}
mirroringInfo, err := rbdVol.GetImageMirroringInfo()
info, err := mirror.GetMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
if info.GetState() != librbd.MirrorImageEnabled.String() {
err = rbdVol.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, getGRPCError(err)
}
err = rbdVol.EnableImageMirroring(mirroringMode)
err = mirror.EnableMirroring(mirroringMode)
if err != nil {
log.ErrorLog(ctx, err.Error())
@ -347,52 +356,54 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer func() {
if rbdVol != nil {
rbdVol.Destroy(ctx)
}
}()
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// extract the force option
force, err := getForceOption(ctx, req.GetParameters())
if err != nil {
return nil, err
}
mirroringInfo, err := rbdVol.GetImageMirroringInfo()
info, err := mirror.GetMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
switch mirroringInfo.State {
switch info.GetState() {
// image is already in disabled state
case librbd.MirrorImageDisabled:
case librbd.MirrorImageDisabled.String():
// image mirroring is still disabling
case librbd.MirrorImageDisabling:
case librbd.MirrorImageDisabling.String():
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
case librbd.MirrorImageEnabled:
err = rbdVol.DisableVolumeReplication(mirroringInfo, force)
case librbd.MirrorImageEnabled.String():
err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force)
if err != nil {
return nil, getGRPCError(err)
}
return &replication.DisableVolumeReplicationResponse{}, nil
default:
return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State)
return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", info.GetState())
}
return &replication.DisableVolumeReplicationResponse{}, nil
@ -422,48 +433,50 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer func() {
if rbdVol != nil {
rbdVol.Destroy(ctx)
}
}()
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
mirroringInfo, err := rbdVol.GetImageMirroringInfo()
info, err := mirror.GetMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
if info.GetState() != librbd.MirrorImageEnabled.String() {
return nil, status.Errorf(
codes.InvalidArgument,
"mirroring is not enabled on %s, image is in %d Mode",
rbdVol.VolID,
mirroringInfo.State)
"mirroring is not enabled on %s, image is in %s Mode",
volumeID,
info.GetState())
}
// promote secondary to primary
if !mirroringInfo.Primary {
if !info.IsPrimary() {
if req.GetForce() {
// workaround for https://github.com/ceph/ceph-csi/issues/2736
// TODO: remove this workaround when the issue is fixed
err = rbdVol.ForcePromoteImage(cr)
err = mirror.ForcePromote(cr)
} else {
err = rbdVol.PromoteImage(req.GetForce())
err = mirror.Promote(req.GetForce())
}
if err != nil {
log.ErrorLog(ctx, err.Error())
@ -483,7 +496,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
interval, startTime := getSchedulingDetails(req.GetParameters())
if interval != admin.NoInterval {
err = rbdVol.AddSnapshotScheduling(interval, startTime)
err = mirror.AddSnapshotScheduling(interval, startTime)
if err != nil {
return nil, err
}
@ -522,49 +535,51 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer func() {
if rbdVol != nil {
rbdVol.Destroy(ctx)
}
}()
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
creationTime, err := rbdVol.GetImageCreationTime()
creationTime, err := rbdVol.GetCreationTime()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
mirroringInfo, err := rbdVol.GetImageMirroringInfo()
info, err := mirror.GetMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
if info.GetState() != librbd.MirrorImageEnabled.String() {
return nil, status.Errorf(
codes.InvalidArgument,
"mirroring is not enabled on %s, image is in %d Mode",
rbdVol.VolID,
mirroringInfo.State)
"mirroring is not enabled on %s, image is in %s Mode",
volumeID,
info.GetState())
}
// demote image to secondary
if mirroringInfo.Primary {
if info.IsPrimary() {
// store the image creation time for resync
_, err = rbdVol.GetMetadata(imageCreationTimeKey)
if err != nil && errors.Is(err, librbd.ErrNotFound) {
@ -577,7 +592,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}
err = rbdVol.DemoteImage()
err = mirror.Demote()
if err != nil {
log.ErrorLog(ctx, err.Error())
@ -590,22 +605,22 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
// checkRemoteSiteStatus checks the state of the remote cluster.
// It returns true if the state of the remote cluster is up and unknown.
func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirrorImageStatus) bool {
func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus) bool {
ready := true
found := false
for _, s := range mirrorStatus.SiteStatuses {
for _, s := range mirrorStatus {
log.UsefulLog(
ctx,
"peer site mirrorUUID=%q, daemon up=%t, mirroring state=%q, description=%q and lastUpdate=%d",
s.MirrorUUID,
s.Up,
s.State,
s.Description,
s.LastUpdate)
if s.MirrorUUID != "" {
s.GetMirrorUUID(),
s.IsUP(),
s.GetState(),
s.GetDescription(),
s.GetLastUpdate())
if s.GetMirrorUUID() != "" {
found = true
// If ready is already "false" do not flip it based on another remote peer status
if ready && (s.State != librbd.MirrorImageStatusStateUnknown || !s.Up) {
if ready && (s.GetState() != librbd.MirrorImageStatusStateUnknown.String() || !s.IsUP()) {
ready = false
}
}
@ -639,26 +654,28 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer func() {
if rbdVol != nil {
rbdVol.Destroy(ctx)
}
}()
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
mirroringInfo, err := rbdVol.GetImageMirroringInfo()
info, err := mirror.GetMirroringInfo()
if err != nil {
// in case of Resync the image will get deleted and gets recreated and
// it takes time for this operation.
@ -667,22 +684,22 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Errorf(codes.Aborted, err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
if info.GetState() != librbd.MirrorImageEnabled.String() {
return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled")
}
// return error if the image is still primary
if mirroringInfo.Primary {
if info.IsPrimary() {
return nil, status.Error(codes.InvalidArgument, "image is in primary state")
}
mirrorStatus, err := rbdVol.GetImageMirroringStatus()
sts, err := mirror.GetGlobalMirroringStatus()
if err != nil {
// the image gets recreated after issuing resync
if errors.Is(err, corerbd.ErrImageNotFound) {
// caller retries till RBD syncs an initial version of the image to
// report its status in the resync call. Ideally, this line will not
// be executed as the error would get returned due to getImageMirroringInfo
// be executed as the error would get returned due to getMirroringInfo
// failing to find an image above.
return nil, status.Error(codes.Aborted, err.Error())
}
@ -692,22 +709,20 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
}
ready := false
localStatus, err := mirrorStatus.LocalStatus()
localStatus, err := sts.GetLocalSiteStatus()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, fmt.Errorf("failed to get local status: %w", err)
}
// convert the last update time to UTC
lastUpdateTime := time.Unix(localStatus.LastUpdate, 0).UTC()
log.UsefulLog(
ctx,
"local status: daemon up=%t, image mirroring state=%q, description=%q and lastUpdate=%s",
localStatus.Up,
localStatus.State,
localStatus.Description,
lastUpdateTime)
localStatus.IsUP(),
localStatus.GetState(),
localStatus.GetDescription(),
localStatus.GetLastUpdate())
// To recover from split brain (up+error) state the image need to be
// demoted and requested for resync on site-a and then the image on site-b
@ -719,11 +734,11 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
// If the image state on both the sites are up+unknown consider that
// complete data is synced as the last snapshot
// gets exchanged between the clusters.
if localStatus.State == librbd.MirrorImageStatusStateUnknown && localStatus.Up {
ready = checkRemoteSiteStatus(ctx, mirrorStatus)
if localStatus.GetState() == librbd.MirrorImageStatusStateUnknown.String() && localStatus.IsUP() {
ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus())
}
creationTime, err := rbdVol.GetImageCreationTime()
creationTime, err := rbdVol.GetCreationTime()
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error())
}
@ -749,7 +764,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime())
if req.GetForce() && st.Equal(creationTime.AsTime()) {
err = rbdVol.ResyncVol(localStatus)
err = mirror.Resync()
if err != nil {
return nil, getGRPCError(err)
}
@ -853,42 +868,44 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer func() {
if rbdVol != nil {
rbdVol.Destroy(ctx)
}
}()
mgr := rbd.NewManager(rs.csiID, nil, req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
mirroringInfo, err := rbdVol.GetImageMirroringInfo()
info, err := mirror.GetMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Aborted, err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
if info.GetState() != librbd.MirrorImageEnabled.String() {
return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled")
}
// return error if the image is not in primary state
if !mirroringInfo.Primary {
if !info.IsPrimary() {
return nil, status.Error(codes.InvalidArgument, "image is not in primary state")
}
mirrorStatus, err := rbdVol.GetImageMirroringStatus()
mirrorStatus, err := mirror.GetGlobalMirroringStatus()
if err != nil {
if errors.Is(err, corerbd.ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error())
@ -898,14 +915,14 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}
remoteStatus, err := RemoteStatus(ctx, mirrorStatus)
remoteStatus, err := mirrorStatus.GetRemoteSiteStatus(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Errorf(codes.Internal, "failed to get remote status: %v", err)
}
description := remoteStatus.Description
description := remoteStatus.GetDescription()
resp, err := getLastSyncInfo(ctx, description)
if err != nil {
if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) {
@ -919,36 +936,6 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return resp, nil
}
// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses
// slice that corresponds to the remote site's status. If the remote status
// is not found than the error ErrNotExist will be returned.
func RemoteStatus(ctx context.Context, gmis *librbd.GlobalMirrorImageStatus) (librbd.SiteMirrorImageStatus, error) {
var (
ss librbd.SiteMirrorImageStatus
err error = librbd.ErrNotExist
)
for i := range gmis.SiteStatuses {
log.DebugLog(
ctx,
"Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t",
gmis.SiteStatuses[i].MirrorUUID,
gmis.SiteStatuses[i].State,
gmis.SiteStatuses[i].Description,
gmis.SiteStatuses[i].LastUpdate,
gmis.SiteStatuses[i].Up)
if gmis.SiteStatuses[i].MirrorUUID != "" {
ss = gmis.SiteStatuses[i]
err = nil
break
}
}
return ss, err
}
// This function gets the local snapshot time, last sync snapshot seconds
// and last sync bytes from the description of localStatus and convert
// it into required types.
@ -1015,12 +1002,12 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV
return &response, nil
}
func checkVolumeResyncStatus(ctx context.Context, localStatus librbd.SiteMirrorImageStatus) error {
func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) error {
// we are considering local snapshot timestamp to check if the resync is
// started or not, if we dont see local_snapshot_timestamp in the
// description of localStatus, we are returning error. if we see the local
// snapshot timestamp in the description we return resyncing started.
description := localStatus.Description
description := localStatus.GetDescription()
resp, err := getLastSyncInfo(ctx, description)
if err != nil {
return fmt.Errorf("failed to get last sync info: %w", err)

View File

@ -26,6 +26,7 @@ import (
"time"
corerbd "github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/types"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
@ -219,30 +220,36 @@ func TestCheckVolumeResyncStatus(t *testing.T) {
t.Parallel()
tests := []struct {
name string
args librbd.SiteMirrorImageStatus
args corerbd.SiteMirrorImageStatus
wantErr bool
}{
{
name: "test when local_snapshot_timestamp is non zero",
args: librbd.SiteMirrorImageStatus{
//nolint:lll // sample output cannot be split into multiple lines.
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
args: corerbd.SiteMirrorImageStatus{
SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{
//nolint:lll // sample output cannot be split into multiple lines.
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
},
},
wantErr: false,
},
{
name: "test when local_snapshot_timestamp is zero",
//nolint:lll // sample output cannot be split into multiple lines.
args: librbd.SiteMirrorImageStatus{
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
args: corerbd.SiteMirrorImageStatus{
SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
},
},
wantErr: true,
},
{
name: "test when local_snapshot_timestamp is not present",
//nolint:lll // sample output cannot be split into multiple lines.
args: librbd.SiteMirrorImageStatus{
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
args: corerbd.SiteMirrorImageStatus{
SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
},
},
wantErr: true,
},
@ -261,17 +268,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
t.Parallel()
tests := []struct {
name string
args *librbd.GlobalMirrorImageStatus
args corerbd.GlobalMirrorStatus
wantReady bool
}{
{
name: "Test a single peer in sync",
args: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
args: corerbd.GlobalMirrorStatus{
GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
},
},
},
@ -279,17 +288,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
},
{
name: "Test a single peer in sync, including a local instance",
args: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
{
MirrorUUID: "",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
args: corerbd.GlobalMirrorStatus{
GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
{
MirrorUUID: "",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
},
},
},
@ -297,17 +308,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
},
{
name: "Test a multiple peers in sync",
args: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote1",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
{
MirrorUUID: "remote2",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
args: corerbd.GlobalMirrorStatus{
GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote1",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
{
MirrorUUID: "remote2",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
},
},
},
@ -315,19 +328,23 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
},
{
name: "Test no remote peers",
args: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{},
args: corerbd.GlobalMirrorStatus{
GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{},
},
},
wantReady: false,
},
{
name: "Test single peer not in sync",
args: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote",
State: librbd.MirrorImageStatusStateReplaying,
Up: true,
args: corerbd.GlobalMirrorStatus{
GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote",
State: librbd.MirrorImageStatusStateReplaying,
Up: true,
},
},
},
},
@ -335,12 +352,14 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
},
{
name: "Test single peer not up",
args: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote",
State: librbd.MirrorImageStatusStateUnknown,
Up: false,
args: corerbd.GlobalMirrorStatus{
GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote",
State: librbd.MirrorImageStatusStateUnknown,
Up: false,
},
},
},
},
@ -348,17 +367,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
},
{
name: "Test multiple peers, when first peer is not in sync",
args: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote1",
State: librbd.MirrorImageStatusStateStoppingReplay,
Up: true,
},
{
MirrorUUID: "remote2",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
args: corerbd.GlobalMirrorStatus{
GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote1",
State: librbd.MirrorImageStatusStateStoppingReplay,
Up: true,
},
{
MirrorUUID: "remote2",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
},
},
},
@ -366,17 +387,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
},
{
name: "Test multiple peers, when second peer is not up",
args: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote1",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
{
MirrorUUID: "remote2",
State: librbd.MirrorImageStatusStateUnknown,
Up: false,
args: corerbd.GlobalMirrorStatus{
GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "remote1",
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
{
MirrorUUID: "remote2",
State: librbd.MirrorImageStatusStateUnknown,
Up: false,
},
},
},
},
@ -386,7 +409,7 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
if ready := checkRemoteSiteStatus(context.TODO(), tt.args); ready != tt.wantReady {
if ready := checkRemoteSiteStatus(context.TODO(), tt.args.GetAllSitesStatus()); ready != tt.wantReady {
t.Errorf("checkRemoteSiteStatus() ready = %v, expect ready = %v", ready, tt.wantReady)
}
})
@ -651,7 +674,7 @@ func Test_getFlattenMode(t *testing.T) {
tests := []struct {
name string
args args
want corerbd.FlattenMode
want types.FlattenMode
wantErr bool
}{
{
@ -660,27 +683,27 @@ func Test_getFlattenMode(t *testing.T) {
ctx: context.TODO(),
parameters: map[string]string{},
},
want: corerbd.FlattenModeNever,
want: types.FlattenModeNever,
},
{
name: "flattenMode option set to never",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: string(corerbd.FlattenModeNever),
flattenModeKey: string(types.FlattenModeNever),
},
},
want: corerbd.FlattenModeNever,
want: types.FlattenModeNever,
},
{
name: "flattenMode option set to force",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: string(corerbd.FlattenModeForce),
flattenModeKey: string(types.FlattenModeForce),
},
},
want: corerbd.FlattenModeForce,
want: types.FlattenModeForce,
},
{

View File

@ -363,3 +363,41 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership(
VolumeGroup: csiVG,
}, nil
}
// ControllerGetVolumeGroup RPC call to get a volume group.
//
// From the spec:
// ControllerGetVolumeGroupResponse should contain current information of a
// volume group if it exists. If the volume group does not exist any more,
// ControllerGetVolumeGroup should return gRPC error code NOT_FOUND.
func (vs *VolumeGroupServer) ControllerGetVolumeGroup(
ctx context.Context,
req *volumegroup.ControllerGetVolumeGroupRequest,
) (*volumegroup.ControllerGetVolumeGroupResponse, error) {
mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets())
defer mgr.Destroy(ctx)
// resolve the volume group
vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId())
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"could not find volume group %q: %s",
req.GetVolumeGroupId(),
err.Error())
}
defer vg.Destroy(ctx)
csiVG, err := vg.ToCSI(ctx)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to convert volume group %q to CSI format: %v",
vg,
err)
}
return &volumegroup.ControllerGetVolumeGroupResponse{
VolumeGroup: csiVG,
}, nil
}

View File

@ -988,7 +988,7 @@ func (cs *ControllerServer) DeleteVolume(
func cleanupRBDImage(ctx context.Context,
rbdVol *rbdVolume, cr *util.Credentials,
) (*csi.DeleteVolumeResponse, error) {
mirroringInfo, err := rbdVol.GetImageMirroringInfo()
info, err := rbdVol.GetMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
@ -998,7 +998,7 @@ func cleanupRBDImage(ctx context.Context,
// Mirroring is enabled on the image
// Local image is secondary
// Local image is in up+replaying state
if mirroringInfo.State == librbd.MirrorImageEnabled && !mirroringInfo.Primary {
if info.GetState() == librbd.MirrorImageEnabled.String() && !info.IsPrimary() {
// If the image is in a secondary state and its up+replaying means its
// an healthy secondary and the image is primary somewhere in the
// remote cluster and the local image is getting replayed. Delete the
@ -1007,11 +1007,18 @@ func cleanupRBDImage(ctx context.Context,
// the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the OMAP, PVC and PV
// objects after failback operation.
localStatus, rErr := rbdVol.GetLocalState()
sts, rErr := rbdVol.GetGlobalMirroringStatus()
if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error())
}
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
localStatus, rErr := sts.GetLocalSiteStatus()
if rErr != nil {
log.ErrorLog(ctx, "failed to get local status for volume %s: %w", rbdVol.RbdImageName, rErr)
return nil, status.Error(codes.Internal, rErr.Error())
}
if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() {
if err = undoVolReservation(ctx, rbdVol, cr); err != nil {
log.ErrorLog(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)",
rbdVol.RequestName, rbdVol.RbdImageName, err)
@ -1023,8 +1030,8 @@ func cleanupRBDImage(ctx context.Context,
}
log.ErrorLog(ctx,
"secondary image status is up=%t and state=%s",
localStatus.Up,
localStatus.State)
localStatus.IsUP(),
localStatus.GetState())
}
inUse, err := rbdVol.isInUse()

View File

@ -219,7 +219,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
fcs := casrbd.NewFenceControllerServer()
r.cas.RegisterService(fcs)
rcs := casrbd.NewReplicationServer(NewControllerServer(r.cd))
rcs := casrbd.NewReplicationServer(rbd.CSIInstanceID, NewControllerServer(r.cd))
r.cas.RegisterService(rcs)
vgcs := casrbd.NewVolumeGroupServer(conf.InstanceID)

View File

@ -77,3 +77,7 @@ func (rv *rbdVolume) RemoveFromGroup(ctx context.Context, vg types.VolumeGroup)
return librbd.GroupImageRemove(ioctx, name, rv.ioctx, rv.RbdImageName)
}
func (rv *rbdVolume) ToMirror() (types.Mirror, error) {
return rv, nil
}

View File

@ -20,21 +20,13 @@ import (
"fmt"
"time"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
librbd "github.com/ceph/go-ceph/rbd"
)
// FlattenMode is used to indicate the flatten mode for an RBD image.
type FlattenMode string
const (
// FlattenModeNever indicates that the image should never be flattened.
FlattenModeNever FlattenMode = "never"
// FlattenModeForce indicates that the image with the parent must be flattened.
FlattenModeForce FlattenMode = "force"
)
// HandleParentImageExistence checks the image's parent.
// if the parent image does not exist and is not in trash, it returns nil.
// if the flattenMode is FlattenModeForce, it flattens the image itself.
@ -42,13 +34,12 @@ const (
// if the parent image exists and is not enabled for mirroring, it returns an error.
func (rv *rbdVolume) HandleParentImageExistence(
ctx context.Context,
flattenMode FlattenMode,
mode types.FlattenMode,
) error {
if rv.ParentName == "" && !rv.ParentInTrash {
return nil
}
if flattenMode == FlattenModeForce {
if mode == types.FlattenModeForce {
// Delete temp image that exists for volume datasource since
// it is no longer required when the live image is flattened.
err := rv.DeleteTempImage(ctx)
@ -72,14 +63,13 @@ func (rv *rbdVolume) HandleParentImageExistence(
if err != nil {
return err
}
parentMirroringInfo, err := parent.GetImageMirroringInfo()
parentMirroringInfo, err := parent.GetMirroringInfo()
if err != nil {
return fmt.Errorf(
"failed to get mirroring info of parent %q of image %q: %w",
parent, rv, err)
}
if parentMirroringInfo.State != librbd.MirrorImageEnabled {
if parentMirroringInfo.GetState() != librbd.MirrorImageEnabled.String() {
return fmt.Errorf("%w: failed to enable mirroring on image %q: "+
"parent image %q is not enabled for mirroring",
ErrFailedPrecondition, rv, parent)
@ -88,8 +78,11 @@ func (rv *rbdVolume) HandleParentImageExistence(
return nil
}
// EnableImageMirroring enables mirroring on an image.
func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error {
// check that rbdVolume implements the types.Mirror interface.
var _ types.Mirror = &rbdVolume{}
// EnableMirroring enables mirroring on an image.
func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -104,8 +97,8 @@ func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error {
return nil
}
// DisableImageMirroring disables mirroring on an image.
func (ri *rbdImage) DisableImageMirroring(force bool) error {
// DisableMirroring disables mirroring on an image.
func (ri *rbdImage) DisableMirroring(force bool) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -120,8 +113,8 @@ func (ri *rbdImage) DisableImageMirroring(force bool) error {
return nil
}
// GetImageMirroringInfo gets mirroring information of an image.
func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) {
// GetMirroringInfo gets mirroring information of an image.
func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) {
image, err := ri.open()
if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -133,11 +126,11 @@ func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) {
return nil, fmt.Errorf("failed to get mirroring info of %q with error: %w", ri, err)
}
return info, nil
return ImageStatus{MirrorImageInfo: info}, nil
}
// PromoteImage promotes image to primary.
func (ri *rbdImage) PromoteImage(force bool) error {
// Promote promotes image to primary.
func (ri *rbdImage) Promote(force bool) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -151,10 +144,10 @@ func (ri *rbdImage) PromoteImage(force bool) error {
return nil
}
// ForcePromoteImage promotes image to primary with force option with 2 minutes
// ForcePromote promotes image to primary with force option with 2 minutes
// timeout. If there is no response within 2 minutes,the rbd CLI process will be
// killed and an error is returned.
func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error {
func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error {
promoteArgs := []string{
"mirror", "image", "promote",
rv.String(),
@ -181,8 +174,8 @@ func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error {
return nil
}
// DemoteImage demotes image to secondary.
func (ri *rbdImage) DemoteImage() error {
// Demote demotes image to secondary.
func (ri *rbdImage) Demote() error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -196,8 +189,8 @@ func (ri *rbdImage) DemoteImage() error {
return nil
}
// resyncImage resync image to correct the split-brain.
func (ri *rbdImage) resyncImage() error {
// Resync resync image to correct the split-brain.
func (ri *rbdImage) Resync() error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -208,11 +201,14 @@ func (ri *rbdImage) resyncImage() error {
return fmt.Errorf("failed to resync image %q with error: %w", ri, err)
}
return nil
// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable)
}
// GetImageMirroringStatus get the mirroring status of an image.
func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) {
// GetGlobalMirroringStatus get the mirroring status of an image.
func (ri *rbdImage) GetGlobalMirroringStatus() (types.GlobalStatus, error) {
image, err := ri.open()
if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -223,26 +219,110 @@ func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus,
return nil, fmt.Errorf("failed to get image mirroring status %q with error: %w", ri, err)
}
return &statusInfo, nil
return GlobalMirrorStatus{GlobalMirrorImageStatus: statusInfo}, nil
}
// GetLocalState returns the local state of the image.
func (ri *rbdImage) GetLocalState() (librbd.SiteMirrorImageStatus, error) {
localStatus := librbd.SiteMirrorImageStatus{}
image, err := ri.open()
if err != nil {
return localStatus, fmt.Errorf("failed to open image %q with error: %w", ri, err)
}
defer image.Close()
statusInfo, err := image.GetGlobalMirrorStatus()
if err != nil {
return localStatus, fmt.Errorf("failed to get image mirroring status %q with error: %w", ri, err)
}
localStatus, err = statusInfo.LocalStatus()
if err != nil {
return localStatus, fmt.Errorf("failed to get local status: %w", err)
}
return localStatus, nil
// ImageStatus is a wrapper around librbd.MirrorImageInfo that contains the
// image mirror status.
type ImageStatus struct {
*librbd.MirrorImageInfo
}
func (status ImageStatus) GetState() string {
return status.State.String()
}
func (status ImageStatus) IsPrimary() bool {
return status.Primary
}
// GlobalMirrorStatus is a wrapper around librbd.GlobalMirrorImageStatus that contains the
// global mirror image status.
type GlobalMirrorStatus struct {
librbd.GlobalMirrorImageStatus
}
func (status GlobalMirrorStatus) GetState() string {
return status.GlobalMirrorImageStatus.Info.State.String()
}
func (status GlobalMirrorStatus) IsPrimary() bool {
return status.GlobalMirrorImageStatus.Info.Primary
}
func (status GlobalMirrorStatus) GetLocalSiteStatus() (types.SiteStatus, error) {
s, err := status.GlobalMirrorImageStatus.LocalStatus()
if err != nil {
err = fmt.Errorf("failed to get local site status: %w", err)
}
return SiteMirrorImageStatus{
SiteMirrorImageStatus: s,
}, err
}
func (status GlobalMirrorStatus) GetAllSitesStatus() []types.SiteStatus {
var siteStatuses []types.SiteStatus
for _, ss := range status.SiteStatuses {
siteStatuses = append(siteStatuses, SiteMirrorImageStatus{SiteMirrorImageStatus: ss})
}
return siteStatuses
}
// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses
// slice that corresponds to the remote site's status. If the remote status
// is not found than the error ErrNotExist will be returned.
func (status GlobalMirrorStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) {
var (
ss librbd.SiteMirrorImageStatus
err error = librbd.ErrNotExist
)
for i := range status.SiteStatuses {
log.DebugLog(
ctx,
"Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t",
status.SiteStatuses[i].MirrorUUID,
status.SiteStatuses[i].State,
status.SiteStatuses[i].Description,
status.SiteStatuses[i].LastUpdate,
status.SiteStatuses[i].Up)
if status.SiteStatuses[i].MirrorUUID != "" {
ss = status.SiteStatuses[i]
err = nil
break
}
}
return SiteMirrorImageStatus{SiteMirrorImageStatus: ss}, err
}
// SiteMirrorImageStatus is a wrapper around librbd.SiteMirrorImageStatus that contains the
// site mirror image status.
type SiteMirrorImageStatus struct {
librbd.SiteMirrorImageStatus
}
func (status SiteMirrorImageStatus) GetMirrorUUID() string {
return status.MirrorUUID
}
func (status SiteMirrorImageStatus) GetState() string {
return status.State.String()
}
func (status SiteMirrorImageStatus) GetDescription() string {
return status.Description
}
func (status SiteMirrorImageStatus) IsUP() bool {
return status.Up
}
func (status SiteMirrorImageStatus) GetLastUpdate() time.Time {
// convert the last update time to UTC
return time.Unix(status.LastUpdate, 0).UTC()
}

View File

@ -413,6 +413,10 @@ func (ri *rbdImage) String() string {
return fmt.Sprintf("%s/%s", ri.Pool, ri.RbdImageName)
}
func (ri *rbdImage) GetPoolName() string {
return ri.Pool
}
// String returns the snap-spec (pool/{namespace/}image@snap) format of the snapshot.
func (rs *rbdSnapshot) String() string {
if rs.RadosNamespace != "" {
@ -1594,9 +1598,9 @@ func (rv *rbdVolume) setImageOptions(ctx context.Context, options *librbd.ImageO
return nil
}
// GetImageCreationTime returns the creation time of the image. if the image
// GetCreationTime returns the creation time of the image. if the image
// creation time is not set, it queries the image info and returns the creation time.
func (ri *rbdImage) GetImageCreationTime() (*timestamppb.Timestamp, error) {
func (ri *rbdImage) GetCreationTime() (*timestamppb.Timestamp, error) {
if ri.CreatedAt != nil {
return ri.CreatedAt, nil
}

View File

@ -20,20 +20,11 @@ import (
"context"
"fmt"
"github.com/ceph/ceph-csi/internal/rbd/types"
librbd "github.com/ceph/go-ceph/rbd"
)
func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus) error {
if err := rv.resyncImage(); err != nil {
return fmt.Errorf("failed to resync image: %w", err)
}
// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable)
}
// repairResyncedImageID updates the existing image ID with new one.
func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) error {
// During resync operation the local image will get deleted and a new
@ -54,11 +45,11 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro
return rv.repairImageID(ctx, j, true)
}
func (rv *rbdVolume) DisableVolumeReplication(
mirroringInfo *librbd.MirrorImageInfo,
func DisableVolumeReplication(mirror types.Mirror,
primary,
force bool,
) error {
if !mirroringInfo.Primary {
if !primary {
// Return success if the below condition is met
// Local image is secondary
// Local image is in up+replaying state
@ -71,29 +62,35 @@ func (rv *rbdVolume) DisableVolumeReplication(
// disabled the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the volume
// replication Kubernetes artifacts after failback operation.
localStatus, rErr := rv.GetLocalState()
sts, rErr := mirror.GetGlobalMirroringStatus()
if rErr != nil {
return fmt.Errorf("failed to get local state: %w", rErr)
return fmt.Errorf("failed to get global state: %w", rErr)
}
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
localStatus, err := sts.GetLocalSiteStatus()
if err != nil {
return fmt.Errorf("failed to get local state: %w", ErrInvalidArgument)
}
if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() {
return nil
}
return fmt.Errorf("%w: secondary image status is up=%t and state=%s",
ErrInvalidArgument, localStatus.Up, localStatus.State)
ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState())
}
err := rv.DisableImageMirroring(force)
err := mirror.DisableMirroring(force)
if err != nil {
return fmt.Errorf("failed to disable image mirroring: %w", err)
}
// the image state can be still disabling once we disable the mirroring
// check the mirroring is disabled or not
mirroringInfo, err = rv.GetImageMirroringInfo()
info, err := mirror.GetMirroringInfo()
if err != nil {
return fmt.Errorf("failed to get mirroring info of image: %w", err)
}
if mirroringInfo.State == librbd.MirrorImageDisabling {
return fmt.Errorf("%w: %q is in disabling state", ErrAborted, rv.VolID)
if info.GetState() == librbd.MirrorImageDisabling.String() {
return fmt.Errorf("%w: image is in disabling state", ErrAborted)
}
return nil

View File

@ -0,0 +1,95 @@
/*
Copyright 2024 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"context"
"time"
"github.com/ceph/ceph-csi/internal/util"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
)
// FlattenMode is used to indicate the flatten mode for an RBD image.
type FlattenMode string
const (
// FlattenModeNever indicates that the image should never be flattened.
FlattenModeNever FlattenMode = "never"
// FlattenModeForce indicates that the image with the parent must be flattened.
FlattenModeForce FlattenMode = "force"
)
// Mirror is the interface for managing mirroring on an RBD image or a group.
type Mirror interface {
// EnableMirroring enables mirroring on the resource with the specified mode.
EnableMirroring(mode librbd.ImageMirrorMode) error
// DisableMirroring disables mirroring on the resource with the option to force the operation
DisableMirroring(force bool) error
// Promote promotes the resource to primary status with the option to force the operation
Promote(force bool) error
// ForcePromote promotes the resource to primary status with a timeout
ForcePromote(cr *util.Credentials) error
// Demote demotes the resource to secondary status
Demote() error
// Resync resynchronizes the resource
Resync() error
// GetMirroringInfo returns the mirroring information of the resource
GetMirroringInfo() (MirrorInfo, error)
// GetMirroringInfo returns the mirroring information of the resource
GetGlobalMirroringStatus() (GlobalStatus, error)
// AddSnapshotScheduling adds a snapshot scheduling to the resource
AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error
}
// MirrorImage is the interface for managing mirroring on an RBD image or group of images.
// This will be used to get the state of resource and it is primary or secondary.
type MirrorInfo interface {
// IsPrimary returns true if the resource is primary
IsPrimary() bool
// GetState returns the state of the resource
GetState() string
}
// GlobalStatus is the interface for fetching the global status of the mirroring.
// This will be used to get the status of the local site and remote site or all sites.
type GlobalStatus interface {
MirrorInfo
// GetLocalSiteStatus returns the local site status
GetLocalSiteStatus() (SiteStatus, error)
// GetAllSitesStatus returns the status of all sites
GetAllSitesStatus() []SiteStatus
// GetRemoteSiteStatus returns the status of the remote site
GetRemoteSiteStatus(ctx context.Context) (SiteStatus, error)
}
// SiteStatus is the interface for fetching the status of a site.
// This will be used to get the status of the local site and remote site.
type SiteStatus interface {
// GetMirrorUUID returns the mirror UUID
GetMirrorUUID() string
// IsUP returns true if the site is up
IsUP() bool
// GetState returns the state of the site
GetState() string
// GetDescription returns the description of the site
GetDescription() string
// GetLastUpdate returns the last update time
GetLastUpdate() time.Time
}

View File

@ -20,8 +20,10 @@ import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/timestamppb"
)
//nolint:interfacebloat // more than 10 methods are needed for the interface
type Volume interface {
// Destroy frees the resources used by the Volume.
Destroy(ctx context.Context)
@ -40,4 +42,24 @@ type Volume interface {
// RemoveFromGroup removes the Volume from the VolumeGroup.
RemoveFromGroup(ctx context.Context, vg VolumeGroup) error
// GetPoolName returns the name of the pool where the volume is stored.
GetPoolName() string
// GetCreationTime returns the creation time of the volume.
GetCreationTime() (*timestamppb.Timestamp, error)
// GetMetadata returns the value of the metadata key from the volume.
GetMetadata(key string) (string, error)
// SetMetadata sets the value of the metadata key on the volume.
SetMetadata(key, value string) error
// RepairResyncedImageID updates the existing image ID with new one in OMAP.
RepairResyncedImageID(ctx context.Context, ready bool) error
// HandleParentImageExistence checks the image's parent.
// if the parent image does not exist and is not in trash, it returns nil.
// if the flattenMode is FlattenModeForce, it flattens the image itself.
// if the parent image is in trash, it returns an error.
// if the parent image exists and is not enabled for mirroring, it returns an error.
HandleParentImageExistence(ctx context.Context, flattenMode FlattenMode) error
// ToMirror converts the Volume to a Mirror.
ToMirror() (Mirror, error)
}

View File

@ -142,9 +142,9 @@ type Config struct {
SkipForceFlatten bool
// cephfs related flags
ForceKernelCephFS bool // force to use the ceph kernel client even if the kernel is < 4.17
SetMetadata bool // set metadata on the volume
ForceKernelCephFS bool // force to use the ceph kernel client even if the kernel is < 4.17
RadosNamespaceCephFS string // RadosNamespace used to store CSI specific objects and keys
SetMetadata bool // set metadata on the volume
// Read affinity related options
EnableReadAffinity bool // enable OSD read affinity.