mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 10:33:35 +00:00
84
vendor/k8s.io/apimachinery/pkg/watch/filter_test.go
generated
vendored
84
vendor/k8s.io/apimachinery/pkg/watch/filter_test.go
generated
vendored
@ -1,84 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
. "k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
func TestFilter(t *testing.T) {
|
||||
table := []Event{
|
||||
{Type: Added, Object: testType("foo")},
|
||||
{Type: Added, Object: testType("bar")},
|
||||
{Type: Added, Object: testType("baz")},
|
||||
{Type: Added, Object: testType("qux")},
|
||||
{Type: Added, Object: testType("zoo")},
|
||||
}
|
||||
|
||||
source := NewFake()
|
||||
filtered := Filter(source, func(e Event) (Event, bool) {
|
||||
return e, e.Object.(testType)[0] != 'b'
|
||||
})
|
||||
|
||||
go func() {
|
||||
for _, item := range table {
|
||||
source.Action(item.Type, item.Object)
|
||||
}
|
||||
source.Stop()
|
||||
}()
|
||||
|
||||
var got []string
|
||||
for {
|
||||
event, ok := <-filtered.ResultChan()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
got = append(got, string(event.Object.(testType)))
|
||||
}
|
||||
|
||||
if e, a := []string{"foo", "qux", "zoo"}, got; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("got %v, wanted %v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterStop(t *testing.T) {
|
||||
source := NewFake()
|
||||
filtered := Filter(source, func(e Event) (Event, bool) {
|
||||
return e, e.Object.(testType)[0] != 'b'
|
||||
})
|
||||
|
||||
go func() {
|
||||
source.Add(testType("foo"))
|
||||
filtered.Stop()
|
||||
}()
|
||||
|
||||
var got []string
|
||||
for {
|
||||
event, ok := <-filtered.ResultChan()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
got = append(got, string(event.Object.(testType)))
|
||||
}
|
||||
|
||||
if e, a := []string{"foo"}, got; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("got %v, wanted %v", e, a)
|
||||
}
|
||||
}
|
176
vendor/k8s.io/apimachinery/pkg/watch/mux_test.go
generated
vendored
176
vendor/k8s.io/apimachinery/pkg/watch/mux_test.go
generated
vendored
@ -1,176 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
. "k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
type myType struct {
|
||||
ID string
|
||||
Value string
|
||||
}
|
||||
|
||||
func (obj *myType) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
|
||||
func (obj *myType) DeepCopyObject() runtime.Object {
|
||||
if obj == nil {
|
||||
return nil
|
||||
}
|
||||
clone := *obj
|
||||
return &clone
|
||||
}
|
||||
|
||||
func TestBroadcaster(t *testing.T) {
|
||||
table := []Event{
|
||||
{Type: Added, Object: &myType{"foo", "hello world 1"}},
|
||||
{Type: Added, Object: &myType{"bar", "hello world 2"}},
|
||||
{Type: Modified, Object: &myType{"foo", "goodbye world 3"}},
|
||||
{Type: Deleted, Object: &myType{"bar", "hello world 4"}},
|
||||
}
|
||||
|
||||
// The broadcaster we're testing
|
||||
m := NewBroadcaster(0, WaitIfChannelFull)
|
||||
|
||||
// Add a bunch of watchers
|
||||
const testWatchers = 2
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(testWatchers)
|
||||
for i := 0; i < testWatchers; i++ {
|
||||
// Verify that each watcher gets the events in the correct order
|
||||
go func(watcher int, w Interface) {
|
||||
tableLine := 0
|
||||
for {
|
||||
event, ok := <-w.ResultChan()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if e, a := table[tableLine], event; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Watcher %v, line %v: Expected (%v, %#v), got (%v, %#v)",
|
||||
watcher, tableLine, e.Type, e.Object, a.Type, a.Object)
|
||||
} else {
|
||||
t.Logf("Got (%v, %#v)", event.Type, event.Object)
|
||||
}
|
||||
tableLine++
|
||||
}
|
||||
wg.Done()
|
||||
}(i, m.Watch())
|
||||
}
|
||||
|
||||
for i, item := range table {
|
||||
t.Logf("Sending %v", i)
|
||||
m.Action(item.Type, item.Object)
|
||||
}
|
||||
|
||||
m.Shutdown()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestBroadcasterWatcherClose(t *testing.T) {
|
||||
m := NewBroadcaster(0, WaitIfChannelFull)
|
||||
w := m.Watch()
|
||||
w2 := m.Watch()
|
||||
w.Stop()
|
||||
m.Shutdown()
|
||||
if _, open := <-w.ResultChan(); open {
|
||||
t.Errorf("Stop didn't work?")
|
||||
}
|
||||
if _, open := <-w2.ResultChan(); open {
|
||||
t.Errorf("Shutdown didn't work?")
|
||||
}
|
||||
// Extra stops don't hurt things
|
||||
w.Stop()
|
||||
w2.Stop()
|
||||
}
|
||||
|
||||
func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
|
||||
done := make(chan bool)
|
||||
m := NewBroadcaster(0, WaitIfChannelFull)
|
||||
go func(w0, w1 Interface) {
|
||||
// We know Broadcaster is in the distribute loop once one watcher receives
|
||||
// an event. Stop the other watcher while distribute is trying to
|
||||
// send to it.
|
||||
select {
|
||||
case <-w0.ResultChan():
|
||||
w1.Stop()
|
||||
case <-w1.ResultChan():
|
||||
w0.Stop()
|
||||
}
|
||||
close(done)
|
||||
}(m.Watch(), m.Watch())
|
||||
m.Action(Added, &myType{})
|
||||
select {
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Error("timeout: deadlocked")
|
||||
case <-done:
|
||||
}
|
||||
m.Shutdown()
|
||||
}
|
||||
|
||||
func TestBroadcasterDropIfChannelFull(t *testing.T) {
|
||||
m := NewBroadcaster(1, DropIfChannelFull)
|
||||
|
||||
event1 := Event{Type: Added, Object: &myType{"foo", "hello world 1"}}
|
||||
event2 := Event{Type: Added, Object: &myType{"bar", "hello world 2"}}
|
||||
|
||||
// Add a couple watchers
|
||||
watches := make([]Interface, 2)
|
||||
for i := range watches {
|
||||
watches[i] = m.Watch()
|
||||
}
|
||||
|
||||
// Send a couple events before closing the broadcast channel.
|
||||
t.Log("Sending event 1")
|
||||
m.Action(event1.Type, event1.Object)
|
||||
t.Log("Sending event 2")
|
||||
m.Action(event2.Type, event2.Object)
|
||||
m.Shutdown()
|
||||
|
||||
// Pull events from the queue.
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(watches))
|
||||
for i := range watches {
|
||||
// Verify that each watcher only gets the first event because its watch
|
||||
// queue of length one was full from the first one.
|
||||
go func(watcher int, w Interface) {
|
||||
defer wg.Done()
|
||||
e1, ok := <-w.ResultChan()
|
||||
if !ok {
|
||||
t.Errorf("Watcher %v failed to retrieve first event.", watcher)
|
||||
}
|
||||
if e, a := event1, e1; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)",
|
||||
watcher, e.Type, e.Object, a.Type, a.Object)
|
||||
}
|
||||
t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
|
||||
e2, ok := <-w.ResultChan()
|
||||
if ok {
|
||||
t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",
|
||||
watcher, e2.Type, e2.Object)
|
||||
}
|
||||
}(i, watches[i])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
68
vendor/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go
generated
vendored
68
vendor/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go
generated
vendored
@ -1,68 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch_test
|
||||
|
||||
import (
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
. "k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
type fakeDecoder struct {
|
||||
items chan Event
|
||||
}
|
||||
|
||||
func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err error) {
|
||||
item, open := <-f.items
|
||||
if !open {
|
||||
return action, nil, io.EOF
|
||||
}
|
||||
return item.Type, item.Object, nil
|
||||
}
|
||||
|
||||
func (f fakeDecoder) Close() {
|
||||
close(f.items)
|
||||
}
|
||||
|
||||
func TestStreamWatcher(t *testing.T) {
|
||||
table := []Event{
|
||||
{Type: Added, Object: testType("foo")},
|
||||
}
|
||||
|
||||
fd := fakeDecoder{make(chan Event, 5)}
|
||||
sw := NewStreamWatcher(fd)
|
||||
|
||||
for _, item := range table {
|
||||
fd.items <- item
|
||||
got, open := <-sw.ResultChan()
|
||||
if !open {
|
||||
t.Errorf("unexpected early close")
|
||||
}
|
||||
if e, a := item, got; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
sw.Stop()
|
||||
_, open := <-sw.ResultChan()
|
||||
if open {
|
||||
t.Errorf("Unexpected failure to close")
|
||||
}
|
||||
}
|
175
vendor/k8s.io/apimachinery/pkg/watch/watch_test.go
generated
vendored
175
vendor/k8s.io/apimachinery/pkg/watch/watch_test.go
generated
vendored
@ -1,175 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
. "k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
type testType string
|
||||
|
||||
func (obj testType) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
|
||||
func (obj testType) DeepCopyObject() runtime.Object { return obj }
|
||||
|
||||
func TestFake(t *testing.T) {
|
||||
f := NewFake()
|
||||
|
||||
table := []struct {
|
||||
t EventType
|
||||
s testType
|
||||
}{
|
||||
{Added, testType("foo")},
|
||||
{Modified, testType("qux")},
|
||||
{Modified, testType("bar")},
|
||||
{Deleted, testType("bar")},
|
||||
{Error, testType("error: blah")},
|
||||
}
|
||||
|
||||
// Prove that f implements Interface by phrasing this as a function.
|
||||
consumer := func(w Interface) {
|
||||
for _, expect := range table {
|
||||
got, ok := <-w.ResultChan()
|
||||
if !ok {
|
||||
t.Fatalf("closed early")
|
||||
}
|
||||
if e, a := expect.t, got.Type; e != a {
|
||||
t.Fatalf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if a, ok := got.Object.(testType); !ok || a != expect.s {
|
||||
t.Fatalf("Expected %v, got %v", expect.s, a)
|
||||
}
|
||||
}
|
||||
_, stillOpen := <-w.ResultChan()
|
||||
if stillOpen {
|
||||
t.Fatal("Never stopped")
|
||||
}
|
||||
}
|
||||
|
||||
sender := func() {
|
||||
f.Add(testType("foo"))
|
||||
f.Action(Modified, testType("qux"))
|
||||
f.Modify(testType("bar"))
|
||||
f.Delete(testType("bar"))
|
||||
f.Error(testType("error: blah"))
|
||||
f.Stop()
|
||||
}
|
||||
|
||||
go sender()
|
||||
consumer(f)
|
||||
}
|
||||
|
||||
func TestRaceFreeFake(t *testing.T) {
|
||||
f := NewRaceFreeFake()
|
||||
|
||||
table := []struct {
|
||||
t EventType
|
||||
s testType
|
||||
}{
|
||||
{Added, testType("foo")},
|
||||
{Modified, testType("qux")},
|
||||
{Modified, testType("bar")},
|
||||
{Deleted, testType("bar")},
|
||||
{Error, testType("error: blah")},
|
||||
}
|
||||
|
||||
// Prove that f implements Interface by phrasing this as a function.
|
||||
consumer := func(w Interface) {
|
||||
for _, expect := range table {
|
||||
got, ok := <-w.ResultChan()
|
||||
if !ok {
|
||||
t.Fatalf("closed early")
|
||||
}
|
||||
if e, a := expect.t, got.Type; e != a {
|
||||
t.Fatalf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if a, ok := got.Object.(testType); !ok || a != expect.s {
|
||||
t.Fatalf("Expected %v, got %v", expect.s, a)
|
||||
}
|
||||
}
|
||||
_, stillOpen := <-w.ResultChan()
|
||||
if stillOpen {
|
||||
t.Fatal("Never stopped")
|
||||
}
|
||||
}
|
||||
|
||||
sender := func() {
|
||||
f.Add(testType("foo"))
|
||||
f.Action(Modified, testType("qux"))
|
||||
f.Modify(testType("bar"))
|
||||
f.Delete(testType("bar"))
|
||||
f.Error(testType("error: blah"))
|
||||
f.Stop()
|
||||
}
|
||||
|
||||
go sender()
|
||||
consumer(f)
|
||||
}
|
||||
|
||||
func TestEmpty(t *testing.T) {
|
||||
w := NewEmptyWatch()
|
||||
_, ok := <-w.ResultChan()
|
||||
if ok {
|
||||
t.Errorf("unexpected result channel result")
|
||||
}
|
||||
w.Stop()
|
||||
_, ok = <-w.ResultChan()
|
||||
if ok {
|
||||
t.Errorf("unexpected result channel result")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxyWatcher(t *testing.T) {
|
||||
events := []Event{
|
||||
{Added, testType("foo")},
|
||||
{Modified, testType("qux")},
|
||||
{Modified, testType("bar")},
|
||||
{Deleted, testType("bar")},
|
||||
{Error, testType("error: blah")},
|
||||
}
|
||||
|
||||
ch := make(chan Event, len(events))
|
||||
w := NewProxyWatcher(ch)
|
||||
|
||||
for _, e := range events {
|
||||
ch <- e
|
||||
}
|
||||
|
||||
for _, e := range events {
|
||||
g := <-w.ResultChan()
|
||||
if !reflect.DeepEqual(e, g) {
|
||||
t.Errorf("Expected %#v, got %#v", e, g)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
w.Stop()
|
||||
|
||||
select {
|
||||
// Closed channel always reads immediately
|
||||
case <-w.StopChan():
|
||||
default:
|
||||
t.Error("Channel isn't closed")
|
||||
}
|
||||
|
||||
// Test double close
|
||||
w.Stop()
|
||||
}
|
Reference in New Issue
Block a user