diff --git a/src/blogs/backend/GolangSync.md b/src/blogs/backend/GolangSync.md index 30f8c98..3d6bcb3 100644 --- a/src/blogs/backend/GolangSync.md +++ b/src/blogs/backend/GolangSync.md @@ -5,9 +5,9 @@ tags: - Golang categories: - 后端 -date: 2024-02-06 23:34:42 +date: 2024-02-05 23:34:42 --- - + Go中可以使用一个`go`关键字让程序异步执行 @@ -17,11 +17,11 @@ Go中可以使用一个`go`关键字让程序异步执行 func main() { - go do1() - - go do2() - - go do3() +go do1() + +go do2() + +go do3() } @@ -31,7 +31,7 @@ func main() { for i := range []int{1,2,3}{ - go do(i) +go do(i) } @@ -39,21 +39,18 @@ for i := range []int{1,2,3}{ ``` - - 如果了解Go并发机制,就知道`main`在其他goroutine运行完成之前就已经结束了,所以上面代码的运行结果是不符合预期的。我们需要使用一种叫做并发控制的手段,来保证程序正确运行 举个例子: 已知有一个现成的函数`search`,能够按照关键词执行搜索 -期望实现一个新的函数`coSearch`能够进行批量查询 +期望实现一个新的函数`coSearch`能够进行批量 ```go package main - import ( @@ -67,7 +64,6 @@ import ( ) - func search(ctx context.Context, word string) (string, error) { @@ -81,19 +77,17 @@ return fmt.Sprintf("result: %s", word), nil // 模拟结果 } - func coSearch(ctx context.Context, words []string) (results []string, err error) { //tbd - return } - + func main() { @@ -109,24 +103,23 @@ return } - + fmt.Println(results) } ``` - + ## **并发控制基础** `sync.WaitGroup`是Go标准库中用来控制并发的结构,这里放一个使用`WaitGroup`实现`coSearch`的示例 - ```go package main - + import ( @@ -140,7 +133,7 @@ import ( ) - + func search(ctx context.Context, word string) (string, error) { @@ -154,7 +147,7 @@ return fmt.Sprintf("result: %s", word), nil // 模拟结果 } - + func coSearch(ctx context.Context, words []string) ([]string, error) { @@ -170,19 +163,19 @@ err error ) - + for i, word := range words { wg.Add(1) - + go func(word string, i int) { defer wg.Done() - + result, e := search(ctx, word) @@ -194,13 +187,13 @@ err = e }) - + return } - + results[i] = result @@ -208,17 +201,17 @@ results[i] = result } - + wg.Wait() - + return results, err } - + func main() { @@ -234,7 +227,6 @@ return } - fmt.Println(results) @@ -242,32 +234,20 @@ fmt.Println(results) ``` - - 上面的代码中有非常多的细节,来逐个聊一聊 - - ### `sync.WaitGroup{}`并发控制 - - `sync.WaitGroup{}`的用法非常简洁 - - - 当新运行一个goroutine时,我们需要调用`wg.Add(1)` - 当一个goroutine运行完成的时候,我们需要调用`wg.Done()` - `wg.Wait()`让程序阻塞在此处,直到所有的goroutine运行完毕。 - - 对于`coSearch`来说,等待所有goroutine运行完成,也就完成了函数的任务,返回最终的结果 - - ```go var ( @@ -278,13 +258,11 @@ wg = sync.WaitGroup{} ) - for i, word := range words { wg.Add(1) - go func(word string, i int) { @@ -296,29 +274,22 @@ defer wg.Done() } - wg.Wait() ``` - - ### `for`循环中的goroutine - - 这是一个Go经典错误,如果goroutine中使用了`for`迭代的变量,所有goroutine都会获得最后一次循环的值。例如下面的示例,并不会输出"a", "b", "c" 而是输出 "c", "c", "c" - - ```go func main() { done := make(chan bool) - + values := []string{"a", "b", "c"} @@ -334,7 +305,7 @@ done <- true } - + // wait for all goroutines to complete before exiting @@ -348,12 +319,8 @@ for _ = range values { ``` - - 正确的做法就是像上文示例一样,将迭代的变量赋值给函数参数,或者赋值给新的变量 - - ```go for i, word := range words { @@ -368,7 +335,7 @@ go func(word string, i int) { } - + for i, word := range words { @@ -384,9 +351,7 @@ go func() { ``` - > 由于这个错误实在太常见,从Go 1.22开始Go已经修正了这个经典的错误:Fixing For Loops in Go 1.22。 - > 不过Go 1.22默认不会开启修正,需要设置环境变量`GOEXPERIMENT=loopvar`才会 开启 ### 并发安全 @@ -407,7 +372,6 @@ goroutine1 goroutine2 goroutine3 ``` - 这也意味着如果使用`results = append(results, result)`的方式并发赋值,因为会涉及到slice的扩容等操作,所以并不是并发安全的,需要利用`sync.Mutex{}`进行加锁 如果想尽可能的提高程序的并发性能,推荐使用 `results[i] = result`这种方式赋值 @@ -424,7 +388,7 @@ go func(word string, i int) { defer wg.Done() - + result, e := search(ctx, word) @@ -432,13 +396,13 @@ if e != nil && err == nil { err = e - + return } - + results[i] = result @@ -460,7 +424,7 @@ go func(word string, i int) { defer wg.Done() - + result, e := search(ctx, word) @@ -472,13 +436,13 @@ err = e }) - + return } - + results[i] = result @@ -488,18 +452,17 @@ results[i] = result ``` - ### goroutine数量控制 `coSearch`入参的数组可能非常大,如果不加以控制可能导致我们的服务器资源耗尽,我们需要控制并发的数量 -利用带缓冲channel可以实现 +利用带缓冲channel可以实现 ```go tokens := make(chan struct{}, 10) - + for i, word := range words { @@ -507,7 +470,7 @@ tokens <- struct{}{} // 新增 wg.Add(1) - + go func(word string, i int) { @@ -519,7 +482,7 @@ wg.Done() }() - + result, e := search(ctx, word) @@ -531,13 +494,13 @@ err = e }) - + return } - + results[i] = result @@ -545,37 +508,27 @@ results[i] = result } - + wg.Wait() ``` - - 如上,代码中创建了10个缓冲区的channel,当channel被填满时,继续写入会被阻塞;当goroutine运行完成之后,除了原有的`wg.Done()`,我们需要从channel读取走一个数据,来允许新的goroutine运行 - - 通过这种方式,我们控制了`coSearch`最多只能运行10个goroutine,当超过10个时需要等待前面运行的goroutine结束 - - ### context.Context - - 并发执行的goroutine只要有一个出错,其他goroutine就可以停止,没有必要继续执行下去了。如何把取消的事件传导到其他goroutine呢?`context.Context`就是用来传递类似上下文信息的结构 - - ```go ctx, cancel := context.WithCancelCause(ctx) // 新增 defer cancel(nil) // 新增 - + for i, word := range words { @@ -583,7 +536,7 @@ tokens <- struct{}{} wg.Add(1) - + go func(word string, i int) { @@ -595,7 +548,7 @@ wg.Done() }() - + result, e := search(ctx, word) @@ -609,13 +562,13 @@ cancel(e) // 新增 }) - + return } - + results[i] = result @@ -623,27 +576,21 @@ results[i] = result } - + wg.Wait() ``` - - ## **完整的代码** - - 最终完成的效果如下 - - ```go package main - + import ( @@ -657,7 +604,7 @@ import ( ) - + func search(ctx context.Context, word string) (string, error) { @@ -681,7 +628,7 @@ return fmt.Sprintf("result: %s", word), nil // 模拟结果 } - + func coSearch(ctx context.Context, words []string) ([]string, error) { @@ -689,7 +636,7 @@ ctx, cancel := context.WithCancelCause(ctx) defer cancel(nil) - + var ( @@ -697,19 +644,19 @@ wg = sync.WaitGroup{} once = sync.Once{} - + results = make([]string, len(words)) tokens = make(chan struct{}, 2) - + err error ) - + for i, word := range words { @@ -717,7 +664,7 @@ tokens <- struct{}{} wg.Add(1) - + go func(word string, i int) { @@ -729,7 +676,7 @@ wg.Done() }() - + result, e := search(ctx, word) @@ -743,13 +690,13 @@ cancel(e) }) - + return } - + results[i] = result @@ -757,11 +704,11 @@ results[i] = result } - + wg.Wait() - + return results, err @@ -769,25 +716,17 @@ return results, err ``` - - ## **并发控制库errgroup** - - 可以看到要实现一个较为完备的并发控制,需要做的工作非常多。不过Go官方团队为大家准备了 golang.org/x/sync/errgroup - - `errgroup`提供的能力和上文的示例类似,实现方式也类似,包含并发控制,错误传递,`context.Context`传递等 - - ```go package main - + import ( @@ -797,13 +736,13 @@ import ( "sync" - + "golang.org/x/sync/errgroup" ) - + func coSearch(ctx context.Context, words []string) ([]string, error) { @@ -813,13 +752,13 @@ g.SetLimit(10) results := make([]string, len(words)) - + for i, word := range words { i, word := i, word - + g.Go(func() error { @@ -831,7 +770,7 @@ return err } - + results[i] = result @@ -841,11 +780,11 @@ return nil } - + err := g.Wait() - + return results, err @@ -853,8 +792,6 @@ return results, err ``` - - `errgroup`的用法也很简单 - 使用`g, ctx := errgroup.WithContext(ctx)`来创建goroutine的管理器 @@ -863,4 +800,4 @@ return results, err - 类似于`go`关键词,`g.Go`异步执行函数 -- `g.Wait()`和`sync.WaitGroup{}`的`wg.Wait()`类似,会阻塞直到所有goroutine都运行完成,并返回其中一个goroutine的错误 \ No newline at end of file +- `g.Wait()`和`sync.WaitGroup{}`的`wg.Wait()`类似,会阻塞直到所有goroutine都运行完成,并返回其中一个goroutine的错误 diff --git a/src/blogs/backend/GolangTemplate.md b/src/blogs/backend/GolangTemplate.md new file mode 100644 index 0000000..b3f5459 --- /dev/null +++ b/src/blogs/backend/GolangTemplate.md @@ -0,0 +1,271 @@ +--- +title: golang 批量执行任务的通用模板 +author: Rain +tags: + - Golang +categories: + - 后端 +date: 2024-02-06 23:49:03 +--- + +在Go中,批量执行任务的通用模板包括以下步骤: + +首先,有一个需求:接口调用时,接收到一个包含十个元素的列表。希望并发执行这十个任务,每个任务都会返回执行的结果和可能的异常。最后,要将返回的结果整合到一个切片列表中,然后一并返回。 + +为了实现这个需求,首先定义一个结构体 `Order` 用于表示任务的信息: + +```go + +type Order struct { + +Name string `json:"name"` + +Id int `json:"id"` + +} + +``` + +然后,决定并发执行十个任务,因此初始化了两个通道,一个用于接收任务的结果,另一个用于接收异常: + +```go + +taskNum := 10 + +orderCh := make(chan Order, taskNum) // 用于接收返回的结果 + +errCh := make(chan error, taskNum) // 用于接收返回的异常 + +``` + +接下来,创建任务执行函数: + +```go + +func processTask(task Task) { + +// 执行任务的逻辑 + +} + +``` + +然后,启动十个协程来执行这些任务,并使用 `sync.WaitGroup` 来等待它们完成: + +```go + +var wg sync.WaitGroup + + +for i := 0; i < taskNum; i++ { + +wg.Add(1) + +go func() { + +defer wg.Done() + +// 任务的执行逻辑 + +}() + +} + + +// 等待所有任务协程完成 + +wg.Wait() + +``` + +接着,使用 `for-select` 结构从结果通道中接收执行结果: + +```go + +orderList := make([]Order, taskNum) + + +for i := 0; i < taskNum; i++ { + +select { + +case order, ok := <-orderCh: + +if ok { + +orderList = append(orderList, order) + +} + +case err := <-errCh: + +if err != nil { + +return err // 在发现错误时,根据需求选择是继续执行还是返回错误 + +} + +default: + +fmt.Println("done") + +} + +} + +``` + +最后,关闭通道,表示不再发送任务: + +```go + +close(orderCh) + +close(errCh) + +``` + +如果需要控制每个任务的执行时间,可以使用定时器来解决超时问题: + +```go + +timeoutTime := time.Second * 3 // 超时时间 + +taskTimer := time.NewTimer(timeoutTime) // 初始化定时器 + + +for i := 0; i < taskNum; i++ { + +select { + +case <-taskTimer.C: + +err := errors.New("task timeout") + +return err + +// 其他 case 分支处理任务的执行和结果接收 + +} + +// 每次执行都需要重置定时器 + +taskTimer.Reset(timeoutTime) + +} + +``` + +在协程内处理 panic 问题是很重要的,在协程内使用 `defer` 来捕获 panic: + +```go + +for i := 0; i < taskNum; i++ { + +wg.Add(1) + +go func() { + +defer func() { + +wg.Done() + +if r := recover(); r != nil { + +err := errors.New(fmt.Sprintf("System panic: %v", r)) + +errCh <- err + +return + +} + +}() + +// 任务的执行逻辑 + +}() + +} + +``` + +最后,如果需要保持任务执行结果的顺序,可以定义一个带序号的结构体,并通过带序号的通道接收结果: + +```go + +type OrderWithSeq struct { + +Seq int + +OrderItem Order + +} + +orderCh := make(chan OrderWithSeq, taskNum) // 用于接收带序号的结构体 + +``` + +在任务执行时,加入序号信息: + +```go + +for i := 0; i < taskNum; i++ { + +i := i + +wg.Add(1) + +go func() { + +defer wg.Done() + +// 任务的执行逻辑 + +orderCh <- OrderWithSeq{ + +Seq: i, + +OrderItem: res, + +} + +}() + +} + +``` + +最后,在结果接收时,按照带序号的结构体进行排序: + +```go + +orderSeqList := make([]OrderWithSeq, taskNum) + +for i := 0; i < taskNum; i++ { + +select { + +case order, ok := <-orderCh: + +if ok { + +orderList = append(orderSeqList, order) + +} + +// 其他 case 分支处理异常等 + +} + +} + + + +// 按原始顺序进行排序 + +sort.Sort(BySeq(orderSeqList)) + +``` + +这就是一个完整的批量执行任务的通用模板,根据实际需求和场景,可能需要调整其中的一些部分。