etcdb/etcd-spec.go
2022-12-05 11:12:52 +01:00

172 lines
3.1 KiB
Go

package etcdb
import (
"context"
"fmt"
"strings"
etcdclient "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/mirror"
"go.etcd.io/etcd/client/v3/namespace"
)
// EtcdSpec describes an etcd-backed database: a client specification and the
// key prefix under which the database's keys are namespaced.
type EtcdSpec struct {
// client supplies the etcd client that In wraps.
client *ClientSpec
// prefix is the namespace In applies to the client's KV, Watcher and Lease.
prefix string
}
// Sub derives a spec for a nested namespace. The new prefix is the receiver's
// prefix followed by the given one, with a trailing "/" appended when the
// concatenation does not already end with one. The client spec is reused.
func (spec EtcdSpec) Sub(prefix string) EtcdSpec {
	sub := EtcdSpec{
		client: spec.client,
		prefix: spec.prefix + prefix,
	}

	if !strings.HasSuffix(sub.prefix, "/") {
		sub.prefix += "/"
	}

	return sub
}
// In binds the spec to ctx and returns the resulting DB.
//
// The client obtained from the client spec has its KV, Watcher and Lease
// replaced by namespaced wrappers using the spec's prefix.
//
// NOTE(review): the wrappers are assigned onto the value returned by
// Client(); this presumably is a client that is not shared between calls,
// otherwise prefixes would stack up — confirm against ClientSpec.
func (spec EtcdSpec) In(ctx context.Context) DB {
	c := spec.client.Client()
	pfx := spec.prefix

	c.KV = namespace.NewKV(c.KV, pfx)
	c.Watcher = namespace.NewWatcher(c.Watcher, pfx)
	c.Lease = namespace.NewLease(c.Lease, pfx)

	return EtcdDB{ctx: ctx, client: c}
}
// EtcdDB is an etcd client bound to a context; EtcdSpec.In returns it as a DB.
type EtcdDB struct {
// client is the etcd client every operation goes through.
client *etcdclient.Client
// ctx is the context handed to each etcd call made by the methods of EtcdDB.
ctx context.Context
}
// Get fetches the value stored under key.
//
// ok is false (with a nil value and nil error) when the response contains no
// key/value pair; err is non-nil when the etcd request itself failed.
func (db EtcdDB) Get(key string) (value []byte, ok bool, err error) {
	resp, err := db.client.KV.Get(db.ctx, key)
	if err != nil {
		return nil, false, err
	}

	if len(resp.Kvs) == 0 {
		return nil, false, nil
	}

	return resp.Kvs[0].Value, true, nil
}
// Put stores val under key and returns the error of the etcd request, if any.
func (db EtcdDB) Put(key string, val []byte) (err error) {
	if _, putErr := db.client.KV.Put(db.ctx, key, string(val)); putErr != nil {
		return putErr
	}
	return nil
}
// Del deletes key and returns the error of the etcd request, if any.
func (db EtcdDB) Del(key string) (err error) {
	if _, delErr := db.client.KV.Delete(db.ctx, key); delErr != nil {
		return delErr
	}
	return nil
}
// ForEach calls callback for each key/value pair, requesting keys in
// ascending key order starting from "\x00".
//
// Iteration stops early, without error, as soon as callback returns false.
// When a response is flagged as having more results, the next request starts
// right after the last key received. Any etcd request error aborts the
// iteration and is returned.
func (db EtcdDB) ForEach(callback func(key string, value []byte) (cont bool)) (err error) {
	opts := []etcdclient.OpOption{
		etcdclient.WithSort(etcdclient.SortByKey, etcdclient.SortAscend),
		etcdclient.WithFromKey(),
	}

	start := "\x00"
	for {
		resp, getErr := db.client.KV.Get(db.ctx, start, opts...)
		if getErr != nil {
			return getErr
		}

		for _, kv := range resp.Kvs {
			if !callback(string(kv.Key), kv.Value) {
				return nil
			}
		}

		if !resp.More {
			return nil
		}

		// More results are pending: resume with the smallest key that sorts
		// strictly after the last one we saw (last key + a zero byte).
		last := resp.Kvs[len(resp.Kvs)-1].Key
		start = string(append(last, 0))
	}
}
// Watch streams the database content followed by its subsequent changes as
// WatchEvent values on the returned channel.
//
// rev is passed to mirror.NewSyncer as the revision to sync from.
//
// The stream consists of:
//   - one event (Key, Value) per key/value pair returned by the base sync,
//     followed by a NewRev event carrying the highest response header
//     revision seen (omitted when that revision is 0);
//   - then, for every watch response, one event per put or delete (Delete is
//     set for deletes; an unknown event type yields an Err event), followed
//     by a NewRev event carrying the response header revision, or by an Err
//     event when the response reports an error.
//
// A base sync failure is reported as a single Err event, after which the
// channel is closed. The channel is also closed when the watch channel ends.
//
// Sends to the returned channel are abandoned once the DB's context is done,
// so a consumer that stops reading after cancelling the context does not
// leave the goroutine blocked forever; events pending at that point may not
// be delivered.
func (db EtcdDB) Watch(rev int64) <-chan WatchEvent {
	ch := make(chan WatchEvent, 1)

	// emit hands evt to the consumer and reports whether it was delivered.
	// It gives up when the context is done so the goroutine can always exit.
	emit := func(evt WatchEvent) bool {
		select {
		case ch <- evt:
			return true
		case <-db.ctx.Done():
			return false
		}
	}

	go func() {
		defer close(ch)

		syncer := mirror.NewSyncer(db.client, "", rev)

		// Phase 1: replay the existing content.
		gets, errs := syncer.SyncBase(db.ctx)

		var maxRev int64
		for get := range gets {
			for _, kv := range get.Kvs {
				if !emit(WatchEvent{Key: string(kv.Key), Value: kv.Value}) {
					return
				}
			}

			if get.Header.Revision > maxRev {
				maxRev = get.Header.Revision
			}
		}

		// Receiving yields nil if errs was closed without an error.
		if err := <-errs; err != nil {
			emit(WatchEvent{Err: err})
			return
		}

		if maxRev != 0 && !emit(WatchEvent{NewRev: maxRev}) {
			return
		}

		// Phase 2: follow the updates.
		for resp := range syncer.SyncUpdates(db.ctx) {
			for _, evt := range resp.Events {
				var out WatchEvent

				switch evt.Type {
				case etcdclient.EventTypePut:
					out = WatchEvent{
						Key:   string(evt.Kv.Key),
						Value: evt.Kv.Value,
					}
				case etcdclient.EventTypeDelete:
					out = WatchEvent{
						Key:    string(evt.Kv.Key),
						Delete: true,
						Value:  evt.Kv.Value,
					}
				default:
					out = WatchEvent{
						Err: fmt.Errorf("unknown event type: %d", evt.Type),
					}
				}

				if !emit(out) {
					return
				}
			}

			// Close each response with either its revision or its error.
			tail := WatchEvent{NewRev: resp.Header.Revision}
			if err := resp.Err(); err != nil {
				tail = WatchEvent{Err: err}
			}

			if !emit(tail) {
				return
			}
		}
	}()

	return ch
}