-
Notifications
You must be signed in to change notification settings - Fork 11
/
consume.go
139 lines (120 loc) · 3.77 KB
/
consume.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package rill
import (
"github.com/destel/rill/internal/core"
)
// ForEach applies a function f to each item in an input stream.
//
// This is a blocking unordered function that processes items concurrently using n goroutines.
// When n = 1, processing becomes sequential, making the function ordered and similar to a regular for-range loop.
//
// See the package documentation for more information on blocking unordered functions and error handling.
func ForEach[A any](in <-chan Try[A], n int, f func(A) error) error {
var retErr error
var once core.OnceWithWait
setReturns := func(err error) {
once.Do(func() {
retErr = err
})
}
go func() {
core.ForEach(in, n, func(a Try[A]) {
if once.WasCalled() {
return // drain
}
err := a.Error
if err == nil {
err = f(a.Value)
}
if err != nil {
setReturns(err)
}
})
setReturns(nil)
}()
once.Wait()
return retErr
}
// Err returns the first error encountered in the input stream or nil if there were no errors.
//
// This is a blocking ordered function that processes items sequentially.
// See the package documentation for more information on blocking ordered functions and error handling.
func Err[A any](in <-chan Try[A]) error {
defer DrainNB(in)
for a := range in {
if a.Error != nil {
return a.Error
}
}
return nil
}
// First returns the first item or error encountered in the input stream, whichever comes first.
// The found return flag is set to false if the stream was empty, otherwise it is set to true.
//
// This is a blocking ordered function that processes items sequentially.
// See the package documentation for more information on blocking ordered functions and error handling.
func First[A any](in <-chan Try[A]) (value A, found bool, err error) {
defer DrainNB(in)
for a := range in {
return a.Value, true, a.Error
}
found = false
return
}
// Any checks if there is an item in the input stream that satisfies the condition f.
// This function returns true as soon as it finds such an item. Otherwise, it returns false.
//
// Any is a blocking unordered function that processes items concurrently using n goroutines.
// When n = 1, processing becomes sequential, making the function ordered.
//
// See the package documentation for more information on blocking unordered functions and error handling.
func Any[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error) {
var retFound bool
var retErr error
var once core.OnceWithWait
setReturns := func(found bool, err error) {
once.Do(func() {
retFound = found
retErr = err
})
}
go func() {
core.ForEach(in, n, func(a Try[A]) {
if once.WasCalled() {
return // drain
}
if err := a.Error; err != nil {
setReturns(false, err)
return
}
ok, err := f(a.Value)
if err != nil {
setReturns(false, err)
return
}
if ok {
setReturns(true, nil)
return
}
})
setReturns(false, nil)
}()
once.Wait()
return retFound, retErr
}
// All checks if all items in the input stream satisfy the condition f.
// This function returns false as soon as it finds an item that does not satisfy the condition. Otherwise, it returns true,
// including the case when the stream was empty.
//
// This is a blocking unordered function that processes items concurrently using n goroutines.
// When n = 1, processing becomes sequential, making the function ordered.
//
// See the package documentation for more information on blocking unordered functions and error handling.
func All[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error) {
// Idea: x && y && z is the same as !(!x || !y || !z)
// So we can use Any with a negated condition to implement All
res, err := Any(in, n, func(a A) (bool, error) {
ok, err := f(a)
return !ok, err // negate
})
return !res, err // negate
}