From 2e38d0bbdeb32f783de1775f2ec2654783523868 Mon Sep 17 00:00:00 2001 From: David Finkel Date: Thu, 10 Dec 2020 16:07:55 -0500 Subject: [PATCH] consistenthash: Add a GetReplicated method Some use-cases require multiple owners for a key. Add a `GetReplicated` method to `consistenthash.Map` that returns a slice of owners. This method tries to guarantee that the requested number of replicas is returned. --- consistenthash/consistenthash.go | 70 +++++++++++++++++++- consistenthash/consistenthash_test.go | 93 +++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 1 deletion(-) diff --git a/consistenthash/consistenthash.go b/consistenthash/consistenthash.go index de32004e..c45692aa 100644 --- a/consistenthash/consistenthash.go +++ b/consistenthash/consistenthash.go @@ -18,6 +18,7 @@ limitations under the License. package consistenthash // import "github.com/vimeo/galaxycache/consistenthash" import ( + "encoding/binary" "hash/crc32" "sort" "strconv" @@ -76,6 +77,11 @@ func (m *Map) Get(key string) string { hash := m.hash([]byte(key)) + _, _, owner := m.findSegmentOwner(hash) + return owner +} + +func (m *Map) findSegmentOwner(hash uint32) (int, uint32, string) { // Binary search for appropriate replica. idx := sort.Search(len(m.keyHashes), func(i int) bool { return m.keyHashes[i] >= hash }) @@ -84,5 +90,67 @@ func (m *Map) Get(key string) string { idx = 0 } - return m.hashMap[m.keyHashes[idx]] + return idx, m.keyHashes[idx], m.hashMap[m.keyHashes[idx]] +} + +func (m *Map) prevSegmentOwner(idx int, lastSegHash, hash uint32) (int, uint32, string) { + if len(m.keys) == 1 { + panic("attempt to find alternate owner for single-key map") + } + if idx == 0 { + // if idx is 0, then wrap around + return m.prevSegmentOwner(len(m.keyHashes)-1, lastSegHash, hash) + } + + // we're moving backwards within a ring; decrement the index + idx-- + + return idx, m.keyHashes[idx], m.hashMap[m.keyHashes[idx]] +} + +func (m *Map) idxedKeyReplica(key string, replica int) uint32 { + // For replica zero, do not append a suffix so Get() and GetReplicated are compatible + if replica == 0 { + return m.hash([]byte(key)) + } + // Allocate an extra 2 bytes so we have 2 bytes of padding to function + // as a separator between the main key and the suffix + idxSuffixBuf := [binary.MaxVarintLen64 + 2]byte{} + // Set those 2 bytes of padding to a nice non-zero value with + // alternating zeros and ones. + idxSuffixBuf[0] = 0xaa + idxSuffixBuf[1] = 0xaa + + // Encode the replica using unsigned varints which are more compact and cheaper to encode. + // definition: https://developers.google.com/protocol-buffers/docs/encoding#varints + vIntLen := binary.PutUvarint(idxSuffixBuf[2:], uint64(replica)) + + idxHashKey := append([]byte(key), idxSuffixBuf[:vIntLen+2]...) + return m.hash(idxHashKey) +} + +// GetReplicated gets the closest item in the hash to a deterministic set of +// keyReplicas variations of the provided key. +// The returned set of segment-owning keys is dedup'd, and collisions are +// resolved by traversing backwards in the hash-ring to find an unused +// owning-key. +func (m *Map) GetReplicated(key string, keyReplicas int) []string { + if m.IsEmpty() { + return []string{} + } + out := make([]string, 0, keyReplicas) + segOwners := make(map[string]struct{}, keyReplicas) + + for i := 0; i < keyReplicas && len(out) < len(m.keys); i++ { + h := m.idxedKeyReplica(key, i) + segIdx, segBound, owner := m.findSegmentOwner(h) + for _, present := segOwners[owner]; present; _, present = segOwners[owner] { + // this may overflow, which is fine. + segIdx, segBound, owner = m.prevSegmentOwner(segIdx, segBound, h) + } + segOwners[owner] = struct{}{} + out = append(out, owner) + } + + return out } diff --git a/consistenthash/consistenthash_test.go b/consistenthash/consistenthash_test.go index 91c5e22b..d9533f85 100644 --- a/consistenthash/consistenthash_test.go +++ b/consistenthash/consistenthash_test.go @@ -83,9 +83,61 @@ func TestConsistency(t *testing.T) { hash1.Get("Bonny") != hash2.Get("Bonny") { t.Errorf("Direct matches should always return the same entry") } + if hash1.Get("Ben") != hash2.GetReplicated("Ben", 1)[0] || + hash1.Get("Bob") != hash2.GetReplicated("Bob", 1)[0] || + hash1.Get("Bonny") != hash2.GetReplicated("Bonny", 1)[0] { + t.Errorf("Direct matches should always return the same entry with GetReplicated") + } } +func TestGetReplicated(t *testing.T) { + hr := New(20, nil) + hr.Add("Bill", "Bob", "Bonny", "Clyde", "Computer", "Long") + + for _, itbl := range []struct { + replicas int + expKeys int + }{ + {replicas: 4, expKeys: 4}, + {replicas: 5, expKeys: 5}, + {replicas: 6, expKeys: 6}, + {replicas: 7, expKeys: 6}, + {replicas: 8, expKeys: 6}, + {replicas: 9, expKeys: 6}, + {replicas: 10, expKeys: 6}, + {replicas: 11, expKeys: 6}, + {replicas: 12, expKeys: 6}, + {replicas: 13, expKeys: 6}, + {replicas: 14, expKeys: 6}, + } { + tbl := itbl + t.Run(fmt.Sprintf("repl%d", tbl.replicas), func(t *testing.T) { + oneHash := hr.Get("zombie") + multiHash := hr.GetReplicated("zombie", tbl.replicas) + if len(multiHash) != tbl.expKeys { + t.Fatalf("unexpected length of return from GetReplicated: %d (expected %d)", + len(multiHash), tbl.expKeys) + } + if multiHash[0] != oneHash { + t.Errorf("element zero from GetReplicated should match Get; got %q and %q respectively", + multiHash[0], oneHash) + } + for k := range hr.keys { + if hr.Get(k) != hr.GetReplicated(k, 1)[0] { + t.Errorf("Direct matches should always return the same entry with GetReplicated") + } + // Check that a manually constructed key gets the same hash as a + // computed one (this way we have a test ensuring stability across + // versions) + if gdo, gro := hr.idxedKeyReplica(string(append([]byte(k), 0xaa, 0xaa, 0x01)), 0), hr.idxedKeyReplica(k, 1); gdo != gro { + t.Errorf("mismatched second object-hashes for %q direct() = %d; keyed() = %d", k, gdo, gro) + } + } + }) + } +} + func BenchmarkGet(b *testing.B) { for _, itbl := range []struct { segsPerKey int @@ -116,3 +168,44 @@ func BenchmarkGet(b *testing.B) { }) } } + +func BenchmarkGetReplicated(b *testing.B) { + for _, itbl := range []struct { + segsPerKey int + shards int + replicas int + }{ + {segsPerKey: 50, shards: 8, replicas: 2}, + {segsPerKey: 50, shards: 32, replicas: 2}, + {segsPerKey: 50, shards: 128, replicas: 2}, + {segsPerKey: 50, shards: 512, replicas: 2}, + {segsPerKey: 50, shards: 8, replicas: 4}, + {segsPerKey: 50, shards: 32, replicas: 4}, + {segsPerKey: 50, shards: 128, replicas: 4}, + {segsPerKey: 50, shards: 512, replicas: 4}, + {segsPerKey: 50, shards: 8, replicas: 8}, + {segsPerKey: 50, shards: 32, replicas: 8}, + {segsPerKey: 50, shards: 128, replicas: 8}, + {segsPerKey: 50, shards: 512, replicas: 8}, + } { + tbl := itbl + b.Run(fmt.Sprintf("segs%d-shards%d-reps%d", tbl.segsPerKey, tbl.shards, tbl.replicas), func(b *testing.B) { + b.ReportAllocs() + + hash := New(tbl.segsPerKey, nil) + + var buckets []string + for i := 0; i < tbl.shards; i++ { + buckets = append(buckets, fmt.Sprintf("shard-%d", i)) + } + + hash.Add(buckets...) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + hash.GetReplicated(buckets[i&(tbl.shards-1)], tbl.replicas) + } + }) + } +}