@@ -12,7 +12,6 @@ import (
12
12
"net/http"
13
13
"net/http/httputil"
14
14
"net/url"
15
- "strings"
16
15
)
17
16
18
17
type BulkService struct {
@@ -23,6 +22,7 @@ type BulkService struct {
23
22
requests []BulkableRequest
24
23
//replicationType string
25
24
//consistencyLevel string
25
+ timeout string
26
26
refresh * bool
27
27
pretty bool
28
28
debug bool
@@ -54,6 +54,11 @@ func (s *BulkService) Type(_type string) *BulkService {
54
54
return s
55
55
}
56
56
57
+ func (s * BulkService ) Timeout (timeout string ) * BulkService {
58
+ s .timeout = timeout
59
+ return s
60
+ }
61
+
57
62
func (s * BulkService ) Refresh (refresh bool ) * BulkService {
58
63
s .refresh = & refresh
59
64
return s
@@ -132,6 +137,9 @@ func (s *BulkService) Do() (*BulkResponse, error) {
132
137
if s .refresh != nil {
133
138
params .Set ("refresh" , fmt .Sprintf ("%v" , * s .refresh ))
134
139
}
140
+ if s .timeout != "" {
141
+ params .Set ("timeout" , s .timeout )
142
+ }
135
143
if len (params ) > 0 {
136
144
urls += "?" + params .Encode ()
137
145
}
@@ -194,154 +202,111 @@ func (s *BulkService) Do() (*BulkResponse, error) {
194
202
return ret , nil
195
203
}
196
204
197
- // Response to bulk execution.
205
+ // BulkResponse is a response to a bulk execution.
206
+ //
207
+ // Example:
208
+ // {
209
+ // "took":3,
210
+ // "errors":false,
211
+ // "items":[{
212
+ // "index":{
213
+ // "_index":"index1",
214
+ // "_type":"tweet",
215
+ // "_id":"1",
216
+ // "_version":3,
217
+ // "status":201
218
+ // }
219
+ // },{
220
+ // "index":{
221
+ // "_index":"index2",
222
+ // "_type":"tweet",
223
+ // "_id":"2",
224
+ // "_version":3,
225
+ // "status":200
226
+ // }
227
+ // },{
228
+ // "delete":{
229
+ // "_index":"index1",
230
+ // "_type":"tweet",
231
+ // "_id":"1",
232
+ // "_version":4,
233
+ // "status":200,
234
+ // "found":true
235
+ // }
236
+ // },{
237
+ // "update":{
238
+ // "_index":"index2",
239
+ // "_type":"tweet",
240
+ // "_id":"2",
241
+ // "_version":4,
242
+ // "status":200
243
+ // }
244
+ // }]
245
+ // }
198
246
type BulkResponse struct {
199
- Took int `json:"took"`
247
+ Took int `json:"took,omitempty"`
248
+ Errors bool `json:"errors,omitempty"`
249
+ Items []map [string ]* BulkResponseItem `json:"items,omitempty"`
200
250
}
201
251
202
- // Generic interface to bulkable requests.
203
- type BulkableRequest interface {
204
- fmt.Stringer
205
- Source () ([]string , error )
252
+ // BulkResponseItem is the result of a single bulk request.
253
+ type BulkResponseItem struct {
254
+ Index string `json:"_index,omitempty"`
255
+ Type string `json:"_type,omitempty"`
256
+ Id string `json:"_id,omitempty"`
257
+ Version int `json:"_version,omitempty"`
258
+ Status int `json:"status,omitempty"`
259
+ Found bool `json:"found,omitempty"`
206
260
}
207
261
208
- // Bulk request to add document to ElasticSearch.
209
- type BulkIndexRequest struct {
210
- BulkableRequest
211
- Index string
212
- Type string
213
- Id string
214
- Data interface {}
262
+ // Indexed returns all bulk request results of "index" actions.
263
+ func (r * BulkResponse ) Indexed () []* BulkResponseItem {
264
+ return r .ByAction ("index" )
215
265
}
216
266
217
- func NewBulkIndexRequest (index , _type , id string , data interface {}) * BulkIndexRequest {
218
- return & BulkIndexRequest {
219
- Index : index ,
220
- Type : _type ,
221
- Id : id ,
222
- Data : data ,
223
- }
267
+ // Created returns all bulk request results of "create" actions.
268
+ func (r * BulkResponse ) Created () []* BulkResponseItem {
269
+ return r .ByAction ("create" )
224
270
}
225
271
226
- func (r BulkIndexRequest ) String () string {
227
- lines , err := r .Source ()
228
- if err == nil {
229
- return strings .Join (lines , "\n " )
230
- }
231
- return fmt .Sprintf ("error: %v" , err )
272
+ // Updated returns all bulk request results of "update" actions.
273
+ func (r * BulkResponse ) Updated () []* BulkResponseItem {
274
+ return r .ByAction ("update" )
232
275
}
233
276
234
- func (r BulkIndexRequest ) Source () ([]string , error ) {
235
- // { "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
236
- // { "field1" : "value1" }
237
-
238
- lines := make ([]string , 2 )
239
-
240
- // "index" ...
241
- command := make (map [string ]interface {})
242
- indexCommand := make (map [string ]interface {})
243
- command ["index" ] = indexCommand
244
- if r .Index != "" {
245
- indexCommand ["_index" ] = r .Index
246
- }
247
- if r .Type != "" {
248
- indexCommand ["_type" ] = r .Type
249
- }
250
- if r .Id != "" {
251
- indexCommand ["_id" ] = r .Id
252
- }
253
- // TODO _version
254
- // TODO _version_type
255
- // TODO _routing
256
- // TODO _percolate
257
- // TODO _parent
258
- // TODO _timestamp
259
- // TODO _ttl
260
- line , err := json .Marshal (command )
261
- if err != nil {
262
- return nil , err
263
- }
264
- lines [0 ] = string (line )
265
-
266
- // "field1" ...
267
- if r .Data != nil {
268
- switch t := r .Data .(type ) {
269
- default :
270
- body , err := json .Marshal (r .Data )
271
- if err != nil {
272
- return nil , err
273
- }
274
- lines [1 ] = string (body )
275
- case json.RawMessage :
276
- lines [1 ] = string (t )
277
- case * json.RawMessage :
278
- lines [1 ] = string (* t )
279
- case string :
280
- lines [1 ] = t
281
- case * string :
282
- lines [1 ] = * t
283
- }
284
- } else {
285
- lines [1 ] = "{}"
286
- }
287
-
288
- return lines , nil
277
+ // Deleted returns all bulk request results of "delete" actions.
278
+ func (r * BulkResponse ) Deleted () []* BulkResponseItem {
279
+ return r .ByAction ("delete" )
289
280
}
290
281
291
- // Bulk request to remove document from ElasticSearch.
292
- type BulkDeleteRequest struct {
293
- BulkableRequest
294
- Index string
295
- Type string
296
- Id string
297
- }
298
-
299
- func NewBulkDeleteRequest (index , _type , id string ) * BulkDeleteRequest {
300
- return & BulkDeleteRequest {
301
- Index : index ,
302
- Type : _type ,
303
- Id : id ,
282
+ // ByAction returns all bulk request results of a certain action,
283
+ // e.g. "index" or "delete".
284
+ func (r * BulkResponse ) ByAction (action string ) []* BulkResponseItem {
285
+ if r .Items == nil {
286
+ return nil
304
287
}
305
- }
306
-
307
- func (r BulkDeleteRequest ) String () string {
308
- lines , err := r .Source ()
309
- if err == nil {
310
- return strings .Join (lines , "\n " )
288
+ items := make ([]* BulkResponseItem , 0 )
289
+ for _ , item := range r .Items {
290
+ if result , found := item [action ]; found {
291
+ items = append (items , result )
292
+ }
311
293
}
312
- return fmt . Sprintf ( "error: %v" , err )
294
+ return items
313
295
}
314
296
315
- func (r BulkDeleteRequest ) Source () ([]string , error ) {
316
- lines := make ([]string , 1 )
317
-
318
- source := make (map [string ]interface {})
319
- data := make (map [string ]interface {})
320
- source ["delete" ] = data
321
-
322
- if r .Index != "" {
323
- data ["_index" ] = r .Index
297
+ // ById returns all bulk request results of a given document id,
298
+ // regardless of the action ("index", "delete" etc.).
299
+ func (r * BulkResponse ) ById (id string ) []* BulkResponseItem {
300
+ if r .Items == nil {
301
+ return nil
324
302
}
325
- if r .Type != "" {
326
- data ["_type" ] = r .Type
327
- }
328
- if r .Id != "" {
329
- data ["_id" ] = r .Id
330
- }
331
- // TODO _version
332
- // TODO _version_type
333
- // TODO _routing
334
- // TODO _percolate
335
- // TODO _parent
336
- // TODO _timestamp
337
- // TODO _ttl
338
-
339
- body , err := json .Marshal (source )
340
- if err != nil {
341
- return nil , err
303
+ items := make ([]* BulkResponseItem , 0 )
304
+ for _ , item := range r .Items {
305
+ for _ , result := range item {
306
+ if result .Id == id {
307
+ items = append (items , result )
308
+ }
309
+ }
342
310
}
343
-
344
- lines [0 ] = string (body )
345
-
346
- return lines , nil
311
+ return items
347
312
}
0 commit comments