package etcdb import ( "context" "fmt" "strings" etcdclient "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/mirror" "go.etcd.io/etcd/client/v3/namespace" ) type EtcdSpec struct { client *ClientSpec prefix string } func (spec EtcdSpec) Sub(prefix string) EtcdSpec { p := spec.prefix + prefix if !strings.HasSuffix(p, "/") { p += "/" } return EtcdSpec{ client: spec.client, prefix: p, } } func (spec EtcdSpec) In(ctx context.Context) DB { client := spec.client.Client() client.KV = namespace.NewKV(client.KV, spec.prefix) client.Watcher = namespace.NewWatcher(client.Watcher, spec.prefix) client.Lease = namespace.NewLease(client.Lease, spec.prefix) return EtcdDB{ client: client, ctx: ctx, } } type EtcdDB struct { client *etcdclient.Client ctx context.Context } func (db EtcdDB) Get(key string) (value []byte, ok bool, err error) { get, err := db.client.KV.Get(db.ctx, key) if err != nil { return } if len(get.Kvs) == 0 { return } value = get.Kvs[0].Value ok = true return } func (db EtcdDB) Put(key string, val []byte) (err error) { _, err = db.client.KV.Put(db.ctx, key, string(val)) return } func (db EtcdDB) Del(key string) (err error) { _, err = db.client.KV.Delete(db.ctx, key) return } func (db EtcdDB) ForEach(callback func(key string, value []byte) (cont bool)) (err error) { withSort := etcdclient.WithSort(etcdclient.SortByKey, etcdclient.SortAscend) opts := []etcdclient.OpOption{withSort, etcdclient.WithFromKey()} key := "\x00" for { get, err := db.client.KV.Get(db.ctx, key, opts...) if err != nil { return err } for _, kv := range get.Kvs { cont := callback(string(kv.Key), kv.Value) if !cont { return nil } } if !get.More { return nil } // wait, there's more! key = string(append(get.Kvs[len(get.Kvs)-1].Key, 0)) } } func (db EtcdDB) Watch(rev int64) <-chan WatchEvent { ch := make(chan WatchEvent, 1) go func() { defer close(ch) sync := mirror.NewSyncer(db.client, "", rev) gets, errs := sync.SyncBase(db.ctx) var maxRev int64 for get := range gets { for _, kv := range get.Kvs { ch <- WatchEvent{ Key: string(kv.Key), Value: kv.Value, } } if get.Header.Revision > maxRev { maxRev = get.Header.Revision } } err, _ := <-errs if err != nil { ch <- WatchEvent{Err: err} return } if maxRev != 0 { ch <- WatchEvent{NewRev: maxRev} } watchChan := sync.SyncUpdates(db.ctx) for watchEvent := range watchChan { for _, evt := range watchEvent.Events { switch evt.Type { case etcdclient.EventTypePut: ch <- WatchEvent{ Key: string(evt.Kv.Key), Value: evt.Kv.Value, } case etcdclient.EventTypeDelete: ch <- WatchEvent{ Key: string(evt.Kv.Key), Delete: true, Value: evt.Kv.Value, } default: ch <- WatchEvent{ Err: fmt.Errorf("unknown event type: %d", evt.Type), } } } err := watchEvent.Err() if err == nil { ch <- WatchEvent{ NewRev: watchEvent.Header.Revision, } } else { ch <- WatchEvent{ Err: err, } } } }() return ch }