5
5
"errors"
6
6
"fmt"
7
7
"runtime/debug"
8
+ "sync/atomic"
8
9
)
9
10
10
11
var (
@@ -21,76 +22,23 @@ type Nursery interface {
21
22
22
23
type nursery struct {
23
24
context.Context
24
- cancel func ()
25
- panics chan any
26
- errors chan error
27
-
28
- maxRoutines int
29
- routineDone chan error
30
-
31
- goRoutine chan func () error
32
- onError func (error )
25
+ cancel func ()
26
+ onError func (error )
27
+ errors chan error
28
+ limiter limiter
29
+ goRoutine chan func () error
30
+ routinesCount atomic.Int32
33
31
}
34
32
35
- func newNursery (ctx context.Context ) * nursery {
36
- ctx , cancel := context .WithCancel (ctx )
33
+ func newNursery () * nursery {
37
34
n := & nursery {
38
- Context : ctx ,
39
- cancel : cancel ,
40
- panics : make (chan any ),
35
+ Context : nil ,
36
+ cancel : nil ,
41
37
errors : make (chan error ),
38
+ limiter : nil ,
42
39
goRoutine : make (chan func () error ),
43
40
}
44
41
45
- // Event loop.
46
- go func () {
47
- done := false
48
- routinesCount := 0
49
- routineDone := make (chan error )
50
- for ! done {
51
- handleRoutineDone := func (routineValue error ) {
52
- routinesCount --
53
- if gpanic , isPanic := routineValue .(GoroutinePanic ); isPanic {
54
- // Cancel all routines.
55
- n .cancel ()
56
- n .panics <- gpanic
57
- } else if routineValue != nil {
58
- n .errors <- routineValue
59
- }
60
- if routinesCount == 0 {
61
- close (routineDone )
62
- close (n .panics )
63
- close (n .errors )
64
- n .cancel ()
65
- done = true
66
- }
67
- }
68
-
69
- // We can spawn routine.
70
- if routinesCount < n .maxRoutines || n .maxRoutines <= 0 {
71
- select {
72
- case routine := <- n .goRoutine :
73
- routinesCount ++
74
- go func () {
75
- defer catchPanics (routineDone )
76
- err := routine ()
77
- if err != nil && n .onError != nil {
78
- n .onError (err )
79
- }
80
- routineDone <- err
81
- }()
82
-
83
- case routineValue := <- routineDone :
84
- handleRoutineDone (routineValue )
85
- }
86
- } else {
87
- // We can't spawn routine.
88
- routineValue := <- routineDone
89
- handleRoutineDone (routineValue )
90
- }
91
- }
92
- }()
93
-
94
42
return n
95
43
}
96
44
@@ -116,36 +64,74 @@ func (n *nursery) mustNotBeDone() {
116
64
func (n * nursery ) Go (routine func () error ) {
117
65
n .mustNotBeDone ()
118
66
67
+ n .routinesCount .Add (1 )
68
+ if n .limiter == nil {
69
+ select {
70
+ case n .goRoutine <- routine :
71
+ // Successfully reused a goroutine.
72
+ default :
73
+ // No goroutine available, spawn a new one.
74
+ n .goNew (routine )
75
+ }
76
+ } else {
77
+ select {
78
+ case n .limiter <- struct {}{}:
79
+ // We are below our limit.
80
+ n .goNew (routine )
81
+ case n .goRoutine <- routine :
82
+ // Successfully reused a goroutine.
83
+ }
84
+ }
85
+ }
86
+
87
+ func (n * nursery ) goNew (routine func () error ) {
88
+ go func () {
89
+ defer catchPanics (n .errors )
90
+ for {
91
+ select {
92
+ case <- n .Done ():
93
+ // Nursery is done, we can free this goroutine.
94
+ return
95
+ case r := <- n .goRoutine :
96
+ n .errors <- r ()
97
+ }
98
+ }
99
+ }()
100
+
119
101
n .goRoutine <- routine
120
102
}
121
103
122
- // Block starts a nursery block that returns when all goroutines have
123
- // returned. If a goroutine panic, context is canceled and panic is immediately
124
- // forwarded without waiting for other goroutines to handle context cancellation.
125
- // Errors returned by goroutines are joined and returned at the end of the block.
104
+ // Block starts a nursery block that returns when all goroutines have returned.
105
+ // If a goroutine panic, context is canceled and panic is immediately forwarded
106
+ // without waiting for other goroutines to handle context cancellation. Errors
107
+ // returned by goroutines are joined and returned at the end of the block.
126
108
func Block (block func (n Nursery ) error , opts ... BlockOption ) (err error ) {
127
- n := newNursery (context .Background ())
128
-
109
+ n := newNursery ()
129
110
for _ , opt := range opts {
130
111
opt (n )
131
112
}
132
113
114
+ // Default context.
115
+ if n .Context == nil {
116
+ n .Context , n .cancel = context .WithCancel (context .Background ())
117
+ }
118
+ defer n .cancel ()
119
+
120
+ // Start block.
133
121
n .Go (func () error {
134
- block (n )
135
- return nil
122
+ return block (n )
136
123
})
137
124
138
- // Wait for all routine to be done.
139
- loop:
125
+ // Event loop.
140
126
for {
141
- select {
142
- case panicValue := <- n . panics :
143
- if panicValue != nil {
144
- panic ( panicValue )
145
- }
146
- break loop
147
- case e := <- n . errors :
148
- err = errors . Join ( err , e )
127
+ e := <- n . errors
128
+ if panicValue , isPanic := e .( GoroutinePanic ); isPanic {
129
+ panic ( panicValue )
130
+ }
131
+ err = errors . Join ( err , e )
132
+ count := n . routinesCount . Add ( - 1 )
133
+ if count == 0 {
134
+ break
149
135
}
150
136
}
151
137
@@ -176,3 +162,5 @@ func (gp GoroutinePanic) Unwrap() error {
176
162
177
163
return nil
178
164
}
165
+
166
+ type limiter chan struct {}
0 commit comments