mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-12-18 11:00:25 +00:00
util: add ConnPool for connection re-use
By using the ConnPool it is not needed to re-connect every time to the Ceph cluster when (rbd) operations are executed through the go-ceph/rbd API. Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
parent
ba99275f90
commit
397825c665
205
pkg/util/conn_pool.go
Normal file
205
pkg/util/conn_pool.go
Normal file
@ -0,0 +1,205 @@
|
||||
/*
|
||||
Copyright 2020 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ceph/go-ceph/rados"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type connEntry struct {
|
||||
conn *rados.Conn
|
||||
lastUsed time.Time
|
||||
users int
|
||||
}
|
||||
|
||||
type ConnPool struct {
|
||||
// interval to run the garbage collector
|
||||
interval time.Duration
|
||||
// timeout for a connEntry to get garbage collected
|
||||
expiry time.Duration
|
||||
// Timer used to schedule calls to the garbage collector
|
||||
timer *time.Timer
|
||||
// Mutex for loading and touching connEntry's from the conns Map
|
||||
lock *sync.RWMutex
|
||||
// all connEntry's in this pool
|
||||
conns map[string]*connEntry
|
||||
}
|
||||
|
||||
// Create a new ConnPool instance and start the garbage collector running
|
||||
// every @interval.
|
||||
func NewConnPool(interval, expiry time.Duration) *ConnPool {
|
||||
cp := ConnPool{
|
||||
interval: interval,
|
||||
expiry: expiry,
|
||||
lock: &sync.RWMutex{},
|
||||
conns: make(map[string]*connEntry),
|
||||
}
|
||||
cp.timer = time.AfterFunc(interval, cp.gc)
|
||||
|
||||
return &cp
|
||||
}
|
||||
|
||||
// loop through all cp.conns and destroy objects that have not been used for cp.expiry.
|
||||
func (cp *ConnPool) gc() {
|
||||
cp.lock.Lock()
|
||||
defer cp.lock.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for key, ce := range cp.conns {
|
||||
if ce.users == 0 && (now.Sub(ce.lastUsed)) > cp.expiry {
|
||||
ce.destroy()
|
||||
delete(cp.conns, key)
|
||||
}
|
||||
}
|
||||
|
||||
// schedule the next gc() run
|
||||
cp.timer.Reset(cp.interval)
|
||||
}
|
||||
|
||||
// Stop the garbage collector and destroy all connections in the pool.
|
||||
func (cp *ConnPool) Destroy() {
|
||||
cp.timer.Stop()
|
||||
// wait until gc() has finished, in case it is running
|
||||
cp.lock.Lock()
|
||||
defer cp.lock.Unlock()
|
||||
|
||||
for key, ce := range cp.conns {
|
||||
if ce.users != 0 {
|
||||
panic("this connEntry still has users, operations" +
|
||||
"might still be in-flight")
|
||||
}
|
||||
|
||||
ce.destroy()
|
||||
delete(cp.conns, key)
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *ConnPool) generateUniqueKey(pool, monitors, keyfile string) (string, error) {
|
||||
// the keyfile can be unique for operations, contents will be the same
|
||||
key, err := ioutil.ReadFile(keyfile) // nolint: gosec, #nosec
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "could not open keyfile %s", keyfile)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s|%s|%s", pool, monitors, string(key)), nil
|
||||
}
|
||||
|
||||
// getExisting returns the existing rados.Conn associated with the unique key.
|
||||
//
|
||||
// Requires: locked cp.lock because of ce.get()
|
||||
func (cp *ConnPool) getConn(unique string) *rados.Conn {
|
||||
ce, exists := cp.conns[unique]
|
||||
if exists {
|
||||
ce.get()
|
||||
return ce.conn
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Return a rados.Conn for the given arguments. Creates a new rados.Conn in
|
||||
// case there is none in the pool. Use the returned unique string to reduce the
|
||||
// reference count with ConnPool.Put(unique).
|
||||
func (cp *ConnPool) Get(pool, monitors, keyfile string) (*rados.Conn, error) {
|
||||
unique, err := cp.generateUniqueKey(pool, monitors, keyfile)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to generate unique for connection")
|
||||
}
|
||||
|
||||
cp.lock.RLock()
|
||||
conn := cp.getConn(unique)
|
||||
cp.lock.RUnlock()
|
||||
if conn != nil {
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// construct and connect a new rados.Conn
|
||||
args := []string{"-m", monitors, "--keyfile=" + keyfile}
|
||||
conn, err = rados.NewConn()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "creating a new connection failed")
|
||||
}
|
||||
err = conn.ParseCmdLineArgs(args)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "parsing cmdline args (%v) failed", args)
|
||||
}
|
||||
|
||||
err = conn.Connect()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "connecting failed")
|
||||
}
|
||||
|
||||
ce := &connEntry{
|
||||
conn: conn,
|
||||
lastUsed: time.Now(),
|
||||
users: 1,
|
||||
}
|
||||
|
||||
cp.lock.Lock()
|
||||
defer cp.lock.Unlock()
|
||||
oldConn := cp.getConn(unique)
|
||||
if oldConn != nil {
|
||||
// there was a race, oldConn already exists
|
||||
ce.destroy()
|
||||
return oldConn, nil
|
||||
}
|
||||
// this really is a new connection, add it to the map
|
||||
cp.conns[unique] = ce
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// Reduce the reference count of the rados.Conn object that was returned with
|
||||
// ConnPool.Get().
|
||||
func (cp *ConnPool) Put(conn *rados.Conn) {
|
||||
cp.lock.Lock()
|
||||
defer cp.lock.Unlock()
|
||||
|
||||
for _, ce := range cp.conns {
|
||||
if ce.conn == conn {
|
||||
ce.put()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add a reference to the connEntry.
|
||||
// /!\ Only call this while holding the ConnPool.lock.
|
||||
func (ce *connEntry) get() {
|
||||
ce.lastUsed = time.Now()
|
||||
ce.users++
|
||||
}
|
||||
|
||||
// Reduce number of references. If this returns true, there are no more users.
|
||||
// /!\ Only call this while holding the ConnPool.lock.
|
||||
func (ce *connEntry) put() {
|
||||
ce.users--
|
||||
// do not call ce.destroy(), let ConnPool.gc() do that
|
||||
}
|
||||
|
||||
// Destroy a connEntry object, close the connection to the Ceph cluster.
|
||||
func (ce *connEntry) destroy() {
|
||||
if ce.conn != nil {
|
||||
ce.conn.Shutdown()
|
||||
ce.conn = nil
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user