Skip to content

Commit

Permalink
feat(coroutine): Optimize Concurrent and Parallel (#75)
Browse files Browse the repository at this point in the history
Signed-off-by: Flc゛ <[email protected]>
  • Loading branch information
flc1125 authored Jan 25, 2024
1 parent d3d1bdc commit 99a279b
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 28 deletions.
45 changes: 45 additions & 0 deletions coroutine/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Coroutine

## Usage

```go
package main

import (
"log"

"github.com/go-kratos-ecosystem/components/v2/coroutine"
)

func main() {
funcs := []func(){
func() {
log.Println("1")
},
func() {
log.Println("2")
},
func() {
log.Println("3")
},
}

// Concurrent Example1
c := coroutine.NewConcurrent(2)
defer c.Close()
c.Add(funcs...)

c.Wait()

// Concurrent Example2
coroutine.RunConcurrent(2, funcs...)

// Parallel Example1
p := coroutine.NewParallel()
p.Add(funcs...)
p.Wait()

// Parallel Example2
coroutine.RunParallel(funcs...)
}
```
56 changes: 40 additions & 16 deletions coroutine/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,48 @@ package coroutine

import "sync"

func Concurrent(limit int, tasks ...func()) {
var (
wg sync.WaitGroup
ch = make(chan struct{}, limit)
)
defer close(ch)
wg.Add(len(tasks))

for _, task := range tasks {
ch <- struct{}{}
go func(task func()) {
type Concurrent struct {
wg sync.WaitGroup
ch chan struct{}
fs []func()
}

func NewConcurrent(limit int) *Concurrent {
return &Concurrent{
wg: sync.WaitGroup{},
ch: make(chan struct{}, limit),
fs: make([]func(), 0),
}
}

func (c *Concurrent) Add(fs ...func()) *Concurrent {
c.fs = append(c.fs, fs...)
return c
}

func (c *Concurrent) Wait() {
c.wg.Add(len(c.fs))

for _, f := range c.fs {
c.ch <- struct{}{}
go func(f func()) {
defer func() {
<-ch
wg.Done()
<-c.ch
c.wg.Done()
}()
task()
}(task)
f()
}(f)
}

wg.Wait()
c.wg.Wait()
}

func (c *Concurrent) Close() {
close(c.ch)
}

func RunConcurrent(limit int, tasks ...func()) {
c := NewConcurrent(limit).Add(tasks...)
defer c.Close()
c.Wait()
}
2 changes: 1 addition & 1 deletion coroutine/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestConcurrent(t *testing.T) {
mu sync.Mutex
)

Concurrent(2, func() {
RunConcurrent(2, func() {
time.Sleep(1 * time.Second)
mu.Lock()
defer mu.Unlock()
Expand Down
40 changes: 30 additions & 10 deletions coroutine/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,36 @@ package coroutine

import "sync"

func Parallel(tasks ...func()) {
var wg sync.WaitGroup
wg.Add(len(tasks))

for _, task := range tasks {
go func(task func()) {
defer wg.Done()
task()
}(task)
type Parallel struct {
wg sync.WaitGroup
fs []func()
}

func NewParallel() *Parallel {
return &Parallel{
wg: sync.WaitGroup{},
fs: make([]func(), 0),
}
}

func (p *Parallel) Add(fs ...func()) *Parallel {
p.fs = append(p.fs, fs...)
return p
}

func (p *Parallel) Wait() {
p.wg.Add(len(p.fs))

for _, f := range p.fs {
go func(f func()) {
defer p.wg.Done()
f()
}(f)
}

wg.Wait()
p.wg.Wait()
}

func RunParallel(fs ...func()) {
NewParallel().Add(fs...).Wait()
}
2 changes: 1 addition & 1 deletion coroutine/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestParallel(t *testing.T) {
mu sync.Mutex
)

Parallel(func() {
RunParallel(func() {
time.Sleep(1 * time.Second)
mu.Lock()
defer mu.Unlock()
Expand Down

0 comments on commit 99a279b

Please sign in to comment.