package rados /* #cgo LDFLAGS: -lrados #include #include extern void watchNotifyCb(void*, uint64_t, uint64_t, uint64_t, void*, size_t); extern void watchErrorCb(void*, uint64_t, int); */ import "C" import ( "encoding/binary" "fmt" "math" "sync" "time" "unsafe" "github.com/ceph/go-ceph/internal/log" ) type ( // WatcherID is the unique id of a Watcher. WatcherID uint64 // NotifyID is the unique id of a NotifyEvent. NotifyID uint64 // NotifierID is the unique id of a notifying client. NotifierID uint64 ) // NotifyEvent is received by a watcher for each notification. type NotifyEvent struct { ID NotifyID WatcherID WatcherID NotifierID NotifierID Data []byte } // NotifyAck represents an acknowleged notification. type NotifyAck struct { WatcherID WatcherID NotifierID NotifierID Response []byte } // NotifyTimeout represents an unacknowleged notification. type NotifyTimeout struct { WatcherID WatcherID NotifierID NotifierID } // Watcher receives all notifications for certain object. type Watcher struct { id WatcherID oid string ioctx *IOContext events chan NotifyEvent errors chan error done chan struct{} } var ( watchers = map[WatcherID]*Watcher{} watchersMtx sync.RWMutex ) // Watch creates a Watcher for the specified object. // // A Watcher receives all notifications that are sent to the object on which it // has been created. It exposes two read-only channels: Events() receives all // the NotifyEvents and Errors() receives all occuring errors. A typical code // creating a Watcher could look like this: // // watcher, err := ioctx.Watch(oid) // go func() { // event handler // for ne := range watcher.Events() { // ... // ne.Ack([]byte("response data...")) // ... // } // }() // go func() { // error handler // for err := range watcher.Errors() { // ... handle err ... // } // }() // // CAUTION: the Watcher references the IOContext in which it has been created. // Therefore all watchers must be deleted with the Delete() method before the // IOContext is being destroyed. // // Implements: // // int rados_watch2(rados_ioctx_t io, const char* o, uint64_t* cookie, // rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, void* arg) func (ioctx *IOContext) Watch(obj string) (*Watcher, error) { return ioctx.WatchWithTimeout(obj, 0) } // WatchWithTimeout creates a watcher on an object. Same as Watcher(), but // different timeout than the default can be specified. // // Implements: // // int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, // rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout, // void *arg); func (ioctx *IOContext) WatchWithTimeout(oid string, timeout time.Duration) (*Watcher, error) { cObj := C.CString(oid) defer C.free(unsafe.Pointer(cObj)) var id C.uint64_t watchersMtx.Lock() defer watchersMtx.Unlock() ret := C.rados_watch3( ioctx.ioctx, cObj, &id, (C.rados_watchcb2_t)(C.watchNotifyCb), (C.rados_watcherrcb_t)(C.watchErrorCb), C.uint32_t(timeout.Milliseconds()/1000), nil, ) if err := getError(ret); err != nil { return nil, err } evCh := make(chan NotifyEvent) errCh := make(chan error) w := &Watcher{ id: WatcherID(id), ioctx: ioctx, oid: oid, events: evCh, errors: errCh, done: make(chan struct{}), } watchers[WatcherID(id)] = w return w, nil } // ID returns the WatcherId of the Watcher func (w *Watcher) ID() WatcherID { return w.id } // Events returns a read-only channel, that receives all notifications that are // sent to the object of the Watcher. func (w *Watcher) Events() <-chan NotifyEvent { return w.events } // Errors returns a read-only channel, that receives all errors for the Watcher. func (w *Watcher) Errors() <-chan error { return w.errors } // Check on the status of a Watcher. // // Returns the time since it was last confirmed. If there is an error, the // Watcher is no longer valid, and should be destroyed with the Delete() method. // // Implements: // // int rados_watch_check(rados_ioctx_t io, uint64_t cookie) func (w *Watcher) Check() (time.Duration, error) { ret := C.rados_watch_check(w.ioctx.ioctx, C.uint64_t(w.id)) if ret < 0 { return 0, getError(ret) } return time.Millisecond * time.Duration(ret), nil } // Delete the watcher. This closes both the event and error channel. // // Implements: // // int rados_unwatch2(rados_ioctx_t io, uint64_t cookie) func (w *Watcher) Delete() error { watchersMtx.Lock() _, ok := watchers[w.id] if ok { delete(watchers, w.id) } watchersMtx.Unlock() if !ok { return nil } ret := C.rados_unwatch2(w.ioctx.ioctx, C.uint64_t(w.id)) if ret != 0 { return getError(ret) } close(w.done) // unblock blocked callbacks close(w.events) close(w.errors) return nil } // Notify sends a notification with the provided data to all Watchers of the // specified object. // // CAUTION: even if the error is not nil. the returned slices // might still contain data. func (ioctx *IOContext) Notify(obj string, data []byte) ([]NotifyAck, []NotifyTimeout, error) { return ioctx.NotifyWithTimeout(obj, data, 0) } // NotifyWithTimeout is like Notify() but with a different timeout than the // default. // // Implements: // // int rados_notify2(rados_ioctx_t io, const char* o, const char* buf, int buf_len, // uint64_t timeout_ms, char** reply_buffer, size_t* reply_buffer_len) func (ioctx *IOContext) NotifyWithTimeout(obj string, data []byte, timeout time.Duration) ([]NotifyAck, []NotifyTimeout, error) { cObj := C.CString(obj) defer C.free(unsafe.Pointer(cObj)) var cResponse *C.char defer C.rados_buffer_free(cResponse) var responseLen C.size_t var dataPtr *C.char if len(data) > 0 { dataPtr = (*C.char)(unsafe.Pointer(&data[0])) } ret := C.rados_notify2( ioctx.ioctx, cObj, dataPtr, C.int(len(data)), C.uint64_t(timeout.Milliseconds()), &cResponse, &responseLen, ) // cResponse has been set even if an error is returned, so we decode it anyway acks, timeouts := decodeNotifyResponse(cResponse, responseLen) return acks, timeouts, getError(ret) } // Ack sends an acknowledgement with the specified response data to the notfier // of the NotifyEvent. If a notify is not ack'ed, the originating Notify() call // blocks and eventiually times out. // // Implements: // // int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, // uint64_t cookie, const char *buf, int buf_len) func (ne *NotifyEvent) Ack(response []byte) error { watchersMtx.RLock() w, ok := watchers[ne.WatcherID] watchersMtx.RUnlock() if !ok { return fmt.Errorf("can't ack on deleted watcher %v", ne.WatcherID) } cOID := C.CString(w.oid) defer C.free(unsafe.Pointer(cOID)) var respPtr *C.char if len(response) > 0 { respPtr = (*C.char)(unsafe.Pointer(&response[0])) } ret := C.rados_notify_ack( w.ioctx.ioctx, cOID, C.uint64_t(ne.ID), C.uint64_t(ne.WatcherID), respPtr, C.int(len(response)), ) return getError(ret) } // WatcherFlush flushes all pending notifications of the cluster. // // Implements: // // int rados_watch_flush(rados_t cluster) func (c *Conn) WatcherFlush() error { if !c.connected { return ErrNotConnected } ret := C.rados_watch_flush(c.cluster) return getError(ret) } // decoder for this notify response format: // // le32 num_acks // { // le64 gid global id for the client (for client.1234 that's 1234) // le64 cookie cookie for the client // le32 buflen length of reply message buffer // u8 buflen payload // } num_acks // le32 num_timeouts // { // le64 gid global id for the client // le64 cookie cookie for the client // } num_timeouts // // NOTE: starting with pacific this is implemented as a C function and this can // be replaced later func decodeNotifyResponse(response *C.char, length C.size_t) ([]NotifyAck, []NotifyTimeout) { if length == 0 || response == nil { return nil, nil } b := (*[math.MaxInt32]byte)(unsafe.Pointer(response))[:length:length] pos := 0 num := binary.LittleEndian.Uint32(b[pos:]) pos += 4 acks := make([]NotifyAck, num) for i := range acks { acks[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:])) pos += 8 acks[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:])) pos += 8 dataLen := binary.LittleEndian.Uint32(b[pos:]) pos += 4 if dataLen > 0 { acks[i].Response = C.GoBytes(unsafe.Pointer(&b[pos]), C.int(dataLen)) pos += int(dataLen) } } num = binary.LittleEndian.Uint32(b[pos:]) pos += 4 timeouts := make([]NotifyTimeout, num) for i := range timeouts { timeouts[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:])) pos += 8 timeouts[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:])) pos += 8 } return acks, timeouts } //export watchNotifyCb func watchNotifyCb(_ unsafe.Pointer, notifyID C.uint64_t, id C.uint64_t, notifierID C.uint64_t, cData unsafe.Pointer, dataLen C.size_t) { ev := NotifyEvent{ ID: NotifyID(notifyID), WatcherID: WatcherID(id), NotifierID: NotifierID(notifierID), } if dataLen > 0 { ev.Data = C.GoBytes(cData, C.int(dataLen)) } watchersMtx.RLock() w, ok := watchers[WatcherID(id)] watchersMtx.RUnlock() if !ok { // usually this should not happen, but who knows log.Warnf("received notification for unknown watcher ID: %#v", ev) return } select { case <-w.done: // unblock when deleted case w.events <- ev: } } //export watchErrorCb func watchErrorCb(_ unsafe.Pointer, id C.uint64_t, err C.int) { watchersMtx.RLock() w, ok := watchers[WatcherID(id)] watchersMtx.RUnlock() if !ok { // usually this should not happen, but who knows log.Warnf("received error for unknown watcher ID: id=%d err=%#v", id, err) return } select { case <-w.done: // unblock when deleted case w.errors <- getError(err): } }