generated from devnw/oss-template
-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathstream.go
252 lines (218 loc) · 5.48 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package stream
import (
"context"
"reflect"
"sync"
"go.devnw.com/gen"
)
// Pipe accepts an incoming data channel and pipes it to the supplied
// outgoing data channel.
//
// NOTE: Execute the Pipe function in a goroutine if parallel execution is
// desired. Canceling the context or closing the incoming channel is important
// to ensure that the goroutine is properly terminated.
func Pipe[T any](
ctx context.Context, in <-chan T, out chan<- T,
) {
ctx = _ctx(ctx)
for {
select {
case <-ctx.Done():
return
case v, ok := <-in:
if !ok {
return
}
select {
case <-ctx.Done():
return
case out <- v:
}
}
}
}
type InterceptFunc[T, U any] func(context.Context, T) (U, bool)
// Intercept accepts an incoming data channel and a function literal that
// accepts the incoming data and returns data of the same type and a boolean
// indicating whether the data should be forwarded to the output channel.
// The function is executed for each data item in the incoming channel as long
// as the context is not canceled or the incoming channel remains open.
func Intercept[T, U any](
ctx context.Context,
in <-chan T,
fn InterceptFunc[T, U],
) <-chan U {
ctx = _ctx(ctx)
out := make(chan U)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case v, ok := <-in:
if !ok {
return
}
// Executing this in a function literal ensures that any panic
// will be caught during execution of the function
func() {
// Determine if the function was successful
result, ok := fn(ctx, v)
if !ok {
return
}
// Execute the function against the incoming value
// and send the result to the output channel.
select {
case <-ctx.Done():
return
case out <- result:
}
}()
}
}
}()
return out
}
// FanIn accepts incoming data channels and forwards returns a single channel
// that receives all the data from the supplied channels.
//
// NOTE: The transfer takes place in a goroutine for each channel
// so ensuring that the context is canceled or the incoming channels
// are closed is important to ensure that the goroutine is terminated.
func FanIn[T any](ctx context.Context, in ...<-chan T) <-chan T {
ctx = _ctx(ctx)
out := make(chan T)
if len(in) == 0 {
defer close(out)
return out
}
var wg sync.WaitGroup
defer func() {
go func() {
wg.Wait()
close(out)
}()
}()
wg.Add(len(in))
for _, i := range in {
// Pipe the result of the channel to the output channel.
go func(i <-chan T) {
defer wg.Done()
Pipe(ctx, i, out)
}(i)
}
return out
}
// FanOut accepts an incoming data channel and copies the data to each of the
// supplied outgoing data channels.
//
// NOTE: Execute the FanOut function in a goroutine if parallel execution is
// desired. Canceling the context or closing the incoming channel is important
// to ensure that the goroutine is properly terminated.
func FanOut[T any](
ctx context.Context, in <-chan T, out ...chan<- T,
) {
ctx = _ctx(ctx)
if len(out) == 0 {
return
}
for {
select {
case <-ctx.Done():
return
case v, ok := <-in:
if !ok {
return
}
// Closure to catch panic on closed channel write.
selectCases := make([]reflect.SelectCase, 0, len(out)+1)
// 0 index is context
selectCases = append(selectCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
})
for _, outc := range out {
// Skip nil channels until they are non-nil
if outc == nil {
continue
}
selectCases = append(selectCases, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(outc),
Send: reflect.ValueOf(v),
})
}
for len(selectCases) > 1 {
chosen, _, _ := reflect.Select(selectCases)
// The context was canceled.
if chosen == 0 {
return
}
selectCases = gen.Exclude(selectCases, selectCases[chosen])
}
}
}
}
// Distribute accepts an incoming data channel and distributes the data among
// the supplied outgoing data channels using a dynamic select statement.
//
// NOTE: Execute the Distribute function in a goroutine if parallel execution is
// desired. Canceling the context or closing the incoming channel is important
// to ensure that the goroutine is properly terminated.
func Distribute[T any](
ctx context.Context, in <-chan T, out ...chan<- T,
) {
ctx = _ctx(ctx)
if len(out) == 0 {
return
}
for {
select {
case <-ctx.Done():
return
case v, ok := <-in:
if !ok {
return
}
selectCases := make([]reflect.SelectCase, 0, len(out)+1)
for _, outc := range out {
selectCases = append(selectCases, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(outc),
Send: reflect.ValueOf(v),
})
}
selectCases = append(selectCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
})
_, _, _ = reflect.Select(selectCases)
}
}
}
// Drain accepts a channel and drains the channel until the channel is closed
// or the context is canceled.
func Drain[T any](ctx context.Context, in <-chan T) {
ctx = _ctx(ctx)
go func() {
for {
select {
case <-ctx.Done():
return
case _, ok := <-in:
if !ok {
return
}
}
}
}()
}
// Any accepts an incoming data channel and converts the channel to a readonly
// channel of the `any` type.
func Any[T any](ctx context.Context, in <-chan T) <-chan any {
return Intercept(ctx, in, func(_ context.Context, in T) (any, bool) {
return in, true
})
}