diff --git a/gopool.go b/gopool.go index 0c356f7..0875445 100644 --- a/gopool.go +++ b/gopool.go @@ -2,6 +2,7 @@ package gopool import ( "context" + "sort" "sync" "time" ) @@ -153,26 +154,38 @@ func (p *goPool) adjustWorkers() { ticker := time.NewTicker(p.adjustInterval) defer ticker.Stop() + var adjustFlag bool + for { + adjustFlag = false select { case <-ticker.C: p.cond.L.Lock() if len(p.taskQueue) > len(p.workers)*3/4 && len(p.workers) < p.maxWorkers { + adjustFlag = true // Double the number of workers until it reaches the maximum newWorkers := min(len(p.workers)*2, p.maxWorkers) - len(p.workers) for i := 0; i < newWorkers; i++ { worker := newWorker() p.workers = append(p.workers, worker) + // Don't use len(p.workerStack)-1 here, because it will be less than len(p.workers)-1 when the pool is busy p.workerStack = append(p.workerStack, len(p.workers)-1) worker.start(p, len(p.workers)-1) } - } else if len(p.taskQueue) == 0 && len(p.workers) > p.minWorkers { + } else if len(p.taskQueue) == 0 && len(p.workerStack) == len(p.workers) && len(p.workers) > p.minWorkers { + adjustFlag = true // Halve the number of workers until it reaches the minimum removeWorkers := (len(p.workers) - p.minWorkers + 1) / 2 + // Sort the workerStack before removing workers. + // [1,2,3,4,5] -working-> [1,2,3] -expansive-> [1,2,3,6,7] -idle-> [1,2,3,6,7,4,5] + sort.Ints(p.workerStack) p.workers = p.workers[:len(p.workers)-removeWorkers] p.workerStack = p.workerStack[:len(p.workerStack)-removeWorkers] } p.cond.L.Unlock() + if adjustFlag { + p.cond.Broadcast() + } case <-p.ctx.Done(): return } diff --git a/gopool_benchmark_test.go b/gopool_benchmark_test.go index 6790519..550c2f7 100644 --- a/gopool_benchmark_test.go +++ b/gopool_benchmark_test.go @@ -5,8 +5,8 @@ import ( "testing" "time" - "github.com/daniel-hutao/spinlock" "github.com/alitto/pond" + "github.com/daniel-hutao/spinlock" "github.com/panjf2000/ants/v2" )