-
Notifications
You must be signed in to change notification settings - Fork 10
/
transform.go
216 lines (181 loc) · 6.38 KB
/
transform.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package rill
import (
"github.com/destel/rill/internal/core"
)
// Map converts every item of type A read from in into an item of type B by calling f.
// The result is a new stream carrying the converted items.
//
// Errors are never dropped: an item that already holds an error is forwarded with
// that same error (f is not called for it), and an error returned by f is emitted
// in place of a value.
//
// This is a non-blocking unordered function that processes items concurrently using n goroutines.
// An ordered version of this function, [OrderedMap], is also available.
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
	convert := func(item Try[A]) (Try[B], bool) {
		var res Try[B]

		if item.Error != nil {
			// Upstream error: pass it along untouched.
			res.Error = item.Error
			return res, true
		}

		val, err := f(item.Value)
		if err != nil {
			// Only the error is reported; whatever f returned as a value is discarded.
			res.Error = err
			return res, true
		}

		res.Value = val
		return res, true
	}

	return core.FilterMap(in, n, convert)
}
// OrderedMap is the ordered version of [Map].
//
// It calls f on the value of every item read from in. Items that already carry
// an error skip f and are forwarded as errors; an error returned by f is emitted
// instead of a value.
func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
	return core.OrderedFilterMap(in, n, func(src Try[A]) (Try[B], bool) {
		if src.Error != nil {
			return Try[B]{Error: src.Error}, true
		}

		var dst Try[B]
		if val, err := f(src.Value); err != nil {
			dst.Error = err
		} else {
			dst.Value = val
		}

		// Every input produces exactly one output, so nothing is ever filtered out.
		return dst, true
	})
}
// Filter reads items of type A from in and keeps only those for which the predicate f returns true.
// The result is a new stream containing the items that passed.
//
// Errors always pass through: items that already carry an error are forwarded
// without calling f, and an error returned by f is emitted in place of the item.
//
// This is a non-blocking unordered function that processes items concurrently using n goroutines.
// An ordered version of this function, [OrderedFilter], is also available.
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
	check := func(item Try[A]) (Try[A], bool) {
		if item.Error != nil {
			// Errors are never filtered out.
			return item, true
		}

		switch pass, err := f(item.Value); {
		case err != nil:
			// A failing predicate turns the item into an error, which is always kept.
			return Try[A]{Error: err}, true
		default:
			return item, pass
		}
	}

	return core.FilterMap(in, n, check)
}
// OrderedFilter is the ordered version of [Filter].
//
// Items for which f returns true are kept. Items that already carry an error,
// and errors returned by f, are always emitted.
func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
	return core.OrderedFilterMap(in, n, func(item Try[A]) (Try[A], bool) {
		if item.Error == nil {
			pass, err := f(item.Value)
			if err == nil {
				return item, pass
			}
			// Replace the item with the predicate's error; it is kept below.
			item = Try[A]{Error: err}
		}

		// Reached only for errors, which are never filtered out.
		return item, true
	})
}
// FilterMap reads items of type A from in and passes each one to f, which both decides
// whether the item is kept and converts it into an item of type B.
// The result is a new stream of converted items that passed the filter. This operation
// is equivalent to a [Filter] followed by a [Map].
//
// Items that already carry an error are forwarded as errors without calling f, and an
// error returned by f is always emitted, regardless of the keep flag f returned.
//
// This is a non-blocking unordered function that processes items concurrently using n goroutines.
// An ordered version of this function, [OrderedFilterMap], is also available.
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func FilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] {
	apply := func(item Try[A]) (Try[B], bool) {
		var res Try[B]

		if item.Error != nil {
			res.Error = item.Error
			return res, true
		}

		val, keep, err := f(item.Value)
		if err != nil {
			// Errors override the keep flag: they are always emitted.
			res.Error = err
			return res, true
		}

		res.Value = val
		return res, keep
	}

	return core.FilterMap(in, n, apply)
}
// OrderedFilterMap is the ordered version of [FilterMap].
//
// f converts each value and reports whether the result should be kept. Items that
// already carry an error, and errors returned by f, are always emitted.
func OrderedFilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] {
	return core.OrderedFilterMap(in, n, func(src Try[A]) (Try[B], bool) {
		if src.Error != nil {
			return Try[B]{Error: src.Error}, true
		}

		switch val, keep, err := f(src.Value); {
		case err != nil:
			// The keep flag is ignored when f fails; the error is emitted.
			return Try[B]{Error: err}, true
		default:
			return Try[B]{Value: val}, keep
		}
	})
}
// FlatMap reads items of type A from in and uses f to turn each one into a sub-stream of items of type B.
// All sub-streams are flattened into a single output stream, which is returned.
//
// Items that already carry an error are forwarded to the output as errors without calling f.
// A nil input stream yields a nil output stream.
//
// This is a non-blocking unordered function that processes items concurrently using n goroutines.
// An ordered version of this function, [OrderedFlatMap], is also available.
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] {
	if in == nil {
		return nil
	}

	out := make(chan Try[B])

	// expand handles a single input item: it either forwards the item's error
	// or drains the sub-stream produced by f into out.
	expand := func(item Try[A]) {
		if item.Error != nil {
			out <- Try[B]{Error: item.Error}
			return
		}

		for sub := range f(item.Value) {
			out <- sub
		}
	}

	core.Loop(in, out, n, expand)

	return out
}
// OrderedFlatMap is the ordered version of [FlatMap].
//
// Each item from in is expanded into a sub-stream by f, and the sub-streams are
// written to a single output stream. Items that already carry an error are
// forwarded as errors without calling f. A nil input stream yields a nil output stream.
func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] {
	if in == nil {
		return nil
	}

	out := make(chan Try[B])

	// expand handles a single input item. Nothing is sent to out until a receive
	// from canWrite has completed.
	// NOTE(review): presumably core.OrderedLoop uses canWrite to signal that it is
	// this item's turn to write — confirm against the core package.
	expand := func(item Try[A], canWrite <-chan struct{}) {
		if item.Error != nil {
			<-canWrite
			out <- Try[B]{Error: item.Error}
			return
		}

		// f is invoked before waiting on canWrite, so the sub-stream is created
		// before this item is allowed to write.
		sub := f(item.Value)
		<-canWrite

		for {
			v, ok := <-sub
			if !ok {
				return
			}
			out <- v
		}
	}

	core.OrderedLoop(in, out, n, expand)

	return out
}
// Catch makes it possible to handle errors in the middle of a stream processing pipeline.
// Each error found in the input stream is handed to f; items without an error pass through unchanged.
//
// What happens to an error depends on what f returns:
//   - If f returns nil, the error is considered handled and is filtered out of the output stream.
//   - If f returns a non-nil error, that error replaces the original one in the output stream.
//
// This is a non-blocking unordered function that handles errors concurrently using n goroutines.
// An ordered version of this function, [OrderedCatch], is also available.
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
	handle := func(item Try[A]) (Try[A], bool) {
		if item.Error == nil {
			return item, true
		}

		if replacement := f(item.Error); replacement != nil {
			// f substituted a new error for the original one.
			return Try[A]{Error: replacement}, true
		}

		// f returned nil: the error is handled, so the item is dropped.
		return item, false
	}

	return core.FilterMap(in, n, handle)
}
// OrderedCatch is the ordered version of [Catch].
//
// Items without an error pass through unchanged. Each error is passed to f:
// a nil result removes the item from the output, a non-nil result replaces the error.
func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
	return core.OrderedFilterMap(in, n, func(item Try[A]) (Try[A], bool) {
		if item.Error == nil {
			return item, true
		}

		switch replacement := f(item.Error); {
		case replacement == nil:
			// Error handled by f: filter the item out.
			return item, false
		default:
			// Error replaced by the result of f.
			return Try[A]{Error: replacement}, true
		}
	})
}