-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathformat.go
291 lines (259 loc) · 11.3 KB
/
format.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package gobulk
import (
"time"
"go.uber.org/zap"
)
// Format describes a specific data handling pipeline. It provides the storages (tracker, input and
// output) and the configuration engaged in the import process, and defines how the elements of a
// container are parsed and planned.
type Format interface {
// Name returns the name of the format.
Name() string
// Setup will be called once when creating a new runner. It can be used to preconfigure or
// initialise the format for the forthcoming import process.
Setup() error
// SetIteration hands the current iteration over to the format so that the format can access it
// later.
SetIteration(iteration *Iteration)
// NewIterationOnRestart defines whether a complete re-import should be performed after every
// restart.
NewIterationOnRestart() bool
// Tracker returns the format specific tracker.
Tracker() Tracker
// Input returns the format specific input.
Input() Input
// Output returns the format specific output.
Output() Output
// Parse processes the input element RawData and populates the element ParsedData with the result.
Parse(container *Container, input Element) (Element, error)
// Plan creates executable operations based on the passed input elements of the container.
Plan(container *Container, inputElements []Element) ([]*Operation, error)
// ReadStrategy defines the order of containers read from the input, i.e. whether the first or the
// last tracked container should be the starting import point.
ReadStrategy() Strategy
// ContainerBulkSize defines how many containers should be read at a time and then grouped into
// one bulk. This can be handy if the data to be imported is split across many small containers,
// e.g. one file for each document to import instead of one file containing multiple documents.
// To boost performance you can increase the container bulk size.
ContainerBulkSize() int
// ExecutorBulkSize returns the number of operations the executor performs at a time. This
// value, as well as the ContainerBulkSize, can be increased in case of numerous small containers.
// It decreases the amount of distinct calls for the Output operations execution by grouping
// operations into bulks.
ExecutorBulkSize() int
// StopOnError returns whether the format processing should be stopped once an error occurred
// with a single container at any step. The true value could mean e.g. that each container
// depends on previous ones. Otherwise, it's possible to track errors as Issues in the Tracker.
StopOnError() bool
// BeforeIssue gets called whenever an issue gets tracked. It can be used for general format
// specific modifications, for sending notifications or for automated infrastructure tasks.
BeforeIssue(issue *Issue) *Issue
// MetricsTracker returns a gobulk.MetricsTracker instance to be used alongside the format.
MetricsTracker() MetricsTracker
// Logger returns the logger to be used alongside the format.
Logger() *zap.Logger
// ExecutionShouldBeIntermitted gets executed before importing. If there's a need to wait for
// something, it returns a time in the future, otherwise nil.
ExecutionShouldBeIntermitted() (*time.Time, error)
// ExecutionIsIntermitted returns whether the format execution is still intermitted.
ExecutionIsIntermitted() bool
// SetIntermitUntil is used by the runner to store the ExecutionShouldBeIntermitted result as the
// time until which the execution is intermitted.
SetIntermitUntil(t *time.Time)
}
// Default settings that NewBaseFormat assigns to a BaseFormat before applying any FormatOpt.
const (
// defaultNewIterationOnRestart is the default result value for the BaseFormat NewIterationOnRestart
// method. It means that no new iteration is created on restarts.
defaultNewIterationOnRestart bool = false
// defaultReadStrategy is the default result value for the BaseFormat ReadStrategy method. It
// means that containers are read from the input from the first one to the last one.
defaultReadStrategy Strategy = StrategyFIFO
// defaultContainerBulkSize is the default result value for the BaseFormat ContainerBulkSize
// method. It means that containers are read from the input one by one.
defaultContainerBulkSize int = 1
// defaultExecutorBulkSize is the default result value for the BaseFormat ExecutorBulkSize
// method. It means that operations are executed one by one.
defaultExecutorBulkSize int = 1
// defaultStopOnError is the default result value for the BaseFormat StopOnError method. It
// means that the import process is stopped on the first occurred error.
defaultStopOnError bool = true
)
var (
// defaultMetricsTracker is the default result value for the BaseFormat MetricsTracker method,
// assigned by NewBaseFormat. It is an emptyMetricsTracker, which means to skip any metrics
// tracking.
defaultMetricsTracker MetricsTracker = emptyMetricsTracker{}
)
// buildDefaultLogger creates the default value for the BaseFormat Logger method: a zap development
// logger, which commits the debug and higher level logs, supplemented with the passed context (the
// format name) as the "context" field value.
//
// The returned logger is never nil. If the development logger cannot be built, a no-op logger is
// used as the base instead, so the format stays usable and simply does not log.
func buildDefaultLogger(context string) *zap.Logger {
	logger, err := zap.NewDevelopment()
	if err != nil {
		// Fall back to a no-op logger rather than continuing with a nil *zap.Logger: calling With
		// on a nil logger would dereference it.
		logger = zap.NewNop()
	}
	return logger.With(zap.String("context", context))
}
// FormatOpt is an option that modifies the default BaseFormat behaviour. NewBaseFormat calls each
// passed option with a pointer to the BaseFormat being built.
type FormatOpt func(f *BaseFormat)
// FormatWithNewIterationOnRestart returns an option that switches the newIterationOnRestart setting
// of the format on, which makes the import process create a new iteration instance each time the
// format is initialised.
var FormatWithNewIterationOnRestart = func() func(f *BaseFormat) {
	enableNewIteration := func(format *BaseFormat) {
		format.newIterationOnRestart = true
	}
	return enableNewIteration
}
// FormatWithBackwardImport returns an option that sets the read strategy of the format to
// StrategyLIFO, which makes the import process read containers from the input from the last one
// to the first one.
var FormatWithBackwardImport = func() func(f *BaseFormat) {
	useLIFO := func(format *BaseFormat) {
		format.readStrategy = StrategyLIFO
	}
	return useLIFO
}
// FormatWithContainerBulkSize returns an option that sets the container bulk size of the format to
// size, which makes the import process read, parse and plan containers from the input by bulks of
// that size.
var FormatWithContainerBulkSize = func(size int) func(f *BaseFormat) {
	setContainerBulkSize := func(format *BaseFormat) {
		format.containerBulkSize = size
	}
	return setContainerBulkSize
}
// FormatWithExecutorBulkSize returns an option that sets the executor bulk size of the format to
// size, which makes the import execute operations by bulks of that size.
var FormatWithExecutorBulkSize = func(size int) func(f *BaseFormat) {
	setExecutorBulkSize := func(format *BaseFormat) {
		format.executorBulkSize = size
	}
	return setExecutorBulkSize
}
// FormatWithIssuesTracking returns an option that switches the stopOnError setting of the format
// off. It prevents the import from being stopped when issues occur. Instead, issues are saved in
// the Tracker and the import process remains running with the issued containers skipped.
var FormatWithIssuesTracking = func() func(f *BaseFormat) {
	keepRunningOnError := func(format *BaseFormat) {
		format.stopOnError = false
	}
	return keepRunningOnError
}
// FormatWithMetricsTracker returns an option that replaces the metrics tracker of the format with
// the passed tracker, which makes the import track metrics using it.
var FormatWithMetricsTracker = func(tracker MetricsTracker) func(f *BaseFormat) {
	useTracker := func(format *BaseFormat) {
		format.metricsTracker = tracker
	}
	return useTracker
}
// FormatWithLogger returns an option that replaces the logger of the format with the passed
// logger, which will then be used in import logging.
var FormatWithLogger = func(logger *zap.Logger) func(f *BaseFormat) {
	useLogger := func(format *BaseFormat) {
		format.logger = logger
	}
	return useLogger
}
// NewBaseFormat creates a new instance of BaseFormat with the passed name, tracker, input and
// output. All other settings start with their default values and can be changed by passing
// options, which are applied in the order they were passed. This func must be a part of all
// format constructors.
func NewBaseFormat(name string, tracker Tracker, input Input, output Output, opts ...FormatOpt) BaseFormat {
	var format BaseFormat

	// Values supplied by the caller.
	format.name = name
	format.tracker = tracker
	format.input = input
	format.output = output

	// Default settings, which the options below may override.
	format.newIterationOnRestart = defaultNewIterationOnRestart
	format.readStrategy = defaultReadStrategy
	format.containerBulkSize = defaultContainerBulkSize
	format.executorBulkSize = defaultExecutorBulkSize
	format.stopOnError = defaultStopOnError
	format.metricsTracker = defaultMetricsTracker
	format.logger = buildDefaultLogger(name)

	for i := range opts {
		opts[i](&format)
	}
	return format
}
// BaseFormat must be embedded to all formats. It holds the settings assembled by NewBaseFormat and
// exposes them through its methods.
type BaseFormat struct {
// Iteration is the current iteration, assigned by SetIteration.
Iteration *Iteration
// tracker, input and output are the storages returned by the Tracker, Input and Output methods.
tracker Tracker
input Input
output Output
// name is the format name returned by Name.
name string
// newIterationOnRestart is the value returned by NewIterationOnRestart.
newIterationOnRestart bool
// intermitUntil is the time set by SetIntermitUntil and checked by ExecutionIsIntermitted. A nil
// value means that the execution is not intermitted.
intermitUntil *time.Time
// readStrategy is the value returned by ReadStrategy.
readStrategy Strategy
// containerBulkSize is the value returned by ContainerBulkSize.
containerBulkSize int
// executorBulkSize is the value returned by ExecutorBulkSize.
executorBulkSize int
// stopOnError is the value returned by StopOnError.
stopOnError bool
// metricsTracker is the value returned by MetricsTracker.
metricsTracker MetricsTracker
// logger is the value returned by Logger.
logger *zap.Logger
}
// Name returns the name of the format, i.e. the name stored in the BaseFormat (NewBaseFormat sets
// it to its name argument).
func (f *BaseFormat) Name() string {
return f.name
}
// SetIteration stores the passed iteration in the exported Iteration field, which gives the format
// access to the current iteration later on.
func (f *BaseFormat) SetIteration(iteration *Iteration) {
f.Iteration = iteration
}
// NewIterationOnRestart reports whether a complete re-import should be performed after every
// restart. For a BaseFormat built by NewBaseFormat the result is false unless the
// FormatWithNewIterationOnRestart option was applied.
func (f *BaseFormat) NewIterationOnRestart() bool {
return f.newIterationOnRestart
}
// Tracker returns the format specific tracker stored in the BaseFormat.
func (f *BaseFormat) Tracker() Tracker {
return f.tracker
}
// Input returns the format specific input stored in the BaseFormat.
func (f *BaseFormat) Input() Input {
return f.input
}
// Output returns the format specific output stored in the BaseFormat.
func (f *BaseFormat) Output() Output {
return f.output
}
// ReadStrategy returns the order in which containers are read from the input, i.e. whether the
// first or the last tracked container should be the starting import point. For a BaseFormat built
// by NewBaseFormat it is StrategyFIFO unless the FormatWithBackwardImport option set StrategyLIFO.
func (f *BaseFormat) ReadStrategy() Strategy {
return f.readStrategy
}
// ContainerBulkSize returns how many containers should be read at a time and then grouped into
// one bulk. This can be handy if the data to be imported is split across many small containers,
// e.g. one file for each document to import instead of one file containing multiple documents.
// To boost performance you can increase the container bulk size with the
// FormatWithContainerBulkSize option; NewBaseFormat starts with a size of 1.
func (f *BaseFormat) ContainerBulkSize() int {
return f.containerBulkSize
}
// ExecutorBulkSize returns the number of operations the executor performs at a time. This value,
// as well as the ContainerBulkSize, can be increased in case of numerous small containers. It
// decreases the amount of distinct calls for the Output operations execution by grouping
// operations into bulks. NewBaseFormat starts with a size of 1, which the
// FormatWithExecutorBulkSize option overrides.
func (f *BaseFormat) ExecutorBulkSize() int {
return f.executorBulkSize
}
// StopOnError reports whether the format processing should be stopped once an error occurred
// with a single container at any step. The true value could mean e.g. that each container
// depends on previous ones. Otherwise, it's possible to track errors as Issues in the Tracker.
// For a BaseFormat built by NewBaseFormat the result is true unless the FormatWithIssuesTracking
// option was applied.
func (f *BaseFormat) StopOnError() bool {
return f.stopOnError
}
// BeforeIssue is the base implementation of the issue hook: it returns the passed issue pointer
// as is, without modifying the issue.
func (f *BaseFormat) BeforeIssue(issue *Issue) *Issue {
return issue
}
// MetricsTracker returns the gobulk.MetricsTracker instance to be used alongside the format. For
// a BaseFormat built by NewBaseFormat it is the defaultMetricsTracker unless the
// FormatWithMetricsTracker option replaced it.
func (f *BaseFormat) MetricsTracker() MetricsTracker {
return f.metricsTracker
}
// Logger returns the logger to be used alongside the format. For a BaseFormat built by
// NewBaseFormat it is the logger created by buildDefaultLogger unless the FormatWithLogger option
// replaced it.
func (f *BaseFormat) Logger() *zap.Logger {
return f.logger
}
// ExecutionShouldBeIntermitted gets executed before importing. The base implementation never asks
// to wait: it always returns a nil time and a nil error.
func (f *BaseFormat) ExecutionShouldBeIntermitted() (*time.Time, error) {
return nil, nil
}
// ExecutionIsIntermitted reports whether the format execution is still intermitted, i.e. whether
// an intermission time is set and has not passed yet. Once the stored time lies in the past, it
// is reset to nil.
func (f *BaseFormat) ExecutionIsIntermitted() bool {
	until := f.intermitUntil
	switch {
	case until == nil:
		// No intermission has been requested.
		return false
	case time.Now().After(*until):
		// The intermission is over, forget it.
		f.intermitUntil = nil
		return false
	default:
		return true
	}
}
// SetIntermitUntil is used by the runner to store the ExecutionShouldBeIntermitted result as the
// time until which the execution is intermitted. ExecutionIsIntermitted treats a nil value as
// "not intermitted".
func (f *BaseFormat) SetIntermitUntil(t *time.Time) {
f.intermitUntil = t
}