etcdb/etcdb.go
2022-12-05 11:11:22 +01:00

187 lines
3.2 KiB
Go

package etcdb
import (
"bytes"
"context"
"fmt"
"path"
)
// Prefix joins names with path.Join and appends a trailing "/", e.g.
// Prefix("a", "b") yields "a/b/". Because path.Join cleans its result and
// returns "" for no (or only empty) elements, Prefix() yields "/".
func Prefix(names ...string) string {
	joined := path.Join(names...)
	return joined + "/"
}
// Spec produces a DB handle for a given context.
//
// NOTE(review): no implementation is visible in this file; presumably the
// returned DB performs its operations under ctx — confirm against implementers.
type Spec interface {
// In returns the DB to use with ctx.
In(ctx context.Context) DB
}
// DB is the untyped key/value store interface that TypedDB wraps. Keys are
// strings and values are raw bytes.
//
// NOTE(review): the exact semantics below are inferred from how TypedDB uses
// these methods in this file; confirm against the concrete implementations.
type DB interface {
// Get returns the raw value stored under key. TypedDB.Get treats ok == false
// as "nothing to decode" (presumably: key not present).
Get(key string) (value []byte, ok bool, err error)
// Put stores value under key.
Put(key string, value []byte) error
// Del removes key.
Del(key string) error
// ForEach invokes callback for each key/value pair. Callers in this file
// return cont == false to request that iteration stop (presumably honored by
// implementations — confirm).
ForEach(callback func(key string, value []byte) (cont bool)) error
// Watch returns a channel of change events.
// NOTE(review): rev is presumably the revision to start watching from — confirm.
Watch(rev int64) <-chan WatchEvent
}
// WatchEvent is a single untyped event delivered on a DB.Watch channel.
type WatchEvent struct {
// Err carries an error associated with the event, if any.
Err error
// Key is the key the event refers to.
Key string
// Value is the raw (encoded) value; TypedDB.Watch forwards events whose
// Value is empty without attempting to decode them.
Value []byte
// Delete presumably marks the event as a deletion of Key — confirm with the
// DB implementation.
Delete bool
// NewRev presumably is the store revision after this event — confirm.
NewRev int64
}
// TypedDB wraps a DB so that values are exchanged as T instead of raw bytes.
// Its methods use Codec to encode values before writing to DB and to decode
// them after reading.
type TypedDB[T any] struct {
// DB is the underlying untyped store.
DB DB
// Codec converts between T and the bytes stored in DB (declared elsewhere in
// the package).
Codec Codec
}
// TypedWatchEvent is the typed counterpart of WatchEvent: it has the same
// fields, except that Value holds a decoded T rather than raw bytes.
type TypedWatchEvent[T any] struct {
// Err is the error copied from the source event, or a decode error set by
// TypedDB.Watch when the raw value could not be decoded.
Err error
// Key is the key the event refers to.
Key string
// Value is the decoded value; it stays the zero T when the source event had
// an empty value or decoding failed.
Value T
// Delete is copied from the source WatchEvent.
Delete bool
// NewRev is copied from the source WatchEvent.
NewRev int64
}
// FromEvent copies the untyped fields of evt (Err, Key, Delete and NewRev)
// into e. Value is deliberately left untouched: decoding evt.Value requires a
// Codec and is done by the caller.
func (e *TypedWatchEvent[T]) FromEvent(evt WatchEvent) {
	e.Err, e.Key, e.Delete, e.NewRev = evt.Err, evt.Key, evt.Delete, evt.NewRev
}
// Get reads the raw value stored under key from the underlying DB and decodes
// it into a T using the Codec.
//
// If the underlying Get fails, or reports ok == false, its results are
// returned as-is with v left at the zero value. If decoding fails, ok is false
// and err wraps the decode error with the prefix "invalid value".
func (tdb TypedDB[T]) Get(key string) (v T, ok bool, err error) {
	var raw []byte
	raw, ok, err = tdb.DB.Get(key)
	if err != nil || !ok {
		return
	}

	if decodeErr := tdb.Codec.Decode(raw, &v); decodeErr != nil {
		return v, false, fmt.Errorf("invalid value: %w", decodeErr)
	}
	return v, true, nil
}
// Put encodes value with the Codec and stores the resulting bytes under key in
// the underlying DB.
//
// An encoding failure is returned wrapped with "failed to encode value";
// errors from the underlying Put are returned unchanged.
func (tdb TypedDB[T]) Put(key string, value T) (err error) {
	encoded, encodeErr := tdb.Codec.Encode(value)
	if encodeErr != nil {
		return fmt.Errorf("failed to encode value: %w", encodeErr)
	}
	return tdb.DB.Put(key, encoded)
}
// Del removes key by delegating to the underlying DB; no encoding is involved,
// so the underlying error (if any) is returned unchanged.
func (tdb TypedDB[T]) Del(key string) error {
	underlying := tdb.DB
	return underlying.Del(key)
}
// ForEach iterates over the underlying DB, decodes each raw value into a T and
// passes it to callback together with its key. Iteration stops when callback
// returns false.
//
// If a value cannot be decoded, iteration is stopped and an error of the form
// `invalid value at "<key>": <cause>` is returned. Otherwise the error reported
// by the underlying ForEach (if any) is returned.
func (tdb TypedDB[T]) ForEach(callback func(key string, value T) (cont bool)) (err error) {
	forEachErr := tdb.DB.ForEach(func(key string, encodedValue []byte) (cont bool) {
		var value T
		// The decode error gets its own name on purpose: declaring it as `err`
		// in the if-initializer would shadow the named result, and the wrapped
		// error assigned below would be silently dropped.
		if decodeErr := tdb.Codec.Decode(encodedValue, &value); decodeErr != nil {
			err = fmt.Errorf("invalid value at %q: %w", key, decodeErr)
			return false
		}
		return callback(key, value)
	})

	// A decode error recorded by the closure takes precedence over whatever the
	// underlying iteration reports.
	if err == nil {
		err = forEachErr
	}
	return err
}
// Watch starts a goroutine that reads events from the underlying DB.Watch(rev)
// and republishes them as typed events on the returned channel (buffer size 1).
// The returned channel is closed once the underlying channel is closed.
//
// For each event the untyped fields are copied via FromEvent. Events with an
// empty raw value are forwarded with the zero T. Otherwise the value is
// decoded; on failure the event is still delivered, with Err replaced by an
// "invalid value" error and Value left at the zero T.
//
// NOTE(review): the goroutine blocks on send until the consumer receives, and
// there is no cancellation parameter here; it only ends when the underlying
// watch channel closes.
func (tdb TypedDB[T]) Watch(rev int64) <-chan TypedWatchEvent[T] {
	out := make(chan TypedWatchEvent[T], 1)

	go func() {
		defer close(out)

		for raw := range tdb.DB.Watch(rev) {
			var typed TypedWatchEvent[T]
			typed.FromEvent(raw)

			if len(raw.Value) > 0 {
				// Decode into a scratch variable so that a failed decode never
				// leaks a partially-filled value into the event.
				var decoded T
				if decodeErr := tdb.Codec.Decode(raw.Value, &decoded); decodeErr != nil {
					typed.Err = fmt.Errorf("invalid value: %w", decodeErr)
				} else {
					typed.Value = decoded
				}
			}

			out <- typed
		}
	}()

	return out
}
// Sync makes the contents of the underlying DB match allData:
//
//   - keys present in the DB but absent from allData are deleted;
//   - keys whose stored bytes differ from the encoded new value are overwritten;
//   - keys present in allData but not seen in the DB are added.
//
// All values are encoded up front, so an encoding failure (reported as
// "failed to encode data[<key>]") aborts before anything is written. The
// changes themselves are issued as individual Put/Del calls; the first failing
// call stops the sync and its error is returned.
func (tdb TypedDB[T]) Sync(allData map[string]T) (err error) {
	// Phase 1: encode everything before touching the store.
	wanted := make(map[string][]byte, len(allData))
	for key, value := range allData {
		encoded, encodeErr := tdb.Codec.Encode(value)
		if encodeErr != nil {
			return fmt.Errorf("failed to encode data[%q]: %w", key, encodeErr)
		}
		wanted[key] = encoded
	}

	// Phase 2: walk the existing entries, updating or deleting as needed, and
	// remember which keys already exist.
	present := make(map[string]struct{}, len(allData))
	iterErr := tdb.DB.ForEach(func(key string, current []byte) (cont bool) {
		present[key] = struct{}{}

		desired, keep := wanted[key]
		switch {
		case !keep:
			err = tdb.DB.Del(key)
		case !bytes.Equal(desired, current):
			err = tdb.DB.Put(key, desired)
		}
		return err == nil
	})
	// An error raised inside the callback wins over the iteration's own error.
	if err == nil {
		err = iterErr
	}
	if err != nil {
		return err
	}

	// Phase 3: add any missing data.
	for key, encoded := range wanted {
		if _, exists := present[key]; exists {
			continue
		}
		if err = tdb.DB.Put(key, encoded); err != nil {
			return err
		}
	}
	return nil
}