forked from minio/minio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxl-v1-object.go
493 lines (430 loc) · 14.7 KB
/
xl-v1-object.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"crypto/md5"
"encoding/hex"
"io"
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/minio/minio/pkg/mimedb"
)
/// Object Operations
// GetObject - reads an object erasure coded across multiple
// disks. Supports additional parameters like offset and length
// which is synonymous with HTTP Range requests.
//
// startOffset is the offset within the object at which reading starts
// and length is the number of bytes requested by the client. The data
// is written to writer part by part. Invalid bucket or object names are
// reported as BucketNameInvalid / ObjectNameInvalid; every other
// failure is converted with toObjectErr before being returned.
func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error {
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return BucketNameInvalid{Bucket: bucket}
	}
	// Verify if object is valid.
	if !IsValidObjectName(object) {
		return ObjectNameInvalid{Bucket: bucket, Object: object}
	}

	// Hold a read lock on the object for the whole read.
	nsMutex.RLock(bucket, object)
	defer nsMutex.RUnlock(bucket, object)

	// Read metadata associated with the object from all disks.
	metaArr, errs := xl.readAllXLMetadata(bucket, object)
	// Do we have read quorum?
	if !isQuorum(errs, xl.readQuorum) {
		return toObjectErr(errXLReadQuorum, bucket, object)
	}

	// List all online disks.
	onlineDisks, highestVersion, err := xl.listOnlineDisks(metaArr, errs)
	if err != nil {
		return toObjectErr(err, bucket, object)
	}

	// Pick the first valid metadata carrying the highest version.
	var xlMeta xlMetaV1
	for _, meta := range metaArr {
		if meta.IsValid() && meta.Stat.Version == highestVersion {
			xlMeta = meta
			break
		}
	}

	// Get start part index and offset.
	partIndex, partOffset, err := xlMeta.ObjectToPartOffset(startOffset)
	if err != nil {
		return toObjectErr(err, bucket, object)
	}

	// Get last part index to read given length.
	lastPartIndex, _, err := xlMeta.ObjectToPartOffset(startOffset + length - 1)
	if err != nil {
		return toObjectErr(err, bucket, object)
	}

	// Collect the erasure info recorded in each disk's metadata, indexed
	// the same way as onlineDisks.
	var eInfos []erasureInfo
	for index := range onlineDisks {
		eInfos = append(eInfos, metaArr[index].Erasure)
	}

	totalBytesRead := int64(0)
	// Read from all parts.
	for ; partIndex <= lastPartIndex; partIndex++ {
		if length == totalBytesRead {
			break
		}
		// Save the current part name and size.
		partName := xlMeta.Parts[partIndex].Name
		partSize := xlMeta.Parts[partIndex].Size

		readSize := partSize - partOffset
		// readSize should be adjusted so that we don't write more data than what was requested.
		if readSize > (length - totalBytesRead) {
			readSize = length - totalBytesRead
		}

		// Start reading the part name.
		n, err := erasureReadFile(writer, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize)
		if err != nil {
			// Convert like every other error path in this function so
			// callers never see an untranslated error here.
			return toObjectErr(err, bucket, object)
		}

		totalBytesRead += n

		// partOffset will be valid only for the first part, hence reset it to 0 for
		// the remaining parts.
		partOffset = 0
	} // End of read all parts loop.

	// Return success.
	return nil
}
// GetObjectInfo - validates the bucket and object names, then returns
// the ObjectInfo of the object while holding a read lock on it. Errors
// from the metadata lookup are converted with toObjectErr.
func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
	// Reject malformed names before taking any lock.
	switch {
	case !IsValidBucketName(bucket):
		return ObjectInfo{}, BucketNameInvalid{Bucket: bucket}
	case !IsValidObjectName(object):
		return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
	}

	nsMutex.RLock(bucket, object)
	defer nsMutex.RUnlock(bucket, object)

	objInfo, err := xl.getObjectInfo(bucket, object)
	if err == nil {
		return objInfo, nil
	}
	return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// getObjectInfo - reads the object's xl metadata and builds an
