Merge pull request #21 from vimeo/consistenthash_replicated
consistenthash: Add a GetReplicated method
dfinkel authored Dec 11, 2020
2 parents a04b8f1 + 2e38d0b commit 553b3a0
Showing 2 changed files with 162 additions and 1 deletion.
70 changes: 69 additions & 1 deletion consistenthash/consistenthash.go
@@ -18,6 +18,7 @@ limitations under the License.
package consistenthash // import "github.com/vimeo/galaxycache/consistenthash"

import (
"encoding/binary"
"hash/crc32"
"sort"
"strconv"
@@ -76,6 +77,11 @@ func (m *Map) Get(key string) string {

hash := m.hash([]byte(key))

_, _, owner := m.findSegmentOwner(hash)
return owner
}

func (m *Map) findSegmentOwner(hash uint32) (int, uint32, string) {
// Binary search for appropriate replica.
idx := sort.Search(len(m.keyHashes), func(i int) bool { return m.keyHashes[i] >= hash })

@@ -84,5 +90,67 @@ func (m *Map) Get(key string) string {
idx = 0
}

-return m.hashMap[m.keyHashes[idx]]
+return idx, m.keyHashes[idx], m.hashMap[m.keyHashes[idx]]
}

func (m *Map) prevSegmentOwner(idx int, lastSegHash, hash uint32) (int, uint32, string) {
if len(m.keys) == 1 {
panic("attempt to find alternate owner for single-key map")
}
if idx == 0 {
// if idx is 0, then wrap around
return m.prevSegmentOwner(len(m.keyHashes)-1, lastSegHash, hash)
}

// we're moving backwards within a ring; decrement the index
idx--

return idx, m.keyHashes[idx], m.hashMap[m.keyHashes[idx]]
}

func (m *Map) idxedKeyReplica(key string, replica int) uint32 {
// For replica zero, do not append a suffix so Get() and GetReplicated are compatible
if replica == 0 {
return m.hash([]byte(key))
}
// Allocate an extra 2 bytes so we have 2 bytes of padding to function
// as a separator between the main key and the suffix
idxSuffixBuf := [binary.MaxVarintLen64 + 2]byte{}
// Set those 2 bytes of padding to a nice non-zero value with
// alternating zeros and ones.
idxSuffixBuf[0] = 0xaa
idxSuffixBuf[1] = 0xaa

// Encode the replica using unsigned varints which are more compact and cheaper to encode.
// definition: https://developers.google.com/protocol-buffers/docs/encoding#varints
vIntLen := binary.PutUvarint(idxSuffixBuf[2:], uint64(replica))

idxHashKey := append([]byte(key), idxSuffixBuf[:vIntLen+2]...)
return m.hash(idxHashKey)
}

// GetReplicated gets the closest item in the hash to a deterministic set of
// keyReplicas variations of the provided key.
// The returned set of segment-owning keys is dedup'd, and collisions are
// resolved by traversing backwards in the hash-ring to find an unused
// owning-key.
func (m *Map) GetReplicated(key string, keyReplicas int) []string {
if m.IsEmpty() {
return []string{}
}
out := make([]string, 0, keyReplicas)
segOwners := make(map[string]struct{}, keyReplicas)

for i := 0; i < keyReplicas && len(out) < len(m.keys); i++ {
h := m.idxedKeyReplica(key, i)
segIdx, segBound, owner := m.findSegmentOwner(h)
for _, present := segOwners[owner]; present; _, present = segOwners[owner] {
// this may overflow, which is fine.
segIdx, segBound, owner = m.prevSegmentOwner(segIdx, segBound, h)
}
segOwners[owner] = struct{}{}
out = append(out, owner)
}

return out
}
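
A minimal usage sketch (not part of this diff) of the new GetReplicated method. The shard names and key are illustrative only; the nil hash argument falls back to the package's default hash (crc32-based, per the existing import above).

package main

import (
	"fmt"

	"github.com/vimeo/galaxycache/consistenthash"
)

func main() {
	// 50 virtual segments per key; nil selects the package's default hash.
	ring := consistenthash.New(50, nil)
	ring.Add("shard-0", "shard-1", "shard-2", "shard-3")

	// Ask for 3 distinct owners of the same key. Replica zero uses the bare
	// key, so the first element always matches Get.
	owners := ring.GetReplicated("user:1234", 3)
	fmt.Println(owners[0] == ring.Get("user:1234")) // true
	fmt.Println(owners)                             // 3 distinct shard names
}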
93 changes: 93 additions & 0 deletions consistenthash/consistenthash_test.go
@@ -83,9 +83,61 @@ func TestConsistency(t *testing.T) {
hash1.Get("Bonny") != hash2.Get("Bonny") {
t.Errorf("Direct matches should always return the same entry")
}
if hash1.Get("Ben") != hash2.GetReplicated("Ben", 1)[0] ||
hash1.Get("Bob") != hash2.GetReplicated("Bob", 1)[0] ||
hash1.Get("Bonny") != hash2.GetReplicated("Bonny", 1)[0] {
t.Errorf("Direct matches should always return the same entry with GetReplicated")
}

}

func TestGetReplicated(t *testing.T) {
hr := New(20, nil)
hr.Add("Bill", "Bob", "Bonny", "Clyde", "Computer", "Long")

for _, itbl := range []struct {
replicas int
expKeys int
}{
{replicas: 4, expKeys: 4},
{replicas: 5, expKeys: 5},
{replicas: 6, expKeys: 6},
{replicas: 7, expKeys: 6},
{replicas: 8, expKeys: 6},
{replicas: 9, expKeys: 6},
{replicas: 10, expKeys: 6},
{replicas: 11, expKeys: 6},
{replicas: 12, expKeys: 6},
{replicas: 13, expKeys: 6},
{replicas: 14, expKeys: 6},
} {
tbl := itbl
t.Run(fmt.Sprintf("repl%d", tbl.replicas), func(t *testing.T) {
oneHash := hr.Get("zombie")
multiHash := hr.GetReplicated("zombie", tbl.replicas)
if len(multiHash) != tbl.expKeys {
t.Fatalf("unexpected length of return from GetReplicated: %d (expected %d)",
len(multiHash), tbl.expKeys)
}
if multiHash[0] != oneHash {
t.Errorf("element zero from GetReplicated should match Get; got %q and %q respectively",
multiHash[0], oneHash)
}
for k := range hr.keys {
if hr.Get(k) != hr.GetReplicated(k, 1)[0] {
t.Errorf("Direct matches should always return the same entry with GetReplicated")
}
// Check that a manually constructed key gets the same hash as a
// computed one (this way we have a test ensuring stability across
// versions)
if gdo, gro := hr.idxedKeyReplica(string(append([]byte(k), 0xaa, 0xaa, 0x01)), 0), hr.idxedKeyReplica(k, 1); gdo != gro {
t.Errorf("mismatched second object-hashes for %q direct() = %d; keyed() = %d", k, gdo, gro)
}
}
})
}
}

func BenchmarkGet(b *testing.B) {
for _, itbl := range []struct {
segsPerKey int
@@ -116,3 +168,44 @@ func BenchmarkGet(b *testing.B) {
})
}
}

func BenchmarkGetReplicated(b *testing.B) {
for _, itbl := range []struct {
segsPerKey int
shards int
replicas int
}{
{segsPerKey: 50, shards: 8, replicas: 2},
{segsPerKey: 50, shards: 32, replicas: 2},
{segsPerKey: 50, shards: 128, replicas: 2},
{segsPerKey: 50, shards: 512, replicas: 2},
{segsPerKey: 50, shards: 8, replicas: 4},
{segsPerKey: 50, shards: 32, replicas: 4},
{segsPerKey: 50, shards: 128, replicas: 4},
{segsPerKey: 50, shards: 512, replicas: 4},
{segsPerKey: 50, shards: 8, replicas: 8},
{segsPerKey: 50, shards: 32, replicas: 8},
{segsPerKey: 50, shards: 128, replicas: 8},
{segsPerKey: 50, shards: 512, replicas: 8},
} {
tbl := itbl
b.Run(fmt.Sprintf("segs%d-shards%d-reps%d", tbl.segsPerKey, tbl.shards, tbl.replicas), func(b *testing.B) {
b.ReportAllocs()

hash := New(tbl.segsPerKey, nil)

var buckets []string
for i := 0; i < tbl.shards; i++ {
buckets = append(buckets, fmt.Sprintf("shard-%d", i))
}

hash.Add(buckets...)

b.ResetTimer()

for i := 0; i < b.N; i++ {
hash.GetReplicated(buckets[i&(tbl.shards-1)], tbl.replicas)
}
})
}
}
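
For reference, a standalone sketch of the replica-key construction that idxedKeyReplica performs and that the stability check in TestGetReplicated pins down. replicaKey is a hypothetical helper written for illustration, not something exported by the package.

package main

import (
	"encoding/binary"
	"fmt"
)

// replicaKey mirrors idxedKeyReplica's suffix scheme: replica 0 hashes the
// bare key, while replica N > 0 appends two 0xaa padding bytes followed by
// the uvarint encoding of N before hashing.
func replicaKey(key string, replica int) []byte {
	if replica == 0 {
		return []byte(key)
	}
	buf := [binary.MaxVarintLen64 + 2]byte{0xaa, 0xaa}
	n := binary.PutUvarint(buf[2:], uint64(replica))
	return append([]byte(key), buf[:n+2]...)
}

func main() {
	fmt.Printf("%x\n", replicaKey("Bob", 0))   // 426f62
	fmt.Printf("%x\n", replicaKey("Bob", 1))   // 426f62aaaa01
	fmt.Printf("%x\n", replicaKey("Bob", 300)) // 426f62aaaaac02 (multi-byte uvarint)
}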
