mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-12-18 11:00:25 +00:00
rbd: use go-ceph for getImageMirroringStatus
Currently, getImageMirroringStatus() is using RBD CLI. This commit converts RBD CLI to go-ceph API. Fixes: #2120 Signed-off-by: Yati Padia <ypadia@redhat.com>
This commit is contained in:
parent
4e6d9be826
commit
6691951453
@ -16,12 +16,7 @@ limitations under the License.
|
|||||||
package rbd
|
package rbd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/ceph/ceph-csi/internal/util"
|
|
||||||
|
|
||||||
librbd "github.com/ceph/go-ceph/rbd"
|
librbd "github.com/ceph/go-ceph/rbd"
|
||||||
)
|
)
|
||||||
@ -119,51 +114,17 @@ func (ri *rbdImage) resyncImage() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type imageMirrorStatus struct {
|
// getImageMirroingStatus get the mirroring status of an image.
|
||||||
Name string `json:"name"` // name of the rbd image
|
func (ri *rbdImage) getImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) {
|
||||||
State string `json:"state"` // rbd image state
|
image, err := ri.open()
|
||||||
Description string `json:"description"`
|
|
||||||
LastUpdate string `json:"last_update"`
|
|
||||||
PeerSites []struct {
|
|
||||||
SiteName string `json:"site_name"`
|
|
||||||
State string `json:"state"`
|
|
||||||
Description string `json:"description"`
|
|
||||||
LastUpdate string `json:"last_update"`
|
|
||||||
} `json:"peer_sites"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME: once https://github.com/ceph/go-ceph/issues/460 is fixed use go-ceph.
|
|
||||||
// getImageMirroringStatus get the mirroring status of an image.
|
|
||||||
func (ri *rbdImage) getImageMirroringStatus() (*imageMirrorStatus, error) {
|
|
||||||
// rbd mirror image status --format=json info [image-spec | snap-spec]
|
|
||||||
var imgStatus imageMirrorStatus
|
|
||||||
stdout, stderr, err := util.ExecCommand(
|
|
||||||
context.TODO(),
|
|
||||||
"rbd",
|
|
||||||
"-m", ri.Monitors,
|
|
||||||
"--id", ri.conn.Creds.ID,
|
|
||||||
"--keyfile="+ri.conn.Creds.KeyFile,
|
|
||||||
"-c", util.CephConfigPath,
|
|
||||||
"--format="+"json",
|
|
||||||
"mirror",
|
|
||||||
"image",
|
|
||||||
"status",
|
|
||||||
ri.String())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if strings.Contains(stderr, "rbd: error opening image "+ri.RbdImageName+
|
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
|
||||||
": (2) No such file or directory") {
|
}
|
||||||
return nil, util.JoinErrors(ErrImageNotFound, err)
|
defer image.Close()
|
||||||
}
|
statusInfo, err := image.GetGlobalMirrorStatus()
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("failed to get image mirroring status %q with error: %w", ri, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if stdout != "" {
|
return &statusInfo, nil
|
||||||
err = json.Unmarshal([]byte(stdout), &imgStatus)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("unmarshal failed (%w), raw buffer response: %s", err, stdout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &imgStatus, nil
|
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@ package rbd
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -45,9 +46,9 @@ const (
|
|||||||
type imageMirroringState string
|
type imageMirroringState string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// If the state is up+unknown means the rbd-mirror daemon is
|
// If the state is unknown means the rbd-mirror daemon is
|
||||||
// running and the image is demoted on both the clusters.
|
// running and the image is demoted on both the clusters.
|
||||||
upAndUnknown imageMirroringState = "up+unknown"
|
unknown imageMirroringState = "unknown"
|
||||||
|
|
||||||
// If the state is error means image need resync.
|
// If the state is error means image need resync.
|
||||||
errorState imageMirroringState = "error"
|
errorState imageMirroringState = "error"
|
||||||
@ -487,6 +488,29 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
|
|||||||
return &replication.DemoteVolumeResponse{}, nil
|
return &replication.DemoteVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkRemoteSiteStatus checks the state of the remote cluster.
|
||||||
|
// It returns true if the state of the remote cluster is up and unknown.
|
||||||
|
func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirrorImageStatus) bool {
|
||||||
|
ready := true
|
||||||
|
for _, s := range mirrorStatus.SiteStatuses {
|
||||||
|
if s.MirrorUUID != "" {
|
||||||
|
if imageMirroringState(s.State.String()) != unknown && !s.Up {
|
||||||
|
util.UsefulLog(
|
||||||
|
ctx,
|
||||||
|
"peer site mirrorUUID=%s, mirroring state=%s, description=%s and lastUpdate=%s",
|
||||||
|
s.MirrorUUID,
|
||||||
|
s.State.String(),
|
||||||
|
s.Description,
|
||||||
|
s.LastUpdate)
|
||||||
|
|
||||||
|
ready = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ready
|
||||||
|
}
|
||||||
|
|
||||||
// ResyncVolume extracts the RBD volume information from the volumeID, If the
|
// ResyncVolume extracts the RBD volume information from the volumeID, If the
|
||||||
// image is present, mirroring is enabled and the image is in demoted state.
|
// image is present, mirroring is enabled and the image is in demoted state.
|
||||||
// If yes it will resync the image to correct the split-brain.
|
// If yes it will resync the image to correct the split-brain.
|
||||||
@ -559,7 +583,14 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
|||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
ready := false
|
ready := false
|
||||||
state := imageMirroringState(mirrorStatus.State)
|
|
||||||
|
localStatus, err := mirrorStatus.LocalStatus()
|
||||||
|
if err != nil {
|
||||||
|
util.ErrorLog(ctx, err.Error())
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("failed to get local status: %w", err)
|
||||||
|
}
|
||||||
|
state := imageMirroringState(localStatus.State.String())
|
||||||
// To recover from split brain (up+error) state the image need to be
|
// To recover from split brain (up+error) state the image need to be
|
||||||
// demoted and requested for resync on site-a and then the image on site-b
|
// demoted and requested for resync on site-a and then the image on site-b
|
||||||
// should be demoted. The volume should be marked to ready=true when the
|
// should be demoted. The volume should be marked to ready=true when the
|
||||||
@ -570,24 +601,12 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
|||||||
// If the image state on both the sites are up+unknown consider that
|
// If the image state on both the sites are up+unknown consider that
|
||||||
// complete data is synced as the last snapshot
|
// complete data is synced as the last snapshot
|
||||||
// gets exchanged between the clusters.
|
// gets exchanged between the clusters.
|
||||||
if state == upAndUnknown {
|
if state == unknown && localStatus.Up {
|
||||||
ready = true
|
ready = checkRemoteSiteStatus(ctx, mirrorStatus)
|
||||||
for _, s := range mirrorStatus.PeerSites {
|
|
||||||
if imageMirroringState(s.State) != upAndUnknown {
|
|
||||||
util.UsefulLog(
|
|
||||||
ctx,
|
|
||||||
"peer site name=%s, mirroring state=%s, description=%s and lastUpdate=%s",
|
|
||||||
s.SiteName,
|
|
||||||
s.State,
|
|
||||||
s.Description,
|
|
||||||
s.LastUpdate)
|
|
||||||
ready = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// resync only if the image is in error state
|
// resync only if the image is in error state
|
||||||
if strings.Contains(mirrorStatus.State, string(errorState)) {
|
if strings.Contains(localStatus.State.String(), string(errorState)) {
|
||||||
err = rbdVol.resyncImage()
|
err = rbdVol.resyncImage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.ErrorLog(ctx, err.Error())
|
util.ErrorLog(ctx, err.Error())
|
||||||
@ -599,9 +618,9 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
|||||||
util.UsefulLog(
|
util.UsefulLog(
|
||||||
ctx,
|
ctx,
|
||||||
"image mirroring state=%s, description=%s and lastUpdate=%s",
|
"image mirroring state=%s, description=%s and lastUpdate=%s",
|
||||||
mirrorStatus.State,
|
localStatus.State.String(),
|
||||||
mirrorStatus.Description,
|
localStatus.Description,
|
||||||
mirrorStatus.LastUpdate)
|
localStatus.LastUpdate)
|
||||||
resp := &replication.ResyncVolumeResponse{
|
resp := &replication.ResyncVolumeResponse{
|
||||||
Ready: ready,
|
Ready: ready,
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user