Skip to content

Commit

Permalink
Allow for configurable number of shards
Browse files Browse the repository at this point in the history
closes orcaman#17
  • Loading branch information
sparrc committed Aug 15, 2016
1 parent cb2afd1 commit fc0dfb6
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
24 changes: 15 additions & 9 deletions concurrent_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"sync"
)

var SHARD_COUNT = 32
const SHARD_COUNT = 32

// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
// To avoid lock bottlenecks this map is dived to several (len(m)) map shards.
type ConcurrentMap []*ConcurrentMapShared

// A "thread" safe string to anything map.
Expand All @@ -18,9 +18,15 @@ type ConcurrentMapShared struct {
}

// Creates a new concurrent map.
func New() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
func New(shardCount ...int) ConcurrentMap {
var nShards int
if len(shardCount) > 0 {
nShards = shardCount[0]
} else {
nShards = SHARD_COUNT
}
m := make(ConcurrentMap, nShards)
for i := 0; i < nShards; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
Expand Down Expand Up @@ -94,7 +100,7 @@ func (m ConcurrentMap) Get(key string) (interface{}, bool) {
// Returns the number of elements within the map.
func (m ConcurrentMap) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
for i := 0; i < len(m); i++ {
shard := m[i]
shard.RLock()
count += len(shard.items)
Expand Down Expand Up @@ -141,7 +147,7 @@ func (m ConcurrentMap) Iter() <-chan Tuple {
ch := make(chan Tuple)
go func() {
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
wg.Add(len(m))
// Foreach shard.
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
Expand All @@ -165,7 +171,7 @@ func (m ConcurrentMap) IterBuffered() <-chan Tuple {
ch := make(chan Tuple, m.Count())
go func() {
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
wg.Add(len(m))
// Foreach shard.
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
Expand Down Expand Up @@ -203,7 +209,7 @@ func (m ConcurrentMap) Keys() []string {
go func() {
// Foreach shard.
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
wg.Add(len(m))
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
// Foreach key, value pair.
Expand Down
6 changes: 3 additions & 3 deletions concurrent_map_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmap
import "testing"
import "strconv"

var m ConcurrentMap

func BenchmarkItems(b *testing.B) {
m := New()

Expand Down Expand Up @@ -177,10 +179,8 @@ func GetSet(m ConcurrentMap, finished chan struct{}) (set func(key, value string
}

func runWithShards(bench func(b *testing.B), b *testing.B, shardsCount int) {
oldShardsCount := SHARD_COUNT
SHARD_COUNT = shardsCount
m = New(shardsCount)
bench(b)
SHARD_COUNT = oldShardsCount
}

func BenchmarkKeys(b *testing.B) {
Expand Down
6 changes: 1 addition & 5 deletions concurrent_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,8 @@ func TestConcurrent(t *testing.T) {
}

func TestJsonMarshal(t *testing.T) {
SHARD_COUNT = 2
defer func() {
SHARD_COUNT = 32
}()
expected := "{\"a\":1,\"b\":2}"
m := New()
m := New(2)
m.Set("a", 1)
m.Set("b", 2)
j, err := json.Marshal(m)
Expand Down

0 comments on commit fc0dfb6

Please sign in to comment.