Skip to content

Commit

Permalink
Merge pull request #14 from vimeo/fetcher_tweaks
Browse files Browse the repository at this point in the history
Match HTTP fetcher capabilities with GRPC improve testing a bit
  • Loading branch information
dfinkel authored Oct 14, 2019
2 parents 5b6af00 + 79b0e66 commit ed31064
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 57 deletions.
31 changes: 28 additions & 3 deletions http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ func NewHTTPFetchProtocol(opts *HTTPOptions) *HTTPFetchProtocol {

// NewFetcher implements the Protocol interface for HTTPProtocol by constructing
// a new fetcher to fetch from peers via HTTP
// Prefixes URL with http:// if neither http:// nor https:// are prefixes of
// the URL argument.
func (hp *HTTPFetchProtocol) NewFetcher(url string) (gc.RemoteFetcher, error) {
if !(strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://")) {
url = "http://" + url
}
return &httpFetcher{transport: hp.transport, baseURL: url + hp.basePath}, nil
}

Expand Down Expand Up @@ -116,14 +121,34 @@ func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.Path, h.basePath) {
panic("HTTPHandler serving unexpected path: " + r.URL.Path)
}
parts := strings.SplitN(r.URL.Path[len(h.basePath):], "/", 2)
strippedPath := r.URL.Path[len(h.basePath):]
needsUnescaping := false
if r.URL.RawPath != "" && r.URL.RawPath != r.URL.Path {
strippedPath = r.URL.RawPath[len(h.basePath):]
needsUnescaping = true
}
parts := strings.SplitN(strippedPath, "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
galaxyName := parts[0]
key := parts[1]

if needsUnescaping {
gn, gnUnescapeErr := url.PathUnescape(galaxyName)
if gnUnescapeErr != nil {
http.Error(w, fmt.Sprintf("failed to unescape galaxy name %q: %s", galaxyName, gnUnescapeErr), http.StatusBadRequest)
return
}
k, keyUnescapeErr := url.PathUnescape(key)
if keyUnescapeErr != nil {
http.Error(w, fmt.Sprintf("failed to unescape key %q: %s", key, keyUnescapeErr), http.StatusBadRequest)
return
}
galaxyName, key = gn, k
}

// Fetch the value for this galaxy/key.
galaxy := h.universe.GetGalaxy(galaxyName)
if galaxy == nil {
Expand Down Expand Up @@ -156,8 +181,8 @@ func (h *httpFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]b
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(galaxy),
url.QueryEscape(key),
url.PathEscape(galaxy),
url.PathEscape(key),
)
req, err := http.NewRequest("GET", u, nil)
if err != nil {
Expand Down
119 changes: 65 additions & 54 deletions http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,65 +45,84 @@ func TestHTTPHandler(t *testing.T) {
nGets = 100
)

var peerAddresses []string
var peerListeners []net.Listener

for i := 0; i < nRoutines; i++ {
newListener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
peerAddresses = append(peerAddresses, newListener.Addr().String())
peerListeners = append(peerListeners, newListener)
}

universe := gc.NewUniverse(NewHTTPFetchProtocol(nil), "shouldBeIgnored")
serveMux := http.NewServeMux()
RegisterHTTPHandler(universe, nil, serveMux)
err := universe.Set(addrToURL(peerAddresses)...)
if err != nil {
t.Errorf("Error setting peers: %s", err)
}

getter := gc.GetterFunc(func(ctx context.Context, key string, dest gc.Codec) error {
return fmt.Errorf("oh no! Local get occurred")
})
g := universe.NewGalaxy("peerFetchTest", 1<<20, getter)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, listener := range peerListeners {
go makeHTTPServerUniverse(ctx, t, peerAddresses, listener)
for _, galaxyNameLoop := range []string{"peerFetchTest", "peerFetchTestWithSlash/foobar"} {
galaxyName := galaxyNameLoop
t.Run(galaxyName, func(t *testing.T) {

var peerAddresses []string
var peerListeners []net.Listener

for i := 0; i < nRoutines; i++ {
newListener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
peerAddresses = append(peerAddresses, newListener.Addr().String())
peerListeners = append(peerListeners, newListener)
}

universe := gc.NewUniverse(NewHTTPFetchProtocol(nil), "shouldBeIgnored")
serveMux := http.NewServeMux()
RegisterHTTPHandler(universe, nil, serveMux)
err := universe.Set(peerAddresses...)
if err != nil {
t.Errorf("Error setting peers: %s", err)
}

getter := gc.GetterFunc(func(ctx context.Context, key string, dest gc.Codec) error {
return fmt.Errorf("oh no! Local get occurred")
})
g := universe.NewGalaxy(galaxyName, 1<<20, getter)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, listener := range peerListeners {
go makeHTTPServerUniverse(ctx, t, galaxyName, peerAddresses, listener)
}

for _, key := range testKeys(nGets) {
var value gc.StringCodec
if err := g.Get(ctx, key, &value); err != nil {
t.Fatal(err)
}
if suffix := ":" + key; !strings.HasSuffix(string(value), suffix) {
t.Errorf("Get(%q) = %q, want value ending in %q", key, value, suffix)
}
t.Logf("Get key=%q, value=%q (peer:key)", key, value)
}
// Try it again, this time with a slash in the middle to ensure we're
// handling those characters properly
for _, key := range testKeys(nGets) {
var value gc.StringCodec
testKey := key + "/" + key
if err := g.Get(ctx, testKey, &value); err != nil {
t.Fatal(err)
}
if suffix := ":" + testKey; !strings.HasSuffix(string(value), suffix) {
t.Errorf("Get(%q) = %q, want value ending in %q", key, value, suffix)
}
t.Logf("Get key=%q, value=%q (peer:key)", testKey, value)
}

})
}

for _, key := range testKeys(nGets) {
var value gc.StringCodec
if err := g.Get(ctx, key, &value); err != nil {
t.Fatal(err)
}
if suffix := ":" + key; !strings.HasSuffix(string(value), suffix) {
t.Errorf("Get(%q) = %q, want value ending in %q", key, value, suffix)
}
t.Logf("Get key=%q, value=%q (peer:key)", key, value)
}

}

func makeHTTPServerUniverse(ctx context.Context, t testing.TB, addresses []string, listener net.Listener) {
universe := gc.NewUniverse(NewHTTPFetchProtocol(nil), "http://"+listener.Addr().String())
func makeHTTPServerUniverse(ctx context.Context, t testing.TB, galaxyName string, addresses []string, listener net.Listener) {
universe := gc.NewUniverse(NewHTTPFetchProtocol(nil), listener.Addr().String())
serveMux := http.NewServeMux()
wrappedHandler := &ochttp.Handler{Handler: serveMux}
RegisterHTTPHandler(universe, nil, serveMux)
err := universe.Set(addrToURL(addresses)...)
err := universe.Set(addresses...)
if err != nil {
t.Errorf("Error setting peers: %s", err)
}
getter := gc.GetterFunc(func(ctx context.Context, key string, dest gc.Codec) error {
dest.UnmarshalBinary([]byte(":" + key))
return nil
})
universe.NewGalaxy("peerFetchTest", 1<<20, getter)
universe.NewGalaxy(galaxyName, 1<<20, getter)
newServer := http.Server{Handler: wrappedHandler}
go func() {
err := newServer.Serve(listener)
Expand All @@ -123,11 +142,3 @@ func testKeys(n int) (keys []string) {
}
return
}

func addrToURL(addr []string) []string {
url := make([]string, len(addr))
for i := range addr {
url[i] = "http://" + addr[i]
}
return url
}
23 changes: 23 additions & 0 deletions peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package galaxycache

import (
"context"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -151,3 +152,25 @@ type FetchProtocol interface {
// data from that peer
NewFetcher(url string) (RemoteFetcher, error)
}

// NullFetchProtocol implements FetchProtocol, but always returns errors.
// (useful for unit-testing)
type NullFetchProtocol struct{}

// NewFetcher instantiates the connection between the current and a
// remote peer and returns a RemoteFetcher to be used for fetching
// data from that peer
func (n *NullFetchProtocol) NewFetcher(url string) (RemoteFetcher, error) {
return &nullFetchFetcher{}, nil
}

type nullFetchFetcher struct{}

func (n *nullFetchFetcher) Fetch(context context.Context, galaxy string, key string) ([]byte, error) {
return nil, errors.New("empty fetcher")
}

// Close closes a client-side connection (may be a nop)
func (n *nullFetchFetcher) Close() error {
return nil
}

0 comments on commit ed31064

Please sign in to comment.