Skip to content

Commit

Permalink
Propagate errors
Browse files Browse the repository at this point in the history
  • Loading branch information
boz committed Sep 21, 2017
1 parent 3b9f330 commit 144f4ca
Show file tree
Hide file tree
Showing 26 changed files with 679 additions and 234 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ install:
- make install-deps

env:
- KCACHE_TEST_ASYNC_DURATION=40ms
- KCACHE_TEST_ASYNC_DURATION=50ms

script: |
make test-full && \
Expand Down
40 changes: 24 additions & 16 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
lifecycle "github.com/boz/go-lifecycle"
logutil "github.com/boz/go-logutil"
"github.com/boz/kcache/filter"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -18,10 +19,11 @@ type CacheReader interface {

type cache interface {
CacheReader
sync([]metav1.Object) []Event
update(Event) []Event
refilter([]metav1.Object, filter.Filter) []Event
sync([]metav1.Object) ([]Event, error)
update(Event) ([]Event, error)
refilter([]metav1.Object, filter.Filter) ([]Event, error)
Done() <-chan struct{}
Error() error
}

type cacheKey struct {
Expand Down Expand Up @@ -94,55 +96,61 @@ func newCache(ctx context.Context, log logutil.Log, stopch <-chan struct{}, filt
return c
}

func (c *_cache) sync(list []metav1.Object) []Event {
func (c *_cache) sync(list []metav1.Object) ([]Event, error) {
resultch := make(chan []Event, 1)
request := syncRequest{list, resultch}

select {
case <-c.lc.ShuttingDown():
return nil
return nil, errors.WithStack(ErrNotRunning)
case c.syncch <- request:
}

return <-resultch
return <-resultch, nil
}

func (c *_cache) update(evt Event) []Event {
func (c *_cache) update(evt Event) ([]Event, error) {

resultch := make(chan []Event, 1)
request := updateRequest{evt, resultch}

select {
case <-c.lc.ShuttingDown():
return nil
return nil, errors.WithStack(ErrNotRunning)
case c.updatech <- request:
}

return <-resultch
return <-resultch, nil

}

func (c *_cache) refilter(list []metav1.Object, filter filter.Filter) []Event {
func (c *_cache) refilter(list []metav1.Object, filter filter.Filter) ([]Event, error) {
resultch := make(chan []Event, 1)
request := refilterRequest{list, filter, resultch}

select {
case <-c.lc.ShuttingDown():
return nil
return nil, errors.WithStack(ErrNotRunning)
case c.refilterch <- request:
}

return <-resultch
return <-resultch, nil
}

func (c *_cache) Done() <-chan struct{} {
return c.lc.Done()
}

func (c *_cache) Error() error {
return c.lc.Error()
}

func (c *_cache) List() ([]metav1.Object, error) {
resultch := make(chan []metav1.Object, 1)

select {
case <-c.lc.ShuttingDown():
return nil, ErrNotRunning
return nil, errors.WithStack(ErrNotRunning)
case c.listch <- resultch:
}

Expand All @@ -159,7 +167,7 @@ func (c *_cache) Get(ns, name string) (metav1.Object, error) {
request := getRequest{key, resultch}
select {
case <-c.lc.ShuttingDown():
return nil, ErrNotRunning
return nil, errors.WithStack(ErrNotRunning)
case c.getch <- request:
}
return <-resultch, nil
Expand All @@ -183,8 +191,8 @@ func (c *_cache) run() {
} else {
request.resultch <- nil
}
case <-c.lc.ShutdownRequest():
c.lc.ShutdownInitiated()
case err := <-c.lc.ShutdownRequest():
c.lc.ShutdownInitiated(err)
return
}
}
Expand Down
97 changes: 70 additions & 27 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
logutil "github.com/boz/go-logutil"
"github.com/boz/kcache/filter"
"github.com/boz/kcache/testutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -35,10 +36,12 @@ func TestCache_Sync(t *testing.T) {

cache := newCache(ctx, log, stopch, filter)

assert.Len(t, cache.sync(initial), len(initial))

events := cache.sync(secondary)
evs, err := cache.sync(initial)
assert.NoError(t, err)
assert.Len(t, evs, len(initial))

events, err := cache.sync(secondary)
assert.NoError(t, err)
require.Len(t, events, 3)

found := make(map[string]bool)
Expand Down Expand Up @@ -102,24 +105,29 @@ func TestCache_update(t *testing.T) {
cache := newCache(ctx, log, stopch, filter)

// first sync returns zero events
assert.NotEmpty(t, cache.sync(initial))
evs, err := cache.sync(initial)
assert.NoError(t, err)
assert.NotEmpty(t, evs)

{
events := cache.update(testGenEvent(EventTypeUpdate, "default", "pod-1", "3"))
events, err := cache.update(testGenEvent(EventTypeUpdate, "default", "pod-1", "3"))
assert.NoError(t, err)
require.Len(t, events, 1)
assert.Equal(t, EventTypeUpdate, events[0].Type())
assert.Equal(t, "pod-1", events[0].Resource().GetName())
}

{
events := cache.update(testGenEvent(EventTypeDelete, "default", "pod-2", "4"))
events, err := cache.update(testGenEvent(EventTypeDelete, "default", "pod-2", "4"))
assert.NoError(t, err)
require.Len(t, events, 1)
assert.Equal(t, EventTypeDelete, events[0].Type())
assert.Equal(t, "pod-2", events[0].Resource().GetName())
}

{
events := cache.update(testGenEvent(EventTypeCreate, "default", "pod-3", "5"))
events, err := cache.update(testGenEvent(EventTypeCreate, "default", "pod-3", "5"))
assert.NoError(t, err)
require.Len(t, events, 1)
assert.Equal(t, EventTypeCreate, events[0].Type())
assert.Equal(t, "pod-3", events[0].Resource().GetName())
Expand Down Expand Up @@ -161,16 +169,20 @@ func TestCache_refilter(t *testing.T) {
cache := newCache(ctx, log, stopch, filter.Null())

// first sync returns zero events
assert.NotEmpty(t, cache.sync(initial))
evts, err := cache.sync(initial)
assert.NoError(t, err)
assert.NotEmpty(t, evts)

filter := filter.FN(func(obj metav1.Object) bool {
return obj.GetNamespace() == "default" &&
obj.GetName() == "pod-1" &&
obj.GetResourceVersion() < "5"
})

events := cache.refilter(initial, filter)
events, err := cache.refilter(initial, filter)
assert.NoError(t, err)
require.Len(t, events, 1)

evt := events[0]
assert.Equal(t, EventTypeDelete, evt.Type())
assert.Equal(t, "pod-2", evt.Resource().GetName())
Expand All @@ -181,15 +193,28 @@ func TestCache_refilter(t *testing.T) {
obj := list[0]
require.Equal(t, "pod-1", obj.GetName())

assert.Empty(t, cache.update(NewEvent(EventTypeDelete, testGenPod("default", "pod-2", "3"))))
assert.Empty(t, cache.update(NewEvent(EventTypeUpdate, testGenPod("default", "pod-2", "3"))))
assert.Empty(t, cache.update(NewEvent(EventTypeCreate, testGenPod("default", "pod-2", "3"))))
evts, err = cache.update(NewEvent(EventTypeDelete, testGenPod("default", "pod-2", "3")))
assert.NoError(t, err)
assert.Empty(t, evts)
evts, err = cache.update(NewEvent(EventTypeUpdate, testGenPod("default", "pod-2", "3")))
assert.NoError(t, err)
assert.Empty(t, evts)
evts, err = cache.update(NewEvent(EventTypeCreate, testGenPod("default", "pod-2", "3")))
assert.NoError(t, err)
assert.Empty(t, evts)

assert.Empty(t, cache.update(NewEvent(EventTypeDelete, testGenPod("default", "pod-3", "3"))))
assert.Empty(t, cache.update(NewEvent(EventTypeUpdate, testGenPod("default", "pod-3", "3"))))
assert.Empty(t, cache.update(NewEvent(EventTypeCreate, testGenPod("default", "pod-3", "3"))))
evts, err = cache.update(NewEvent(EventTypeDelete, testGenPod("default", "pod-3", "3")))
assert.NoError(t, err)
assert.Empty(t, evts)
evts, err = cache.update(NewEvent(EventTypeUpdate, testGenPod("default", "pod-3", "3")))
assert.NoError(t, err)
assert.Empty(t, evts)
evts, err = cache.update(NewEvent(EventTypeCreate, testGenPod("default", "pod-3", "3")))
assert.NoError(t, err)
assert.Empty(t, evts)

evts := cache.update(NewEvent(EventTypeUpdate, testGenPod("default", "pod-1", "5")))
evts, err = cache.update(NewEvent(EventTypeUpdate, testGenPod("default", "pod-1", "5")))
assert.NoError(t, err)
assert.Len(t, evts, 1)
assert.Equal(t, EventTypeDelete, evts[0].Type())
assert.Equal(t, "default", evts[0].Resource().GetNamespace())
Expand All @@ -204,7 +229,9 @@ func TestCache_lifecycle_ctx(t *testing.T) {

cache := newCache(ctx, log, nil, filter.Null())

assert.Len(t, cache.sync([]metav1.Object{testGenPod("a", "b", "1")}), 1)
evts, err := cache.sync([]metav1.Object{testGenPod("a", "b", "1")})
assert.NoError(t, err)
assert.Len(t, evts, 1)

obj, err := cache.Get("a", "b")
assert.NoError(t, err)
Expand All @@ -220,16 +247,24 @@ func TestCache_lifecycle_ctx(t *testing.T) {

testutil.AssertDone(t, "cache", cache)

assert.Nil(t, cache.sync([]metav1.Object{testGenPod("a", "b", "1")}))
assert.Nil(t, cache.update(testGenEvent(EventTypeCreate, "a", "b", "2")))
assert.Nil(t, cache.refilter([]metav1.Object{testGenPod("a", "c", "3")}, filter.All()))
evts, err = cache.sync([]metav1.Object{testGenPod("a", "b", "1")})
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Nil(t, evts)

evts, err = cache.update(testGenEvent(EventTypeCreate, "a", "b", "2"))
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Nil(t, evts)

evts, err = cache.refilter([]metav1.Object{testGenPod("a", "c", "3")}, filter.All())
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Nil(t, evts)

list, err = cache.List()
assert.Equal(t, ErrNotRunning, err)
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Empty(t, list)

obj, err = cache.Get("a", "b")
assert.Equal(t, ErrNotRunning, err)
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Nil(t, obj)
}

Expand All @@ -246,15 +281,23 @@ func TestCache_lifecycle_stopch(t *testing.T) {
close(stopch)
testutil.AssertDone(t, "cache", cache)

assert.Nil(t, cache.sync([]metav1.Object{testGenPod("a", "b", "1")}))
assert.Nil(t, cache.update(testGenEvent(EventTypeCreate, "a", "b", "2")))
assert.Nil(t, cache.refilter([]metav1.Object{testGenPod("a", "c", "3")}, filter.All()))
evts, err := cache.sync([]metav1.Object{testGenPod("a", "b", "1")})
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Nil(t, evts)

evts, err = cache.update(testGenEvent(EventTypeCreate, "a", "b", "2"))
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Nil(t, evts)

evts, err = cache.refilter([]metav1.Object{testGenPod("a", "c", "3")}, filter.All())
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Nil(t, evts)

list, err := cache.List()
assert.Equal(t, ErrNotRunning, err)
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Empty(t, list)

obj, err := cache.Get("a", "b")
assert.Equal(t, ErrNotRunning, err)
assert.Equal(t, ErrNotRunning, errors.Cause(err))
assert.Nil(t, obj)
}
Loading

0 comments on commit 144f4ca

Please sign in to comment.