// ObjectInfo from it. Takes no lock itself. The error from
// readXLMetadata is returned unmodified.
func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) {
	meta, err := xl.readXLMetadata(bucket, object)
	if err != nil {
		return ObjectInfo{}, err
	}

	// Size and modification time come from the stat section; the
	// remaining fields are looked up in the metadata map.
	return ObjectInfo{
		IsDir:           false,
		Bucket:          bucket,
		Name:            object,
		Size:            meta.Stat.Size,
		ModTime:         meta.Stat.ModTime,
		MD5Sum:          meta.Meta["md5Sum"],
		ContentType:     meta.Meta["content-type"],
		ContentEncoding: meta.Meta["content-encoding"],
	}, nil
}
// undoRename - renames dstEntry back to srcEntry, in parallel, on every
// disk whose slot in errs is nil (i.e. where the forward rename was not
// recorded as failed). errs is indexed like xl.storageDisks. Failures
// of the reverse rename are deliberately ignored: this is best-effort.
func (xl xlObjects) undoRename(srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error) {
	// If srcEntry/dstEntry are objects then add a trailing slash to copy
	// over all the parts inside the object directory.
	if !isPart {
		srcEntry = retainSlash(srcEntry)
		dstEntry = retainSlash(dstEntry)
	}

	var pending sync.WaitGroup
	for i, d := range xl.storageDisks {
		if d == nil {
			// Nothing to undo without a disk.
			continue
		}
		pending.Add(1)
		go func(i int, d StorageAPI) {
			defer pending.Done()
			// Only revert on disks where the rename went through.
			if errs[i] == nil {
				_ = d.RenameFile(dstBucket, dstEntry, srcBucket, srcEntry)
			}
		}(i, d)
	}
	pending.Wait()
}
// undoRenameObject - reverts a partially successful object rename by
// calling undoRename with isPart set to false.
func (xl xlObjects) undoRenameObject(srcBucket, srcObject, dstBucket, dstObject string, errs []error) {
	const isPart = false
	xl.undoRename(srcBucket, srcObject, dstBucket, dstObject, isPart, errs)
}
// undoRenamePart - reverts a partially successful part rename by
// calling undoRename with isPart set to true.
func (xl xlObjects) undoRenamePart(srcBucket, srcPart, dstBucket, dstPart string, errs []error) {
	const isPart = true
	xl.undoRename(srcBucket, srcPart, dstBucket, dstPart, isPart, errs)
}
// rename - shared implementation behind renamePart and renameObject.
// Renames srcEntry to dstEntry on all disks in parallel and then
// decides the outcome from the per-disk errors:
//
//   - write quorum met: succeeds unless some disk reported an error
//     other than errDiskNotFound, in which case the renames are undone
//     and that error is returned;
//   - only read quorum met: returns nil without undoing anything;
//   - neither: the renames are undone and errXLWriteQuorum is returned.
//
// errFileNotFound from a disk is not counted as a failure.
func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool) error {
	// Objects get a trailing slash on both entries; parts are used as is.
	if !isPart {
		dstEntry = retainSlash(dstEntry)
		srcEntry = retainSlash(srcEntry)
	}

	// One error slot per disk, indexed like xl.storageDisks.
	renameErrs := make([]error, len(xl.storageDisks))

	var pending sync.WaitGroup
	for i, d := range xl.storageDisks {
		if d == nil {
			renameErrs[i] = errDiskNotFound
			continue
		}
		pending.Add(1)
		go func(i int, d StorageAPI) {
			defer pending.Done()
			switch err := d.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry); err {
			case nil, errFileNotFound:
				// Treated as success.
			default:
				renameErrs[i] = err
			}
		}(i, d)
	}
	// Wait for all renames to finish.
	pending.Wait()

	switch {
	case isQuorum(renameErrs, xl.writeQuorum):
		// Enough disks succeeded; inspect individual errors below.
	case isQuorum(renameErrs, xl.readQuorum):
		// Below write quorum but read quorum holds: report success.
		return nil
	default:
		// Not even read quorum: roll back whatever was renamed.
		xl.undoRename(srcBucket, srcEntry, dstBucket, dstEntry, isPart, renameErrs)
		return errXLWriteQuorum
	}

	// Fail on the first error that is not a missing disk, rolling back
	// the renames that did succeed.
	for _, renameErr := range renameErrs {
		if renameErr == nil || renameErr == errDiskNotFound {
			continue
		}
		xl.undoRename(srcBucket, srcEntry, dstBucket, dstEntry, isPart, renameErrs)
		return renameErr
	}
	return nil
}
// renamePart - renames a single part from the source location to the
// destination across all disks by calling rename with isPart set to
// true. See rename for the quorum and rollback rules.
func (xl xlObjects) renamePart(srcBucket, srcObject, dstBucket, dstObject string) error {
	const isPart = true
	return xl.rename(srcBucket, srcObject, dstBucket, dstObject, isPart)
}
// renameObject - renames a whole object from the source location to
// the destination across all disks by calling rename with isPart set
// to false. See rename for the quorum and rollback rules.
func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject string) error {
	const isPart = false
	return xl.rename(srcBucket, srcObject, dstBucket, dstObject, isPart)
}
// PutObject - creates an object upon reading from the input stream
// until EOF, erasure codes the data across all disks and additionally
// writes `xl.json` which carries the necessary metadata for future
// object operations.
//
// The data is first written under a unique name in the temporary meta
// location and is renamed to (bucket, object) only after `xl.json` has
// been written. If size is -1 the number of bytes actually written is
// recorded as the object size. If metadata carries a non-empty "md5Sum"
// it is compared with the MD5 computed over the stream and BadDigest is
// returned on mismatch. When "content-type" is empty it is guessed from
// the object's extension where possible. Note that a non-nil metadata
// map passed by the caller is updated in place.
//
// Returns the hex encoded MD5 of the written data on success. On any
// failure after the temporary object has been started, the temporary
// object is removed on a best-effort basis.
func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) {
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return "", BucketNameInvalid{Bucket: bucket}
	}
	// Verify bucket exists.
	if !xl.isBucketExist(bucket) {
		return "", BucketNotFound{Bucket: bucket}
	}
	// Verify if object is valid.
	if !IsValidObjectName(object) {
		return "", ObjectNameInvalid{
			Bucket: bucket,
			Object: object,
		}
	}
	// No metadata is set, allocate a new one.
	if metadata == nil {
		metadata = make(map[string]string)
	}
	nsMutex.Lock(bucket, object)
	defer nsMutex.Unlock(bucket, object)

	uniqueID := getUUID()
	tempErasureObj := path.Join(tmpMetaPrefix, uniqueID, "part.1")
	minioMetaTmpBucket := path.Join(minioMetaBucket, tmpMetaPrefix)
	tempObj := uniqueID

	// removeTempObj deletes the temporary object written by this call.
	// It is best-effort: the caller is already returning a more relevant
	// error, so a failure to clean up is intentionally ignored.
	removeTempObj := func() {
		_ = xl.deleteObject(minioMetaTmpBucket, tempObj)
	}

	// Initialize xl meta.
	xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks)

	// Read metadata associated with the object from all disks.
	partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
	// Do we have write quorum?
	if !isQuorum(errs, xl.writeQuorum) {
		return "", toObjectErr(errXLWriteQuorum, bucket, object)
	}

	// List all online disks.
	onlineDisks, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs)
	if err != nil {
		return "", toObjectErr(err, bucket, object)
	}

	// Increment version only if we have online disks less than configured storage disks.
	if diskCount(onlineDisks) < len(xl.storageDisks) {
		higherVersion++
	}

	// Initialize md5 writer.
	md5Writer := md5.New()

	// Tee reader combines incoming data stream and md5, data read
	// from input stream is written to md5.
	teeReader := io.TeeReader(data, md5Writer)

	// Seed one erasure info per entry of onlineDisks from the fresh metadata.
	var eInfos []erasureInfo
	for range onlineDisks {
		eInfos = append(eInfos, xlMeta.Erasure)
	}

	// Erasure code and write across all disks.
	newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "part.1", teeReader, eInfos, xl.writeQuorum)
	if err != nil {
		// Do not leave partially written temporary data behind.
		removeTempObj()
		return "", toObjectErr(err, minioMetaBucket, tempErasureObj)
	}
	if size == -1 {
		size = n
	}

	// Save additional erasureMetadata.
	modTime := time.Now().UTC()

	newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
	// Update the md5sum if not set with the newly calculated one.
	if len(metadata["md5Sum"]) == 0 {
		metadata["md5Sum"] = newMD5Hex
	}

	// Guess content-type from the extension if possible.
	if metadata["content-type"] == "" {
		if objectExt := filepath.Ext(object); objectExt != "" {
			if content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]; ok {
				metadata["content-type"] = content.ContentType
			}
		}
	}

	// md5Hex representation.
	md5Hex := metadata["md5Sum"]
	if md5Hex != "" && newMD5Hex != md5Hex {
		// MD5 mismatch, delete the temporary object.
		removeTempObj()
		// Returns md5 mismatch.
		return "", BadDigest{md5Hex, newMD5Hex}
	}

	// Check if an object is present as one of the parent dir.
	// -- FIXME. (needs a new kind of lock).
	if xl.parentDirIsObject(bucket, path.Dir(object)) {
		// The upload cannot be placed; drop the temporary object.
		removeTempObj()
		return "", toObjectErr(errFileAccessDenied, bucket, object)
	}

	// Rename if an object already exists to temporary location.
	newUniqueID := getUUID()
	if xl.isObject(bucket, object) {
		err = xl.renameObject(bucket, object, minioMetaTmpBucket, newUniqueID)
		if err != nil {
			removeTempObj()
			return "", toObjectErr(err, bucket, object)
		}
	}

	// Fill all the necessary metadata.
	xlMeta.Meta = metadata
	xlMeta.Stat.Size = size
	xlMeta.Stat.ModTime = modTime
	xlMeta.Stat.Version = higherVersion
	// Add the final part.
	xlMeta.AddObjectPart(1, "part.1", newMD5Hex, xlMeta.Stat.Size)

	// Update `xl.json` content on each disks.
	for index := range partsMetadata {
		partsMetadata[index] = xlMeta
		partsMetadata[index].Erasure = newEInfos[index]
	}

	// NOTE(review): from here on a previously existing object has
	// already been moved to newUniqueID in the temporary location. The
	// failure paths below do not move it back.

	// Write unique `xl.json` for each disk.
	if err = xl.writeUniqueXLMetadata(minioMetaTmpBucket, tempObj, partsMetadata); err != nil {
		removeTempObj()
		return "", toObjectErr(err, bucket, object)
	}

	// Rename the successfully written temporary object to final location.
	err = xl.renameObject(minioMetaTmpBucket, tempObj, bucket, object)
	if err != nil {
		removeTempObj()
		return "", toObjectErr(err, bucket, object)
	}

	// Delete the previous object that was moved aside (best-effort).
	xl.deleteObject(minioMetaTmpBucket, newUniqueID)

	// Return md5sum, successfully wrote object.
	return newMD5Hex, nil
}
// deleteObject - removes the object from all disks in parallel by
// calling cleanupDir on each of them. A nil disk is recorded as
// errDiskNotFound and errFileNotFound from a disk is not counted as a
// failure. Returns errXLWriteQuorum when the collected errors do not
// satisfy the write quorum, nil otherwise. Takes no lock itself.
func (xl xlObjects) deleteObject(bucket, object string) error {
	// One error slot per disk, indexed like xl.storageDisks.
	deleteErrs := make([]error, len(xl.storageDisks))

	var pending sync.WaitGroup
	for i, d := range xl.storageDisks {
		if d == nil {
			deleteErrs[i] = errDiskNotFound
			continue
		}
		pending.Add(1)
		go func(i int, d StorageAPI) {
			defer pending.Done()
			switch err := cleanupDir(d, bucket, object); err {
			case nil, errFileNotFound:
				// Treated as success.
			default:
				deleteErrs[i] = err
			}
		}(i, d)
	}
	// Wait for all routines to finish.
	pending.Wait()

	if isQuorum(deleteErrs, xl.writeQuorum) {
		return nil
	}
	// Errors were more than allowed by the write quorum.
	return errXLWriteQuorum
}
// DeleteObject - deletes an object from all disks while holding a
// write lock on it. Returns BucketNameInvalid or ObjectNameInvalid for
// malformed names, ObjectNotFound when the object does not exist, and
// the toObjectErr-converted error when the delete itself fails.
func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
	// Reject malformed names before taking any lock.
	switch {
	case !IsValidBucketName(bucket):
		return BucketNameInvalid{Bucket: bucket}
	case !IsValidObjectName(object):
		return ObjectNameInvalid{Bucket: bucket, Object: object}
	}

	nsMutex.Lock(bucket, object)
	defer nsMutex.Unlock(bucket, object)

	// Validate object exists.
	if !xl.isObject(bucket, object) {
		return ObjectNotFound{bucket, object}
	}

	// Delete the object on all disks.
	if err = xl.deleteObject(bucket, object); err != nil {
		return toObjectErr(err, bucket, object)
	}
	return nil
}