diff --git a/PendingReleaseNotes.md b/PendingReleaseNotes.md index 61686484f..9e2f46a3b 100644 --- a/PendingReleaseNotes.md +++ b/PendingReleaseNotes.md @@ -7,5 +7,8 @@ - deploy: podSecurityContexts can be configured for ceph-csi-cephfs chart in [PR](https://github.com/ceph/ceph-csi/pull/4664). - deploy: podSecurityContexts can be configured for ceph-csi-rbd chart in [PR](https://github.com/ceph/ceph-csi/pull/4668) - deploy: instanceID can be optionally configured for ceph-csi charts in [PR](https://github.com/ceph/ceph-csi/pull/4666) +- rbd: add support for flattenMode option for replication in [PR](https://github.com/ceph/ceph-csi/pull/4678) +- cephfs: support omap data store in radosnamespace via cli argument in [PR](https://github.com/ceph/ceph-csi/pull/4652) +- deploy: radosNamespaceCephFS can be configured for ceph-csi-cephfs chart in [PR](https://github.com/ceph/ceph-csi/pull/4652) ## NOTE diff --git a/charts/ceph-csi-cephfs/README.md b/charts/ceph-csi-cephfs/README.md index 9ba8e0527..703b923ff 100644 --- a/charts/ceph-csi-cephfs/README.md +++ b/charts/ceph-csi-cephfs/README.md @@ -201,6 +201,7 @@ charts and their default values. | `CSIDriver.fsGroupPolicy` | Specifies the fsGroupPolicy for the CSI driver object | `File` | | `CSIDriver.seLinuxMount` | Specify for efficient SELinux volume relabeling | `true` | | `instanceID` | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning. | ` ` | +| `radosNamespaceCephFS` | CephFS RadosNamespace used to store CSI specific objects and keys. | ` ` | ### Command Line diff --git a/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml b/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml index ed984017b..b91b8047a 100644 --- a/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml +++ b/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml @@ -62,6 +62,9 @@ spec: {{- if .Values.instanceID }} - "--instanceid={{ .Values.instanceID }}" {{- end }} +{{- if .Values.radosNamespaceCephFS }} + - "--radosnamespacecephfs={{ .Values.radosNamespaceCephFS }}" +{{- end }} {{- if .Values.nodeplugin.profiling.enabled }} - "--enableprofiling={{ .Values.nodeplugin.profiling.enabled }}" {{- end }} diff --git a/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml b/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml index c6c7c8278..3257705af 100644 --- a/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml +++ b/charts/ceph-csi-cephfs/templates/provisioner-deployment.yaml @@ -82,6 +82,9 @@ spec: {{- if .Values.instanceID }} - "--instanceid={{ .Values.instanceID }}" {{- end }} +{{- if .Values.radosNamespaceCephFS }} + - "--radosnamespacecephfs={{ .Values.radosNamespaceCephFS }}" +{{- end }} {{- if .Values.provisioner.profiling.enabled }} - "--enableprofiling={{ .Values.provisioner.profiling.enabled }}" {{- end }} diff --git a/charts/ceph-csi-cephfs/values.yaml b/charts/ceph-csi-cephfs/values.yaml index 685cc1983..f2cc91b25 100644 --- a/charts/ceph-csi-cephfs/values.yaml +++ b/charts/ceph-csi-cephfs/values.yaml @@ -372,6 +372,8 @@ configMapName: ceph-csi-config externallyManagedConfigmap: false # Name of the configmap used for ceph.conf cephConfConfigMapName: ceph-config +# CephFS RadosNamespace used to store CSI specific objects and keys. +# radosNamespaceCephFS: csi # Unique ID distinguishing this instance of Ceph CSI among other instances, # when sharing Ceph clusters across CSI instances for provisioning # instanceID: default diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index 88276a99f..9ef607be4 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -100,6 +100,11 @@ func init() { "kernelmountoptions", "", "Comma separated string of mount options accepted by cephfs kernel mounter") + flag.StringVar( + &conf.RadosNamespaceCephFS, + "radosnamespacecephfs", + "", + "CephFS RadosNamespace used to store CSI specific objects and keys.") flag.StringVar( &conf.FuseMountOptions, "fusemountoptions", diff --git a/docs/deploy-cephfs.md b/docs/deploy-cephfs.md index 89f1495ca..da798cbae 100644 --- a/docs/deploy-cephfs.md +++ b/docs/deploy-cephfs.md @@ -49,6 +49,7 @@ make image-cephcsi | `--domainlabels` | _empty_ | Kubernetes node labels to use as CSI domain labels for topology aware provisioning, should be a comma separated value (ex:= "failure-domain/region,failure-domain/zone") | | `--enable-read-affinity` | `false` | enable read affinity | | `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ','.
`Note: These labels will be replaced if crush location labels are defined in the ceph-csi-config ConfigMap for the specific cluster.` | +| `--radosnamespacecephfs`| _empty_ | CephFS RadosNamespace used to store CSI specific objects and keys. | **NOTE:** The parameter `-forcecephkernelclient` enables the Kernel CephFS mounter on kernels < 4.17. diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index dd78cf5c8..162b027ee 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -110,6 +110,11 @@ func (fs *Driver) Run(conf *util.Config) { CSIInstanceID = conf.InstanceID } + // Use passed in radosNamespace, if provided for storing CSI specific objects and keys. + if conf.RadosNamespaceCephFS != "" { + fsutil.RadosNamespace = conf.RadosNamespaceCephFS + } + if conf.IsNodeServer && k8s.RunsOnKubernetes() { nodeLabels, err = k8s.GetNodeLabels(conf.NodeID) if err != nil { diff --git a/internal/cephfs/util/util.go b/internal/cephfs/util/util.go index 02344d68c..5be4d8fa9 100644 --- a/internal/cephfs/util/util.go +++ b/internal/cephfs/util/util.go @@ -19,7 +19,5 @@ package util // VolumeID string representation. type VolumeID string -const ( - // RadosNamespace to store CSI specific objects and keys. - RadosNamespace = "csi" -) +// RadosNamespace to store CSI specific objects and keys. +var RadosNamespace = "csi" diff --git a/internal/csi-addons/rbd/identity.go b/internal/csi-addons/rbd/identity.go index 6a8cbfe67..68d7882e6 100644 --- a/internal/csi-addons/rbd/identity.go +++ b/internal/csi-addons/rbd/identity.go @@ -120,6 +120,12 @@ func (is *IdentityServer) GetCapabilities( Type: identity.Capability_VolumeGroup_MODIFY_VOLUME_GROUP, }, }, + }, &identity.Capability{ + Type: &identity.Capability_VolumeGroup_{ + VolumeGroup: &identity.Capability_VolumeGroup{ + Type: identity.Capability_VolumeGroup_GET_VOLUME_GROUP, + }, + }, }) } diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 544907b4f..df5c3732b 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -27,7 +27,9 @@ import ( "time" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/rbd" corerbd "github.com/ceph/ceph-csi/internal/rbd" + "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -93,12 +95,17 @@ type ReplicationServer struct { *replication.UnimplementedControllerServer // Embed ControllerServer as it implements helper functions *corerbd.ControllerServer + // csiID is the unique ID for this CSI-driver deployment. + csiID string } // NewReplicationServer creates a new ReplicationServer which handles // the Replication Service requests from the CSI-Addons specification. -func NewReplicationServer(c *corerbd.ControllerServer) *ReplicationServer { - return &ReplicationServer{ControllerServer: c} +func NewReplicationServer(instanceID string, c *corerbd.ControllerServer) *ReplicationServer { + return &ReplicationServer{ + ControllerServer: c, + csiID: instanceID, + } } func (rs *ReplicationServer) RegisterService(server grpc.ServiceRegistrar) { @@ -124,18 +131,18 @@ func getForceOption(ctx context.Context, parameters map[string]string) (bool, er // getFlattenMode gets flatten mode from the input GRPC request parameters. // flattenMode is the key to check the mode in the parameters. -func getFlattenMode(ctx context.Context, parameters map[string]string) (corerbd.FlattenMode, error) { +func getFlattenMode(ctx context.Context, parameters map[string]string) (types.FlattenMode, error) { val, ok := parameters[flattenModeKey] if !ok { log.DebugLog(ctx, "%q is not set in parameters, setting to default (%v)", - flattenModeKey, corerbd.FlattenModeNever) + flattenModeKey, types.FlattenModeNever) - return corerbd.FlattenModeNever, nil + return types.FlattenModeNever, nil } - mode := corerbd.FlattenMode(val) + mode := types.FlattenMode(val) switch mode { - case corerbd.FlattenModeForce, corerbd.FlattenModeNever: + case types.FlattenModeForce, types.FlattenModeNever: return mode, nil } log.ErrorLog(ctx, "%q=%q is not supported", flattenModeKey, val) @@ -270,24 +277,27 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + // extract the mirroring mode mirroringMode, err := getMirroringMode(ctx, req.GetParameters()) if err != nil { @@ -299,21 +309,20 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { err = rbdVol.HandleParentImageExistence(ctx, flattenMode) if err != nil { log.ErrorLog(ctx, err.Error()) return nil, getGRPCError(err) } - err = rbdVol.EnableImageMirroring(mirroringMode) + err = mirror.EnableMirroring(mirroringMode) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -347,52 +356,54 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + // extract the force option force, err := getForceOption(ctx, req.GetParameters()) if err != nil { return nil, err } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - - switch mirroringInfo.State { + switch info.GetState() { // image is already in disabled state - case librbd.MirrorImageDisabled: + case librbd.MirrorImageDisabled.String(): // image mirroring is still disabling - case librbd.MirrorImageDisabling: + case librbd.MirrorImageDisabling.String(): return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) - case librbd.MirrorImageEnabled: - err = rbdVol.DisableVolumeReplication(mirroringInfo, force) + case librbd.MirrorImageEnabled.String(): + err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force) if err != nil { return nil, getGRPCError(err) } return &replication.DisableVolumeReplicationResponse{}, nil default: - return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State) + return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", info.GetState()) } return &replication.DisableVolumeReplicationResponse{}, nil @@ -422,48 +433,50 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Errorf( codes.InvalidArgument, - "mirroring is not enabled on %s, image is in %d Mode", - rbdVol.VolID, - mirroringInfo.State) + "mirroring is not enabled on %s, image is in %s Mode", + volumeID, + info.GetState()) } // promote secondary to primary - if !mirroringInfo.Primary { + if !info.IsPrimary() { if req.GetForce() { // workaround for https://github.com/ceph/ceph-csi/issues/2736 // TODO: remove this workaround when the issue is fixed - err = rbdVol.ForcePromoteImage(cr) + err = mirror.ForcePromote(cr) } else { - err = rbdVol.PromoteImage(req.GetForce()) + err = mirror.Promote(req.GetForce()) } if err != nil { log.ErrorLog(ctx, err.Error()) @@ -483,7 +496,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, interval, startTime := getSchedulingDetails(req.GetParameters()) if interval != admin.NoInterval { - err = rbdVol.AddSnapshotScheduling(interval, startTime) + err = mirror.AddSnapshotScheduling(interval, startTime) if err != nil { return nil, err } @@ -522,49 +535,51 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - creationTime, err := rbdVol.GetImageCreationTime() + creationTime, err := rbdVol.GetCreationTime() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Errorf( codes.InvalidArgument, - "mirroring is not enabled on %s, image is in %d Mode", - rbdVol.VolID, - mirroringInfo.State) + "mirroring is not enabled on %s, image is in %s Mode", + volumeID, + info.GetState()) } // demote image to secondary - if mirroringInfo.Primary { + if info.IsPrimary() { // store the image creation time for resync _, err = rbdVol.GetMetadata(imageCreationTimeKey) if err != nil && errors.Is(err, librbd.ErrNotFound) { @@ -577,7 +592,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - err = rbdVol.DemoteImage() + err = mirror.Demote() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -590,22 +605,22 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, // checkRemoteSiteStatus checks the state of the remote cluster. // It returns true if the state of the remote cluster is up and unknown. -func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirrorImageStatus) bool { +func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus) bool { ready := true found := false - for _, s := range mirrorStatus.SiteStatuses { + for _, s := range mirrorStatus { log.UsefulLog( ctx, "peer site mirrorUUID=%q, daemon up=%t, mirroring state=%q, description=%q and lastUpdate=%d", - s.MirrorUUID, - s.Up, - s.State, - s.Description, - s.LastUpdate) - if s.MirrorUUID != "" { + s.GetMirrorUUID(), + s.IsUP(), + s.GetState(), + s.GetDescription(), + s.GetLastUpdate()) + if s.GetMirrorUUID() != "" { found = true // If ready is already "false" do not flip it based on another remote peer status - if ready && (s.State != librbd.MirrorImageStatusStateUnknown || !s.Up) { + if ready && (s.GetState() != librbd.MirrorImageStatusStateUnknown.String() || !s.IsUP()) { ready = false } } @@ -639,26 +654,28 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { // in case of Resync the image will get deleted and gets recreated and // it takes time for this operation. @@ -667,22 +684,22 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Errorf(codes.Aborted, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled") } // return error if the image is still primary - if mirroringInfo.Primary { + if info.IsPrimary() { return nil, status.Error(codes.InvalidArgument, "image is in primary state") } - mirrorStatus, err := rbdVol.GetImageMirroringStatus() + sts, err := mirror.GetGlobalMirroringStatus() if err != nil { // the image gets recreated after issuing resync if errors.Is(err, corerbd.ErrImageNotFound) { // caller retries till RBD syncs an initial version of the image to // report its status in the resync call. Ideally, this line will not - // be executed as the error would get returned due to getImageMirroringInfo + // be executed as the error would get returned due to getMirroringInfo // failing to find an image above. return nil, status.Error(codes.Aborted, err.Error()) } @@ -692,22 +709,20 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } ready := false - localStatus, err := mirrorStatus.LocalStatus() + localStatus, err := sts.GetLocalSiteStatus() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, fmt.Errorf("failed to get local status: %w", err) } - // convert the last update time to UTC - lastUpdateTime := time.Unix(localStatus.LastUpdate, 0).UTC() log.UsefulLog( ctx, "local status: daemon up=%t, image mirroring state=%q, description=%q and lastUpdate=%s", - localStatus.Up, - localStatus.State, - localStatus.Description, - lastUpdateTime) + localStatus.IsUP(), + localStatus.GetState(), + localStatus.GetDescription(), + localStatus.GetLastUpdate()) // To recover from split brain (up+error) state the image need to be // demoted and requested for resync on site-a and then the image on site-b @@ -719,11 +734,11 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, // If the image state on both the sites are up+unknown consider that // complete data is synced as the last snapshot // gets exchanged between the clusters. - if localStatus.State == librbd.MirrorImageStatusStateUnknown && localStatus.Up { - ready = checkRemoteSiteStatus(ctx, mirrorStatus) + if localStatus.GetState() == librbd.MirrorImageStatusStateUnknown.String() && localStatus.IsUP() { + ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus()) } - creationTime, err := rbdVol.GetImageCreationTime() + creationTime, err := rbdVol.GetCreationTime() if err != nil { return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error()) } @@ -749,7 +764,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime()) if req.GetForce() && st.Equal(creationTime.AsTime()) { - err = rbdVol.ResyncVol(localStatus) + err = mirror.Resync() if err != nil { return nil, getGRPCError(err) } @@ -853,42 +868,44 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, nil, req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Aborted, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled") } // return error if the image is not in primary state - if !mirroringInfo.Primary { + if !info.IsPrimary() { return nil, status.Error(codes.InvalidArgument, "image is not in primary state") } - mirrorStatus, err := rbdVol.GetImageMirroringStatus() + mirrorStatus, err := mirror.GetGlobalMirroringStatus() if err != nil { if errors.Is(err, corerbd.ErrImageNotFound) { return nil, status.Error(codes.Aborted, err.Error()) @@ -898,14 +915,14 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - remoteStatus, err := RemoteStatus(ctx, mirrorStatus) + remoteStatus, err := mirrorStatus.GetRemoteSiteStatus(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Errorf(codes.Internal, "failed to get remote status: %v", err) } - description := remoteStatus.Description + description := remoteStatus.GetDescription() resp, err := getLastSyncInfo(ctx, description) if err != nil { if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) { @@ -919,36 +936,6 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return resp, nil } -// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses -// slice that corresponds to the remote site's status. If the remote status -// is not found than the error ErrNotExist will be returned. -func RemoteStatus(ctx context.Context, gmis *librbd.GlobalMirrorImageStatus) (librbd.SiteMirrorImageStatus, error) { - var ( - ss librbd.SiteMirrorImageStatus - err error = librbd.ErrNotExist - ) - - for i := range gmis.SiteStatuses { - log.DebugLog( - ctx, - "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", - gmis.SiteStatuses[i].MirrorUUID, - gmis.SiteStatuses[i].State, - gmis.SiteStatuses[i].Description, - gmis.SiteStatuses[i].LastUpdate, - gmis.SiteStatuses[i].Up) - - if gmis.SiteStatuses[i].MirrorUUID != "" { - ss = gmis.SiteStatuses[i] - err = nil - - break - } - } - - return ss, err -} - // This function gets the local snapshot time, last sync snapshot seconds // and last sync bytes from the description of localStatus and convert // it into required types. @@ -1015,12 +1002,12 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV return &response, nil } -func checkVolumeResyncStatus(ctx context.Context, localStatus librbd.SiteMirrorImageStatus) error { +func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) error { // we are considering local snapshot timestamp to check if the resync is // started or not, if we dont see local_snapshot_timestamp in the // description of localStatus, we are returning error. if we see the local // snapshot timestamp in the description we return resyncing started. - description := localStatus.Description + description := localStatus.GetDescription() resp, err := getLastSyncInfo(ctx, description) if err != nil { return fmt.Errorf("failed to get last sync info: %w", err) diff --git a/internal/csi-addons/rbd/replication_test.go b/internal/csi-addons/rbd/replication_test.go index c678c6377..6da1b929e 100644 --- a/internal/csi-addons/rbd/replication_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -26,6 +26,7 @@ import ( "time" corerbd "github.com/ceph/ceph-csi/internal/rbd" + "github.com/ceph/ceph-csi/internal/rbd/types" librbd "github.com/ceph/go-ceph/rbd" "github.com/ceph/go-ceph/rbd/admin" @@ -219,30 +220,36 @@ func TestCheckVolumeResyncStatus(t *testing.T) { t.Parallel() tests := []struct { name string - args librbd.SiteMirrorImageStatus + args corerbd.SiteMirrorImageStatus wantErr bool }{ { name: "test when local_snapshot_timestamp is non zero", - args: librbd.SiteMirrorImageStatus{ - //nolint:lll // sample output cannot be split into multiple lines. - Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + args: corerbd.SiteMirrorImageStatus{ + SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{ + //nolint:lll // sample output cannot be split into multiple lines. + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + }, }, wantErr: false, }, { name: "test when local_snapshot_timestamp is zero", //nolint:lll // sample output cannot be split into multiple lines. - args: librbd.SiteMirrorImageStatus{ - Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + args: corerbd.SiteMirrorImageStatus{ + SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{ + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + }, }, wantErr: true, }, { name: "test when local_snapshot_timestamp is not present", //nolint:lll // sample output cannot be split into multiple lines. - args: librbd.SiteMirrorImageStatus{ - Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + args: corerbd.SiteMirrorImageStatus{ + SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{ + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + }, }, wantErr: true, }, @@ -261,17 +268,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { t.Parallel() tests := []struct { name string - args *librbd.GlobalMirrorImageStatus + args corerbd.GlobalMirrorStatus wantReady bool }{ { name: "Test a single peer in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -279,17 +288,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test a single peer in sync, including a local instance", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - { - MirrorUUID: "", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, + { + MirrorUUID: "", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -297,17 +308,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test a multiple peers in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote1", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - { - MirrorUUID: "remote2", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote1", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, + { + MirrorUUID: "remote2", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -315,19 +328,23 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test no remote peers", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{}, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{}, + }, }, wantReady: false, }, { name: "Test single peer not in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateReplaying, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateReplaying, + Up: true, + }, }, }, }, @@ -335,12 +352,14 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test single peer not up", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateUnknown, - Up: false, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateUnknown, + Up: false, + }, }, }, }, @@ -348,17 +367,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test multiple peers, when first peer is not in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote1", - State: librbd.MirrorImageStatusStateStoppingReplay, - Up: true, - }, - { - MirrorUUID: "remote2", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote1", + State: librbd.MirrorImageStatusStateStoppingReplay, + Up: true, + }, + { + MirrorUUID: "remote2", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -366,17 +387,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test multiple peers, when second peer is not up", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote1", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - { - MirrorUUID: "remote2", - State: librbd.MirrorImageStatusStateUnknown, - Up: false, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote1", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, + { + MirrorUUID: "remote2", + State: librbd.MirrorImageStatusStateUnknown, + Up: false, + }, }, }, }, @@ -386,7 +409,7 @@ func TestCheckRemoteSiteStatus(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() - if ready := checkRemoteSiteStatus(context.TODO(), tt.args); ready != tt.wantReady { + if ready := checkRemoteSiteStatus(context.TODO(), tt.args.GetAllSitesStatus()); ready != tt.wantReady { t.Errorf("checkRemoteSiteStatus() ready = %v, expect ready = %v", ready, tt.wantReady) } }) @@ -651,7 +674,7 @@ func Test_getFlattenMode(t *testing.T) { tests := []struct { name string args args - want corerbd.FlattenMode + want types.FlattenMode wantErr bool }{ { @@ -660,27 +683,27 @@ func Test_getFlattenMode(t *testing.T) { ctx: context.TODO(), parameters: map[string]string{}, }, - want: corerbd.FlattenModeNever, + want: types.FlattenModeNever, }, { name: "flattenMode option set to never", args: args{ ctx: context.TODO(), parameters: map[string]string{ - flattenModeKey: string(corerbd.FlattenModeNever), + flattenModeKey: string(types.FlattenModeNever), }, }, - want: corerbd.FlattenModeNever, + want: types.FlattenModeNever, }, { name: "flattenMode option set to force", args: args{ ctx: context.TODO(), parameters: map[string]string{ - flattenModeKey: string(corerbd.FlattenModeForce), + flattenModeKey: string(types.FlattenModeForce), }, }, - want: corerbd.FlattenModeForce, + want: types.FlattenModeForce, }, { diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index e96831b5d..95d438f7b 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -363,3 +363,41 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership( VolumeGroup: csiVG, }, nil } + +// ControllerGetVolumeGroup RPC call to get a volume group. +// +// From the spec: +// ControllerGetVolumeGroupResponse should contain current information of a +// volume group if it exists. If the volume group does not exist any more, +// ControllerGetVolumeGroup should return gRPC error code NOT_FOUND. +func (vs *VolumeGroupServer) ControllerGetVolumeGroup( + ctx context.Context, + req *volumegroup.ControllerGetVolumeGroupRequest, +) (*volumegroup.ControllerGetVolumeGroupResponse, error) { + mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets()) + defer mgr.Destroy(ctx) + + // resolve the volume group + vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId()) + if err != nil { + return nil, status.Errorf( + codes.NotFound, + "could not find volume group %q: %s", + req.GetVolumeGroupId(), + err.Error()) + } + defer vg.Destroy(ctx) + + csiVG, err := vg.ToCSI(ctx) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to convert volume group %q to CSI format: %v", + vg, + err) + } + + return &volumegroup.ControllerGetVolumeGroupResponse{ + VolumeGroup: csiVG, + }, nil +} diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 2ad9fe854..cdf1443a5 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -988,7 +988,7 @@ func (cs *ControllerServer) DeleteVolume( func cleanupRBDImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials, ) (*csi.DeleteVolumeResponse, error) { - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := rbdVol.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -998,7 +998,7 @@ func cleanupRBDImage(ctx context.Context, // Mirroring is enabled on the image // Local image is secondary // Local image is in up+replaying state - if mirroringInfo.State == librbd.MirrorImageEnabled && !mirroringInfo.Primary { + if info.GetState() == librbd.MirrorImageEnabled.String() && !info.IsPrimary() { // If the image is in a secondary state and its up+replaying means its // an healthy secondary and the image is primary somewhere in the // remote cluster and the local image is getting replayed. Delete the @@ -1007,11 +1007,18 @@ func cleanupRBDImage(ctx context.Context, // the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the OMAP, PVC and PV // objects after failback operation. - localStatus, rErr := rbdVol.GetLocalState() + sts, rErr := rbdVol.GetGlobalMirroringStatus() if rErr != nil { return nil, status.Error(codes.Internal, rErr.Error()) } - if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying { + + localStatus, rErr := sts.GetLocalSiteStatus() + if rErr != nil { + log.ErrorLog(ctx, "failed to get local status for volume %s: %w", rbdVol.RbdImageName, rErr) + + return nil, status.Error(codes.Internal, rErr.Error()) + } + if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() { if err = undoVolReservation(ctx, rbdVol, cr); err != nil { log.ErrorLog(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)", rbdVol.RequestName, rbdVol.RbdImageName, err) @@ -1023,8 +1030,8 @@ func cleanupRBDImage(ctx context.Context, } log.ErrorLog(ctx, "secondary image status is up=%t and state=%s", - localStatus.Up, - localStatus.State) + localStatus.IsUP(), + localStatus.GetState()) } inUse, err := rbdVol.isInUse() diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 979da3b1c..fd0dda43a 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -219,7 +219,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error { fcs := casrbd.NewFenceControllerServer() r.cas.RegisterService(fcs) - rcs := casrbd.NewReplicationServer(NewControllerServer(r.cd)) + rcs := casrbd.NewReplicationServer(rbd.CSIInstanceID, NewControllerServer(r.cd)) r.cas.RegisterService(rcs) vgcs := casrbd.NewVolumeGroupServer(conf.InstanceID) diff --git a/internal/rbd/group.go b/internal/rbd/group.go index 46c7739d7..d0a390322 100644 --- a/internal/rbd/group.go +++ b/internal/rbd/group.go @@ -77,3 +77,7 @@ func (rv *rbdVolume) RemoveFromGroup(ctx context.Context, vg types.VolumeGroup) return librbd.GroupImageRemove(ioctx, name, rv.ioctx, rv.RbdImageName) } + +func (rv *rbdVolume) ToMirror() (types.Mirror, error) { + return rv, nil +} diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index 106a9eb77..905db1c44 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -20,21 +20,13 @@ import ( "fmt" "time" + "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" librbd "github.com/ceph/go-ceph/rbd" ) -// FlattenMode is used to indicate the flatten mode for an RBD image. -type FlattenMode string - -const ( - // FlattenModeNever indicates that the image should never be flattened. - FlattenModeNever FlattenMode = "never" - // FlattenModeForce indicates that the image with the parent must be flattened. - FlattenModeForce FlattenMode = "force" -) - // HandleParentImageExistence checks the image's parent. // if the parent image does not exist and is not in trash, it returns nil. // if the flattenMode is FlattenModeForce, it flattens the image itself. @@ -42,13 +34,12 @@ const ( // if the parent image exists and is not enabled for mirroring, it returns an error. func (rv *rbdVolume) HandleParentImageExistence( ctx context.Context, - flattenMode FlattenMode, + mode types.FlattenMode, ) error { if rv.ParentName == "" && !rv.ParentInTrash { return nil } - - if flattenMode == FlattenModeForce { + if mode == types.FlattenModeForce { // Delete temp image that exists for volume datasource since // it is no longer required when the live image is flattened. err := rv.DeleteTempImage(ctx) @@ -72,14 +63,13 @@ func (rv *rbdVolume) HandleParentImageExistence( if err != nil { return err } - parentMirroringInfo, err := parent.GetImageMirroringInfo() + parentMirroringInfo, err := parent.GetMirroringInfo() if err != nil { return fmt.Errorf( "failed to get mirroring info of parent %q of image %q: %w", parent, rv, err) } - - if parentMirroringInfo.State != librbd.MirrorImageEnabled { + if parentMirroringInfo.GetState() != librbd.MirrorImageEnabled.String() { return fmt.Errorf("%w: failed to enable mirroring on image %q: "+ "parent image %q is not enabled for mirroring", ErrFailedPrecondition, rv, parent) @@ -88,8 +78,11 @@ func (rv *rbdVolume) HandleParentImageExistence( return nil } -// EnableImageMirroring enables mirroring on an image. -func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error { +// check that rbdVolume implements the types.Mirror interface. +var _ types.Mirror = &rbdVolume{} + +// EnableMirroring enables mirroring on an image. +func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -104,8 +97,8 @@ func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error { return nil } -// DisableImageMirroring disables mirroring on an image. -func (ri *rbdImage) DisableImageMirroring(force bool) error { +// DisableMirroring disables mirroring on an image. +func (ri *rbdImage) DisableMirroring(force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -120,8 +113,8 @@ func (ri *rbdImage) DisableImageMirroring(force bool) error { return nil } -// GetImageMirroringInfo gets mirroring information of an image. -func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) { +// GetMirroringInfo gets mirroring information of an image. +func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -133,11 +126,11 @@ func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) { return nil, fmt.Errorf("failed to get mirroring info of %q with error: %w", ri, err) } - return info, nil + return ImageStatus{MirrorImageInfo: info}, nil } -// PromoteImage promotes image to primary. -func (ri *rbdImage) PromoteImage(force bool) error { +// Promote promotes image to primary. +func (ri *rbdImage) Promote(force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -151,10 +144,10 @@ func (ri *rbdImage) PromoteImage(force bool) error { return nil } -// ForcePromoteImage promotes image to primary with force option with 2 minutes +// ForcePromote promotes image to primary with force option with 2 minutes // timeout. If there is no response within 2 minutes,the rbd CLI process will be // killed and an error is returned. -func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error { +func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { promoteArgs := []string{ "mirror", "image", "promote", rv.String(), @@ -181,8 +174,8 @@ func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error { return nil } -// DemoteImage demotes image to secondary. -func (ri *rbdImage) DemoteImage() error { +// Demote demotes image to secondary. +func (ri *rbdImage) Demote() error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -196,8 +189,8 @@ func (ri *rbdImage) DemoteImage() error { return nil } -// resyncImage resync image to correct the split-brain. -func (ri *rbdImage) resyncImage() error { +// Resync resync image to correct the split-brain. +func (ri *rbdImage) Resync() error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -208,11 +201,14 @@ func (ri *rbdImage) resyncImage() error { return fmt.Errorf("failed to resync image %q with error: %w", ri, err) } - return nil + // If we issued a resync, return a non-final error as image needs to be recreated + // locally. Caller retries till RBD syncs an initial version of the image to + // report its status in the resync request. + return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable) } -// GetImageMirroringStatus get the mirroring status of an image. -func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) { +// GetGlobalMirroringStatus get the mirroring status of an image. +func (ri *rbdImage) GetGlobalMirroringStatus() (types.GlobalStatus, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -223,26 +219,110 @@ func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, return nil, fmt.Errorf("failed to get image mirroring status %q with error: %w", ri, err) } - return &statusInfo, nil + return GlobalMirrorStatus{GlobalMirrorImageStatus: statusInfo}, nil } -// GetLocalState returns the local state of the image. -func (ri *rbdImage) GetLocalState() (librbd.SiteMirrorImageStatus, error) { - localStatus := librbd.SiteMirrorImageStatus{} - image, err := ri.open() - if err != nil { - return localStatus, fmt.Errorf("failed to open image %q with error: %w", ri, err) - } - defer image.Close() - - statusInfo, err := image.GetGlobalMirrorStatus() - if err != nil { - return localStatus, fmt.Errorf("failed to get image mirroring status %q with error: %w", ri, err) - } - localStatus, err = statusInfo.LocalStatus() - if err != nil { - return localStatus, fmt.Errorf("failed to get local status: %w", err) - } - - return localStatus, nil +// ImageStatus is a wrapper around librbd.MirrorImageInfo that contains the +// image mirror status. +type ImageStatus struct { + *librbd.MirrorImageInfo +} + +func (status ImageStatus) GetState() string { + return status.State.String() +} + +func (status ImageStatus) IsPrimary() bool { + return status.Primary +} + +// GlobalMirrorStatus is a wrapper around librbd.GlobalMirrorImageStatus that contains the +// global mirror image status. +type GlobalMirrorStatus struct { + librbd.GlobalMirrorImageStatus +} + +func (status GlobalMirrorStatus) GetState() string { + return status.GlobalMirrorImageStatus.Info.State.String() +} + +func (status GlobalMirrorStatus) IsPrimary() bool { + return status.GlobalMirrorImageStatus.Info.Primary +} + +func (status GlobalMirrorStatus) GetLocalSiteStatus() (types.SiteStatus, error) { + s, err := status.GlobalMirrorImageStatus.LocalStatus() + if err != nil { + err = fmt.Errorf("failed to get local site status: %w", err) + } + + return SiteMirrorImageStatus{ + SiteMirrorImageStatus: s, + }, err +} + +func (status GlobalMirrorStatus) GetAllSitesStatus() []types.SiteStatus { + var siteStatuses []types.SiteStatus + for _, ss := range status.SiteStatuses { + siteStatuses = append(siteStatuses, SiteMirrorImageStatus{SiteMirrorImageStatus: ss}) + } + + return siteStatuses +} + +// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses +// slice that corresponds to the remote site's status. If the remote status +// is not found than the error ErrNotExist will be returned. +func (status GlobalMirrorStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) { + var ( + ss librbd.SiteMirrorImageStatus + err error = librbd.ErrNotExist + ) + + for i := range status.SiteStatuses { + log.DebugLog( + ctx, + "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", + status.SiteStatuses[i].MirrorUUID, + status.SiteStatuses[i].State, + status.SiteStatuses[i].Description, + status.SiteStatuses[i].LastUpdate, + status.SiteStatuses[i].Up) + + if status.SiteStatuses[i].MirrorUUID != "" { + ss = status.SiteStatuses[i] + err = nil + + break + } + } + + return SiteMirrorImageStatus{SiteMirrorImageStatus: ss}, err +} + +// SiteMirrorImageStatus is a wrapper around librbd.SiteMirrorImageStatus that contains the +// site mirror image status. +type SiteMirrorImageStatus struct { + librbd.SiteMirrorImageStatus +} + +func (status SiteMirrorImageStatus) GetMirrorUUID() string { + return status.MirrorUUID +} + +func (status SiteMirrorImageStatus) GetState() string { + return status.State.String() +} + +func (status SiteMirrorImageStatus) GetDescription() string { + return status.Description +} + +func (status SiteMirrorImageStatus) IsUP() bool { + return status.Up +} + +func (status SiteMirrorImageStatus) GetLastUpdate() time.Time { + // convert the last update time to UTC + return time.Unix(status.LastUpdate, 0).UTC() } diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 884d66a2b..28437a921 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -413,6 +413,10 @@ func (ri *rbdImage) String() string { return fmt.Sprintf("%s/%s", ri.Pool, ri.RbdImageName) } +func (ri *rbdImage) GetPoolName() string { + return ri.Pool +} + // String returns the snap-spec (pool/{namespace/}image@snap) format of the snapshot. func (rs *rbdSnapshot) String() string { if rs.RadosNamespace != "" { @@ -1594,9 +1598,9 @@ func (rv *rbdVolume) setImageOptions(ctx context.Context, options *librbd.ImageO return nil } -// GetImageCreationTime returns the creation time of the image. if the image +// GetCreationTime returns the creation time of the image. if the image // creation time is not set, it queries the image info and returns the creation time. -func (ri *rbdImage) GetImageCreationTime() (*timestamppb.Timestamp, error) { +func (ri *rbdImage) GetCreationTime() (*timestamppb.Timestamp, error) { if ri.CreatedAt != nil { return ri.CreatedAt, nil } diff --git a/internal/rbd/replication.go b/internal/rbd/replication.go index c6b4c55dd..86c31cd85 100644 --- a/internal/rbd/replication.go +++ b/internal/rbd/replication.go @@ -20,20 +20,11 @@ import ( "context" "fmt" + "github.com/ceph/ceph-csi/internal/rbd/types" + librbd "github.com/ceph/go-ceph/rbd" ) -func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus) error { - if err := rv.resyncImage(); err != nil { - return fmt.Errorf("failed to resync image: %w", err) - } - - // If we issued a resync, return a non-final error as image needs to be recreated - // locally. Caller retries till RBD syncs an initial version of the image to - // report its status in the resync request. - return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable) -} - // repairResyncedImageID updates the existing image ID with new one. func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) error { // During resync operation the local image will get deleted and a new @@ -54,11 +45,11 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro return rv.repairImageID(ctx, j, true) } -func (rv *rbdVolume) DisableVolumeReplication( - mirroringInfo *librbd.MirrorImageInfo, +func DisableVolumeReplication(mirror types.Mirror, + primary, force bool, ) error { - if !mirroringInfo.Primary { + if !primary { // Return success if the below condition is met // Local image is secondary // Local image is in up+replaying state @@ -71,29 +62,35 @@ func (rv *rbdVolume) DisableVolumeReplication( // disabled the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the volume // replication Kubernetes artifacts after failback operation. - localStatus, rErr := rv.GetLocalState() + sts, rErr := mirror.GetGlobalMirroringStatus() if rErr != nil { - return fmt.Errorf("failed to get local state: %w", rErr) + return fmt.Errorf("failed to get global state: %w", rErr) } - if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying { + + localStatus, err := sts.GetLocalSiteStatus() + if err != nil { + return fmt.Errorf("failed to get local state: %w", ErrInvalidArgument) + } + if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() { return nil } return fmt.Errorf("%w: secondary image status is up=%t and state=%s", - ErrInvalidArgument, localStatus.Up, localStatus.State) + ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState()) } - err := rv.DisableImageMirroring(force) + err := mirror.DisableMirroring(force) if err != nil { return fmt.Errorf("failed to disable image mirroring: %w", err) } // the image state can be still disabling once we disable the mirroring // check the mirroring is disabled or not - mirroringInfo, err = rv.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { return fmt.Errorf("failed to get mirroring info of image: %w", err) } - if mirroringInfo.State == librbd.MirrorImageDisabling { - return fmt.Errorf("%w: %q is in disabling state", ErrAborted, rv.VolID) + + if info.GetState() == librbd.MirrorImageDisabling.String() { + return fmt.Errorf("%w: image is in disabling state", ErrAborted) } return nil diff --git a/internal/rbd/types/mirror.go b/internal/rbd/types/mirror.go new file mode 100644 index 000000000..131fad844 --- /dev/null +++ b/internal/rbd/types/mirror.go @@ -0,0 +1,95 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "context" + "time" + + "github.com/ceph/ceph-csi/internal/util" + + librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" +) + +// FlattenMode is used to indicate the flatten mode for an RBD image. +type FlattenMode string + +const ( + // FlattenModeNever indicates that the image should never be flattened. + FlattenModeNever FlattenMode = "never" + // FlattenModeForce indicates that the image with the parent must be flattened. + FlattenModeForce FlattenMode = "force" +) + +// Mirror is the interface for managing mirroring on an RBD image or a group. +type Mirror interface { + // EnableMirroring enables mirroring on the resource with the specified mode. + EnableMirroring(mode librbd.ImageMirrorMode) error + // DisableMirroring disables mirroring on the resource with the option to force the operation + DisableMirroring(force bool) error + // Promote promotes the resource to primary status with the option to force the operation + Promote(force bool) error + // ForcePromote promotes the resource to primary status with a timeout + ForcePromote(cr *util.Credentials) error + // Demote demotes the resource to secondary status + Demote() error + // Resync resynchronizes the resource + Resync() error + // GetMirroringInfo returns the mirroring information of the resource + GetMirroringInfo() (MirrorInfo, error) + // GetMirroringInfo returns the mirroring information of the resource + GetGlobalMirroringStatus() (GlobalStatus, error) + // AddSnapshotScheduling adds a snapshot scheduling to the resource + AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error +} + +// MirrorImage is the interface for managing mirroring on an RBD image or group of images. +// This will be used to get the state of resource and it is primary or secondary. +type MirrorInfo interface { + // IsPrimary returns true if the resource is primary + IsPrimary() bool + // GetState returns the state of the resource + GetState() string +} + +// GlobalStatus is the interface for fetching the global status of the mirroring. +// This will be used to get the status of the local site and remote site or all sites. +type GlobalStatus interface { + MirrorInfo + // GetLocalSiteStatus returns the local site status + GetLocalSiteStatus() (SiteStatus, error) + // GetAllSitesStatus returns the status of all sites + GetAllSitesStatus() []SiteStatus + // GetRemoteSiteStatus returns the status of the remote site + GetRemoteSiteStatus(ctx context.Context) (SiteStatus, error) +} + +// SiteStatus is the interface for fetching the status of a site. +// This will be used to get the status of the local site and remote site. +type SiteStatus interface { + // GetMirrorUUID returns the mirror UUID + GetMirrorUUID() string + // IsUP returns true if the site is up + IsUP() bool + // GetState returns the state of the site + GetState() string + // GetDescription returns the description of the site + GetDescription() string + // GetLastUpdate returns the last update time + GetLastUpdate() time.Time +} diff --git a/internal/rbd/types/volume.go b/internal/rbd/types/volume.go index 2ea6e54ca..7f534091f 100644 --- a/internal/rbd/types/volume.go +++ b/internal/rbd/types/volume.go @@ -20,8 +20,10 @@ import ( "context" "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/protobuf/types/known/timestamppb" ) +//nolint:interfacebloat // more than 10 methods are needed for the interface type Volume interface { // Destroy frees the resources used by the Volume. Destroy(ctx context.Context) @@ -40,4 +42,24 @@ type Volume interface { // RemoveFromGroup removes the Volume from the VolumeGroup. RemoveFromGroup(ctx context.Context, vg VolumeGroup) error + + // GetPoolName returns the name of the pool where the volume is stored. + GetPoolName() string + // GetCreationTime returns the creation time of the volume. + GetCreationTime() (*timestamppb.Timestamp, error) + // GetMetadata returns the value of the metadata key from the volume. + GetMetadata(key string) (string, error) + // SetMetadata sets the value of the metadata key on the volume. + SetMetadata(key, value string) error + // RepairResyncedImageID updates the existing image ID with new one in OMAP. + RepairResyncedImageID(ctx context.Context, ready bool) error + // HandleParentImageExistence checks the image's parent. + // if the parent image does not exist and is not in trash, it returns nil. + // if the flattenMode is FlattenModeForce, it flattens the image itself. + // if the parent image is in trash, it returns an error. + // if the parent image exists and is not enabled for mirroring, it returns an error. + HandleParentImageExistence(ctx context.Context, flattenMode FlattenMode) error + + // ToMirror converts the Volume to a Mirror. + ToMirror() (Mirror, error) } diff --git a/internal/util/util.go b/internal/util/util.go index 777458e5f..869df991b 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -142,9 +142,9 @@ type Config struct { SkipForceFlatten bool // cephfs related flags - ForceKernelCephFS bool // force to use the ceph kernel client even if the kernel is < 4.17 - - SetMetadata bool // set metadata on the volume + ForceKernelCephFS bool // force to use the ceph kernel client even if the kernel is < 4.17 + RadosNamespaceCephFS string // RadosNamespace used to store CSI specific objects and keys + SetMetadata bool // set metadata on the volume // Read affinity related options EnableReadAffinity bool // enable OSD read affinity.