package etcdb import ( "bytes" "context" "fmt" "path" ) func Prefix(names ...string) string { return path.Join(names...) + "/" } type Spec interface { In(ctx context.Context) DB } type DB interface { Get(key string) (value []byte, ok bool, err error) Put(key string, value []byte) error Del(key string) error ForEach(callback func(key string, value []byte) (cont bool)) error Watch(rev int64) <-chan WatchEvent } type WatchEvent struct { Err error Key string Value []byte Delete bool NewRev int64 } type TypedDB[T any] struct { DB DB Codec Codec } type TypedWatchEvent[T any] struct { Err error Key string Value T Delete bool NewRev int64 } func (e *TypedWatchEvent[T]) FromEvent(evt WatchEvent) { e.Err = evt.Err e.Key = evt.Key e.Delete = evt.Delete e.NewRev = evt.NewRev } func (tdb TypedDB[T]) Get(key string) (v T, ok bool, err error) { raw, ok, err := tdb.DB.Get(key) if err != nil { return } if !ok { return } err = tdb.Codec.Decode(raw, &v) if err != nil { err = fmt.Errorf("invalid value: %w", err) ok = false return } ok = true return } func (tdb TypedDB[T]) Put(key string, value T) (err error) { ba, err := tdb.Codec.Encode(value) if err != nil { err = fmt.Errorf("failed to encode value: %w", err) return } return tdb.DB.Put(key, ba) } func (tdb TypedDB[T]) Del(key string) error { return tdb.DB.Del(key) } func (tdb TypedDB[T]) ForEach(callback func(key string, value T) (cont bool)) (err error) { forEachErr := tdb.DB.ForEach(func(key string, encodedValue []byte) (cont bool) { value := new(T) if err := tdb.Codec.Decode(encodedValue, value); err != nil { err = fmt.Errorf("invalid value at %q: %w", key, err) return false } return callback(key, *value) }) if err == nil && forEachErr != nil { err = forEachErr return } return } func (tdb TypedDB[T]) Watch(rev int64) <-chan TypedWatchEvent[T] { ch := make(chan TypedWatchEvent[T], 1) go func() { defer close(ch) inCh := tdb.DB.Watch(rev) for evt := range inCh { tevt := TypedWatchEvent[T]{} tevt.FromEvent(evt) if len(evt.Value) == 0 { ch <- tevt continue } value := new(T) if err := tdb.Codec.Decode(evt.Value, value); err != nil { tevt.Err = fmt.Errorf("invalid value: %w", err) ch <- tevt continue } tevt.Value = *value ch <- tevt } }() return ch } func (tdb TypedDB[T]) Sync(allData map[string]T) (err error) { encodedData := map[string][]byte{} for k, v := range allData { encodedData[k], err = tdb.Codec.Encode(v) if err != nil { err = fmt.Errorf("failed to encode data[%q]: %w", k, err) return } } seen := make(map[string]bool, len(allData)) forEachErr := tdb.DB.ForEach(func(key string, v []byte) (cont bool) { seen[key] = true newV, ok := encodedData[key] if ok { if !bytes.Equal(newV, v) { err = tdb.DB.Put(key, newV) } } else { err = tdb.Del(key) } return err == nil }) if err == nil && forEachErr != nil { err = forEachErr } if err != nil { return } // add any missing data for k, v := range encodedData { if seen[k] { continue } err = tdb.DB.Put(k, v) if err != nil { return } } return }