-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.go
192 lines (179 loc) · 5.28 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package gobulk
import (
"fmt"
"go.uber.org/zap"
"golang.org/x/net/context"
)
// Listener checks the input of a iteration with it's format for (new) containers (e.g. files),
// creates records in the tracker (tracks then) and delegates tracked containers download jobs
// to the loader.
type Listener struct {
iteration *Iteration
tracker Tracker
input Input
marker *Container
loader *Loader
initLoader func() *Loader
ready bool
readyChan chan struct{}
logger *zap.Logger
*switcher
}
// NewListener returns a validated and preconfigured listener struct.
func NewListener(
iteration *Iteration,
tracker Tracker,
input Input,
marker *Container,
logger *zap.Logger,
state SwitcherState,
workersLimit int,
) (*Listener, error) {
return &Listener{
iteration: iteration,
tracker: tracker,
input: input,
marker: marker,
initLoader: func() *Loader { return NewLoader(input.Read, int64(workersLimit)) },
readyChan: make(chan struct{}),
logger: logger,
switcher: newSwitcher(state),
}, nil
}
// Listen scans the format's input for containers and tracks and downloads them.
func (l Listener) Listen(ctx context.Context) error {
contCh := make(chan []*Container)
doneCh := make(chan struct{})
errCh := make(chan error)
scanCtx, stopScan := context.WithCancel(ctx)
if l.IsOn() {
l.logger.Info("starting listener")
go l.input.Scan(scanCtx, l.marker, contCh, doneCh, errCh)
}
go l.loader.Run()
defer l.loader.Stop()
if err := l.preloadContainers(); err != nil {
l.logger.Warn("listener failed to preload unfinished containers", zap.NamedError("error_message", err))
}
for {
if l.IsOff() {
l.logger.Info("listener is off and waiting to be launched")
select {
case <-l.On():
l.logger.Info("starting listener")
go l.input.Scan(scanCtx, l.marker, contCh, doneCh, errCh)
case <-ctx.Done():
l.logger.Info("listener has been stopped by context")
return nil
}
} else {
select {
case <-l.Off():
l.logger.Info("listener has been turned off")
stopScan()
scanCtx, stopScan = context.WithCancel(ctx)
case err := <-errCh:
if err := l.handleListenError(err, nil); err != nil {
return err
}
case <-ctx.Done():
l.logger.Info("listener has been stopped by context")
return nil
case <-doneCh:
l.logger.Info("listener has been stopped: all available containers have been scanned")
if !l.ready {
l.markAsReady()
}
return nil
default:
select {
case cs := <-contCh:
l.setContainersIteration(cs)
resp, err := l.tracker.TrackContainers(cs)
if err != nil {
if err := l.handleListenError(err, cs); err != nil {
return err
}
}
if len(resp.Tracked) == 0 {
l.logger.Info("no new containers have been returned by scan")
continue
}
l.marker = resp.Tracked[len(resp.Tracked)-1]
l.loader.Put(resp.Tracked)
l.logger.Info("a bulk of containers has been tracked after scanning",
zap.Int("amount", len(resp.Tracked)),
zap.Int("already_tracked_scanned_containers", len(resp.Conflicted)),
)
if !l.ready {
l.markAsReady()
l.logger.Info("listener is ready to be listened to")
}
default:
}
}
}
}
}
// Ready returns true if the Listener has sent the first containers bulk to the Tracker.
func (l *Listener) Ready() bool {
return l.ready
}
// ReadyChan returns a channel that notifies all subscribers with a value (being closed) once the
// Listener is ready.
func (l *Listener) ReadyChan() <-chan struct{} {
return l.readyChan
}
// Prepare prepares the listener for a run.
func (l *Listener) Prepare() {
l.loader = l.initLoader()
}
// Reset resets the Listener state to the default one.
func (l *Listener) Reset() {
l.ready = false
l.readyChan = make(chan struct{})
}
// preloadContainers gets tracked unfinished containers from the tracker and puts them to the loader
// to speed up their download progress.
func (l *Listener) preloadContainers() error {
containers, err := l.tracker.GetUnfinishedContainers()
if err != nil {
return err
}
l.loader.Put(containers)
l.logger.Info("preloaded unfinished containers to the loader", zap.Int("amount", len(containers)))
return nil
}
// handleListenError tracks the error if it's an Issue, or returns the error otherwise.
func (l *Listener) handleListenError(err error, containers []*Container) error {
issue, ok := err.(*Issue)
if !ok {
return err
}
if len(containers) == 0 {
issue.complete(l.iteration, nil, StepListener)
if err := l.tracker.TrackIssue(issue); err != nil {
return fmt.Errorf("failed to track issue %s: %v", issue.Error(), err)
}
} else {
for _, container := range containers {
issue.complete(l.iteration, container, StepListener)
if err := l.tracker.TrackIssue(issue); err != nil {
return fmt.Errorf("failed to track issue %s: %v", issue.Error(), err)
}
}
}
return nil
}
// setContainersIteration populates the containers Iteration field with the listener one.
func (l *Listener) setContainersIteration(containers []*Container) {
for _, c := range containers {
c.IterationID = l.iteration.ID
}
}
// markAsReady sets the Listener ready field to true and notifies all the l.readyChan listeners
// about the Listener is ready.
func (l *Listener) markAsReady() {
l.ready = true
close(l.readyChan)
}