diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go
index f43741ec0..c4363cfd8 100644
--- a/internal/rbd/rbd_util.go
+++ b/internal/rbd/rbd_util.go
@@ -30,7 +30,6 @@ import (
 
 	"github.com/ceph/ceph-csi/internal/util"
 
-	"github.com/ceph/go-ceph/rados"
 	librbd "github.com/ceph/go-ceph/rbd"
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/golang/protobuf/ptypes"
@@ -100,8 +99,8 @@ type rbdVolume struct {
 	Encrypted bool
 	KMS       util.EncryptionKMS
 
-	// connection
-	conn *rados.Conn
+	// conn is a connection to the Ceph cluster obtained from a ConnPool
+	conn *util.ClusterConnection
 }
 
 // rbdSnapshot represents a CSI snapshot and its RBD snapshot specifics
@@ -129,15 +128,31 @@ type rbdSnapshot struct {
 
 var (
 	supportedFeatures = sets.NewString("layering")
-
-	// large interval and timeout, it should be longer than the maximum
-	// time an operation can take (until refcounting of the connections is
-	// available)
-	cpInterval = 15 * time.Minute
-	cpExpiry   = 10 * time.Minute
-	connPool   = util.NewConnPool(cpInterval, cpExpiry)
 )
 
+// Connect an rbdVolume to the Ceph cluster
+func (rv *rbdVolume) Connect(cr *util.Credentials) error {
+	if rv.conn != nil {
+		return nil
+	}
+
+	conn := &util.ClusterConnection{}
+	if err := conn.Connect(rv.Monitors, cr); err != nil {
+		return err
+	}
+
+	rv.conn = conn
+	return nil
+}
+
+// Destroy cleans up the rbdVolume and closes the connection to the Ceph
+// cluster if one was set up.
+func (rv *rbdVolume) Destroy() {
+	if rv.conn != nil {
+		rv.conn.Destroy()
+	}
+}
+
 // createImage creates a new ceph image with provision and volume options.
 func createImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) error {
 	volSzMiB := fmt.Sprintf("%dM", util.RoundOffVolSize(pOpts.VolSize))
@@ -162,7 +177,12 @@ func createImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) er
 		}
 	}
 
-	ioctx, err := pOpts.getIoctx(cr)
+	err := pOpts.Connect(cr)
+	if err != nil {
+		return err
+	}
+
+	ioctx, err := pOpts.conn.GetIoctx(pOpts.Pool)
 	if err != nil {
 		return errors.Wrapf(err, "failed to get IOContext")
 	}
@@ -177,29 +197,28 @@ func createImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) er
 	return nil
 }
 
-func (rv *rbdVolume) getIoctx(cr *util.Credentials) (*rados.IOContext, error) {
-	if rv.conn == nil {
-		conn, err := connPool.Get(rv.Pool, rv.Monitors, cr.ID, cr.KeyFile)
+// Open the rbdVolume after it has been connected.
+func (rv *rbdVolume) open() (*librbd.Image, error) {
+	if rv.RbdImageName == "" {
+		var vi util.CSIIdentifier
+		err := vi.DecomposeCSIID(rv.VolID)
 		if err != nil {
-			return nil, errors.Wrapf(err, "failed to get connection")
+			err = fmt.Errorf("error decoding volume ID (%s) (%s)", rv.VolID, err)
+			return nil, ErrInvalidVolID{err}
 		}
-
-		rv.conn = conn
+		rv.RbdImageName = volJournal.GetNameForUUID(rv.NamePrefix, vi.ObjectUUID, false)
 	}
 
-	ioctx, err := rv.conn.OpenIOContext(rv.Pool)
+	ioctx, err := rv.conn.GetIoctx(rv.Pool)
 	if err != nil {
-		connPool.Put(rv.conn)
-		return nil, errors.Wrapf(err, "failed to open IOContext for pool %s", rv.Pool)
+		return nil, err
 	}
 
-	return ioctx, nil
-}
-
-func (rv *rbdVolume) Destroy() {
-	if rv.conn != nil {
-		connPool.Put(rv.conn)
+	image, err := librbd.OpenImage(ioctx, rv.RbdImageName, librbd.NoSnapshot)
+	if err != nil {
+		return nil, err
 	}
+	return image, nil
 }
 
 // rbdStatus checks if there is watcher on the image.
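
Note on usage (not part of the patch): the sketch below, written as if it lived in package rbd of this repository, illustrates the Connect/open/Destroy lifecycle introduced by the hunks above; exampleUsage is a hypothetical helper, not a function from the ceph-csi tree.

// exampleUsage is a hypothetical helper showing the intended call sequence.
func exampleUsage(rv *rbdVolume, cr *util.Credentials) error {
	// Connect is a no-op when rv.conn is already set; otherwise it obtains
	// a util.ClusterConnection for rv.Monitors.
	if err := rv.Connect(cr); err != nil {
		return err
	}
	// Destroy returns the pooled connection when the request is finished.
	defer rv.Destroy()

	// open resolves RbdImageName from the CSI volume ID if needed, gets an
	// IOContext for rv.Pool and opens the image through go-ceph.
	image, err := rv.open()
	if err != nil {
		return err
	}
	defer image.Close()

	// ... perform librbd operations on image here ...
	return nil
}

Because Connect returns early once rv.conn is set, callers can connect lazily and rely on a single deferred Destroy.
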
diff --git a/internal/util/conn_pool.go b/internal/util/conn_pool.go
index 7e6996648..ceb634a1e 100644
--- a/internal/util/conn_pool.go
+++ b/internal/util/conn_pool.go
@@ -95,14 +95,14 @@ func (cp *ConnPool) Destroy() {
 	}
 }
 
-func (cp *ConnPool) generateUniqueKey(pool, monitors, user, keyfile string) (string, error) {
+func (cp *ConnPool) generateUniqueKey(monitors, user, keyfile string) (string, error) {
 	// the keyfile can be unique for operations, contents will be the same
 	key, err := ioutil.ReadFile(keyfile) // nolint: gosec, #nosec
 	if err != nil {
 		return "", errors.Wrapf(err, "could not open keyfile %s", keyfile)
 	}
 
-	return fmt.Sprintf("%s|%s|%s|%s", pool, monitors, user, string(key)), nil
+	return fmt.Sprintf("%s|%s|%s", monitors, user, string(key)), nil
 }
 
 // getExisting returns the existing rados.Conn associated with the unique key.
@@ -118,10 +118,10 @@ func (cp *ConnPool) getConn(unique string) *rados.Conn {
 }
 
 // Get returns a rados.Conn for the given arguments. Creates a new rados.Conn in
-// case there is none in the pool. Use the returned unique string to reduce the
-// reference count with ConnPool.Put(unique).
-func (cp *ConnPool) Get(pool, monitors, user, keyfile string) (*rados.Conn, error) {
-	unique, err := cp.generateUniqueKey(pool, monitors, user, keyfile)
+// case there is none. Use the returned rados.Conn to reduce the reference
+// count with ConnPool.Put(conn).
+func (cp *ConnPool) Get(monitors, user, keyfile string) (*rados.Conn, error) {
+	unique, err := cp.generateUniqueKey(monitors, user, keyfile)
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to generate unique for connection")
 	}
diff --git a/internal/util/conn_pool_test.go b/internal/util/conn_pool_test.go
index fa59ef7ed..4544c8689 100644
--- a/internal/util/conn_pool_test.go
+++ b/internal/util/conn_pool_test.go
@@ -34,8 +34,8 @@ const (
 // working Ceph cluster to connect to.
 //
 // This is mostly a copy of ConnPool.Get()
-func (cp *ConnPool) fakeGet(pool, monitors, user, keyfile string) (*rados.Conn, string, error) {
-	unique, err := cp.generateUniqueKey(pool, monitors, user, keyfile)
+func (cp *ConnPool) fakeGet(monitors, user, keyfile string) (*rados.Conn, string, error) {
+	unique, err := cp.generateUniqueKey(monitors, user, keyfile)
 	if err != nil {
 		return nil, "", err
 	}
@@ -91,7 +91,7 @@ func TestConnPool(t *testing.T) {
 
 	var unique string
 	t.Run("fakeGet", func(t *testing.T) {
-		conn, unique, err = cp.fakeGet("pool", "monitors", "user", keyfile)
+		conn, unique, err = cp.fakeGet("monitors", "user", keyfile)
 		if err != nil {
 			t.Errorf("failed to get connection: %v", err)
 		}
@@ -115,7 +115,7 @@ func TestConnPool(t *testing.T) {
 
 	t.Run("doubleFakeGet", func(t *testing.T) {
 		// after a 2nd get, there should still be a single conn in cp.conns
-		_, _, err = cp.fakeGet("pool", "monitors", "user", keyfile)
+		_, _, err = cp.fakeGet("monitors", "user", keyfile)
 		if err != nil {
 			t.Errorf("failed to get connection: %v", err)
 		}
diff --git a/internal/util/connection.go b/internal/util/connection.go
new file mode 100644
index 000000000..4d3cfd68e
--- /dev/null
+++ b/internal/util/connection.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2020 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"time"
+
+	"github.com/ceph/go-ceph/rados"
+	"github.com/pkg/errors"
+)
+
+type ClusterConnection struct {
+	// connection
+	conn *rados.Conn
+
+	// FIXME: temporary reference for credentials. Remove this when go-ceph
+	// is used for operations.
+	Creds *Credentials
+}
+
+var (
+	// large interval and timeout, it should be longer than the maximum
+	// time an operation can take (until refcounting of the connections is
+	// available)
+	cpInterval = 15 * time.Minute
+	cpExpiry   = 10 * time.Minute
+	connPool   = NewConnPool(cpInterval, cpExpiry)
+)
+
+// Connect establishes a connection to the Ceph cluster and sets cc.conn for further usage.
+func (cc *ClusterConnection) Connect(monitors string, cr *Credentials) error {
+	if cc.conn == nil {
+		conn, err := connPool.Get(monitors, cr.ID, cr.KeyFile)
+		if err != nil {
+			return errors.Wrapf(err, "failed to get connection")
+		}
+
+		cc.conn = conn
+
+		// FIXME: remove .Creds from ClusterConnection
+		cc.Creds = cr
+	}
+
+	return nil
+}
+
+func (cc *ClusterConnection) Destroy() {
+	if cc.conn != nil {
+		connPool.Put(cc.conn)
+	}
+}
+
+func (cc *ClusterConnection) GetIoctx(pool string) (*rados.IOContext, error) {
+	if cc.conn == nil {
+		return nil, errors.New("cluster is not connected yet")
+	}
+
+	ioctx, err := cc.conn.OpenIOContext(pool)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to open IOContext for pool %s", pool)
+	}
+
+	return ioctx, nil
+}
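
Note on the ConnPool key change (not part of the patch): dropping the pool name from generateUniqueKey() means a rados.Conn is shared per monitors/user/key rather than per pool, and the per-pool rados.IOContext is obtained through ClusterConnection.GetIoctx(). A rough sketch, assuming package util and hypothetical pool names ("replicapool", "ec-data-pool"):

// exampleSharedConnection is a hypothetical helper illustrating that a single
// ClusterConnection can now serve multiple pools.
func exampleSharedConnection(monitors string, cr *Credentials) error {
	conn := &ClusterConnection{}
	if err := conn.Connect(monitors, cr); err != nil {
		return err
	}
	// Hand the rados.Conn back to the ConnPool when done.
	defer conn.Destroy()

	for _, pool := range []string{"replicapool", "ec-data-pool"} {
		ioctx, err := conn.GetIoctx(pool)
		if err != nil {
			return err
		}

		// ... perform rados/rbd operations against this pool ...

		// release the per-pool IOContext once it is no longer needed
		ioctx.Destroy()
	}

	return nil
}

GetIoctx() returns an error when Connect() was not called first, matching the guard at the top of the function.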